Skip to content

Commit 31d8b70

Browse files
committed
Updates
1 parent 8ac48fa commit 31d8b70

File tree

4 files changed

+109
-68
lines changed

4 files changed

+109
-68
lines changed

src/Context/ForkContext.php

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
/**
1414
* USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its
15-
* use must be purposeful and situational.
15+
* creation must be purposeful and situational.
1616
*
1717
* Forking is not recommended at arbitrary points in an application since the entire state of the parent process is
1818
* inherited into the child process, including the event-loop!
@@ -83,7 +83,9 @@ public static function start(
8383
exit(0);
8484
}
8585

86-
private bool $exited = false;
86+
private ?int $exited = null;
87+
88+
private bool $weKilled = false;
8789

8890
/**
8991
* @param StreamChannel<TReceive, TSend> $ipcChannel
@@ -101,21 +103,61 @@ public function __destruct()
101103
$this->close();
102104
}
103105

106+
public function receive(?Cancellation $cancellation = null): mixed
107+
{
108+
$this->checkExit(false);
109+
110+
return parent::receive($cancellation);
111+
}
112+
113+
public function send(mixed $data): void
114+
{
115+
$this->checkExit(false);
116+
117+
parent::send($data);
118+
}
119+
120+
private function checkExit(bool $wait): ?int
121+
{
122+
if ($this->exited === null) {
123+
if (\pcntl_waitpid($this->pid, $status, $wait ? 0 : \WNOHANG) === 0) {
124+
return null;
125+
}
126+
127+
$this->exited = match (true) {
128+
\pcntl_wifsignaled($status) => \pcntl_wtermsig($status),
129+
\pcntl_wifexited($status) => \pcntl_wexitstatus($status) - 128,
130+
\pcntl_wifstopped($status) => \pcntl_wstopsig($status),
131+
default => -1,
132+
};
133+
}
134+
135+
if (!$this->weKilled && $this->exited > 0) {
136+
throw new ContextException("Worker exited due to signal {$this->exited}", $this->exited);
137+
}
138+
139+
return $this->exited;
140+
}
141+
104142
public function close(): void
105143
{
106144
if (!$this->exited) {
145+
$this->weKilled = true;
107146
\posix_kill($this->pid, \SIGKILL);
108-
$this->exited = true;
147+
148+
$this->checkExit(true);
109149
}
110150

111151
parent::close();
112152
}
113153

114154
public function join(?Cancellation $cancellation = null): mixed
115155
{
116-
$data = $this->receiveExitResult($cancellation);
117-
118-
$this->close();
156+
try {
157+
$data = $this->receiveExitResult($cancellation);
158+
} finally {
159+
$this->close();
160+
}
119161

120162
return $data->getResult();
121163
}

src/Context/Internal/functions.php

Lines changed: 55 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use Amp\Serialization\NativeSerializer;
1010
use Amp\Serialization\SerializationException;
1111
use Amp\Serialization\Serializer;
12-
use Revolt\EventLoop;
1312

1413
/** @internal */
1514
function runContext(
@@ -19,73 +18,69 @@ function runContext(
1918
array $argv,
2019
Serializer $serializer = new NativeSerializer(),
2120
): void {
22-
EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation, $serializer): void {
23-
/** @noinspection PhpUnusedLocalVariableInspection */
24-
$argc = \count($argv);
21+
/** @noinspection PhpUnusedLocalVariableInspection */
22+
$argc = \count($argv);
2523

26-
try {
27-
$socket = Ipc\connect($uri, $key, $connectCancellation);
28-
$ipcChannel = new StreamChannel($socket, $socket, $serializer);
29-
30-
$socket = Ipc\connect($uri, $key, $connectCancellation);
31-
$resultChannel = new StreamChannel($socket, $socket, $serializer);
32-
} catch (\Throwable $exception) {
33-
\trigger_error($exception->getMessage(), E_USER_ERROR);
34-
}
35-
36-
try {
37-
if (!isset($argv[0])) {
38-
throw new \Error("No script path given");
39-
}
24+
try {
25+
$socket = Ipc\connect($uri, $key, $connectCancellation);
26+
$ipcChannel = new StreamChannel($socket, $socket, $serializer);
4027

41-
if (!\is_file($argv[0])) {
42-
throw new \Error(\sprintf(
43-
"No script found at '%s' (be sure to provide the full path to the script)",
44-
$argv[0],
45-
));
46-
}
28+
$socket = Ipc\connect($uri, $key, $connectCancellation);
29+
$resultChannel = new StreamChannel($socket, $socket, $serializer);
30+
} catch (\Throwable $exception) {
31+
\trigger_error($exception->getMessage(), E_USER_ERROR);
32+
}
4733

48-
try {
49-
// Protect current scope by requiring script within another function.
50-
// Using $argc, so it is available to the required script.
51-
$callable = (function () use ($argc, $argv): callable {
52-
/** @psalm-suppress UnresolvableInclude */
53-
return require $argv[0];
54-
})();
55-
} catch (\TypeError $exception) {
56-
throw new \Error(\sprintf(
57-
"Script '%s' did not return a callable function: %s",
58-
$argv[0],
59-
$exception->getMessage(),
60-
), 0, $exception);
61-
} catch (\ParseError $exception) {
62-
throw new \Error(\sprintf(
63-
"Script '%s' contains a parse error: %s",
64-
$argv[0],
65-
$exception->getMessage(),
66-
), 0, $exception);
67-
}
34+
try {
35+
if (!isset($argv[0])) {
36+
throw new \Error("No script path given");
37+
}
6838

69-
$returnValue = $callable(new ContextChannel($ipcChannel));
70-
$result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue);
71-
} catch (\Throwable $exception) {
72-
$result = new ExitFailure($exception);
39+
if (!\is_file($argv[0])) {
40+
throw new \Error(\sprintf(
41+
"No script found at '%s' (be sure to provide the full path to the script)",
42+
$argv[0],
43+
));
7344
}
7445

7546
try {
76-
try {
77-
$resultChannel->send($result);
78-
} catch (SerializationException $exception) {
79-
// Serializing the result failed. Send the reason why.
80-
$resultChannel->send(new ExitFailure($exception));
81-
}
82-
} catch (\Throwable $exception) {
83-
\trigger_error(\sprintf(
84-
"Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent",
47+
// Protect current scope by requiring script within another function.
48+
// Using $argc, so it is available to the required script.
49+
$callable = (function () use ($argc, $argv): callable {
50+
/** @psalm-suppress UnresolvableInclude */
51+
return require $argv[0];
52+
})();
53+
} catch (\TypeError $exception) {
54+
throw new \Error(\sprintf(
55+
"Script '%s' did not return a callable function: %s",
56+
$argv[0],
57+
$exception->getMessage(),
58+
), 0, $exception);
59+
} catch (\ParseError $exception) {
60+
throw new \Error(\sprintf(
61+
"Script '%s' contains a parse error: %s",
62+
$argv[0],
8563
$exception->getMessage(),
86-
), E_USER_ERROR);
64+
), 0, $exception);
8765
}
88-
});
8966

90-
EventLoop::run();
67+
$returnValue = $callable(new ContextChannel($ipcChannel));
68+
$result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue);
69+
} catch (\Throwable $exception) {
70+
$result = new ExitFailure($exception);
71+
}
72+
73+
try {
74+
try {
75+
$resultChannel->send($result);
76+
} catch (SerializationException $exception) {
77+
// Serializing the result failed. Send the reason why.
78+
$resultChannel->send(new ExitFailure($exception));
79+
}
80+
} catch (\Throwable $exception) {
81+
\trigger_error(\sprintf(
82+
"Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent",
83+
$exception->getMessage(),
84+
), E_USER_ERROR);
85+
}
9186
}

src/Context/Internal/process-runner.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,7 @@
8585
\trigger_error($exception->getMessage(), E_USER_ERROR);
8686
}
8787

88-
runContext($uri, $key, $cancellation, $argv);
88+
EventLoop::queue(runContext(...), $uri, $key, $cancellation, $argv);
89+
90+
EventLoop::run();
8991
})();

src/Context/ThreadContext.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ public static function start(
104104
// such as select() will not be interrupted.
105105
}));
106106

107-
Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv);
107+
EventLoop::queue(Internal\runContext(...), $uri, $key, new TimeoutCancellation($connectTimeout), $argv);
108+
109+
EventLoop::run();
108110

109111
return 0;
110112
// @codeCoverageIgnoreEnd

0 commit comments

Comments
 (0)