Rework the child process exit code to not rely in SIGCHLD.

It turns out that in a heterogeneous process where other parts of the code may start processes or threads and may be waiting for those to finish, it is not realistic to rely on signalfd or even SIGCHLD in general to get notified about child process exits. The only solid way appears to be to start a separate waiter thread that uses waitid/waitpid to wait for exited child processes in a blocking way.

This also fixes the hanging vibe.core.process test in vibe-core with DMD 2.087.x.
This commit is contained in:
Sönke Ludwig 2019-08-22 12:32:54 +02:00 committed by Sönke Ludwig
parent 1ef320c329
commit 7ebad49ed0

View file

@ -15,6 +15,7 @@ private struct ProcessInfo {
int exitCode; int exitCode;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
size_t refCount = 0; size_t refCount = 0;
void delegate(int pid) shared nothrow onExit;
EventDriverProcesses driver; EventDriverProcesses driver;
DataInitializer userDataDestructor; DataInitializer userDataDestructor;
@ -24,10 +25,12 @@ private struct ProcessInfo {
private struct StaticProcesses { private struct StaticProcesses {
@safe: nothrow: @safe: nothrow:
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
import core.thread : Thread;
private { private {
static shared Mutex m_mutex; static shared Mutex m_mutex;
static __gshared ProcessInfo[ProcessID] m_processes; static __gshared ProcessInfo[ProcessID] m_processes;
static __gshared Thread m_waitThread;
} }
shared static this() shared static this()
@ -39,9 +42,84 @@ private struct StaticProcesses {
m_mutex.lock_nothrow(); m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow(); scope (exit) m_mutex.unlock_nothrow();
if (!m_waitThread) {
m_waitThread = new Thread(&waitForProcesses);
m_waitThread.start();
}
assert(pid !in m_processes, "Process adopted twice"); assert(pid !in m_processes, "Process adopted twice");
m_processes[pid] = info; m_processes[pid] = info;
} }
private static void waitForProcesses()
@system {
import core.stdc.errno : ECHILD, errno;
import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid;
import core.sys.posix.signal : siginfo_t;
while (true) {
siginfo_t dummy;
auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT);
if (ret == -1) {
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
m_waitThread = null;
}
break;
}
ProcessID[] allprocs;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
() @trusted {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
foreach (ref entry; StaticProcesses.m_processes.byKeyValue) {
if (!entry.value.exited)
allprocs ~= entry.key;
}
} ();
}
foreach (pid; allprocs) {
int status;
ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
if (ret == cast(int)pid) {
int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
onProcessExit(ret, exitstatus);
}
}
}
}
private static void onProcessExit(int system_pid, int exit_status)
{
auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks;
auto onexit = lockedProcessInfo!((info) @safe {
// We get notified of any child exiting, so ignore the ones we're
// not aware of
if (info is null) {
return null;
}
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exit_status;
return info.onExit;
})(pid);
onexit(cast(int)pid);
}
} }
private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted {
@ -63,8 +141,8 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
private { private {
Loop m_loop; Loop m_loop;
// FIXME: avoid virtual funciton calls and use the final type instead
EventDriver m_driver; EventDriver m_driver;
SignalListenID m_sighandle;
} }
this(Loop loop, EventDriver driver) this(Loop loop, EventDriver driver)
@ -73,45 +151,24 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
m_loop = loop; m_loop = loop;
m_driver = driver; m_driver = driver;
// Listen for child process exits using SIGCHLD
m_sighandle = () @trusted {
sigset_t sset;
sigemptyset(&sset);
sigaddset(&sset, SIGCHLD);
assert(sigprocmask(SIG_BLOCK, &sset, null) == 0);
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC));
} ();
m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null));
m_loop.registerFD(cast(FD)m_sighandle, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal);
onSignal(cast(FD)m_sighandle);
} }
void dispose() void dispose()
{ {
FD sighandle = cast(FD)m_sighandle;
m_loop.m_fds[sighandle].common.refCount--;
m_loop.setNotifyCallback!(EventType.read)(sighandle, null);
m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!(SignalSlot)(sighandle);
close(cast(int)sighandle);
} }
final override ProcessID adopt(int system_pid) final override ProcessID adopt(int system_pid)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
auto sthis = () @trusted { return cast(shared)this; } ();
ProcessInfo info; ProcessInfo info;
info.exited = false; info.exited = false;
info.refCount = 1; info.refCount = 1;
info.onExit = () @trusted { return &sthis.onProcessExit; } ();
info.driver = this; info.driver = this;
StaticProcesses.add(pid, info); StaticProcesses.add(pid, info);
return pid; return pid;
} }
@ -258,66 +315,9 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
})(pid); })(pid);
} }
private void onSignal(FD fd) private void onProcessExit(int system_pid)
{ shared {
import core.sys.posix.sys.wait : WNOHANG, WEXITSTATUS, waitpid; m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid);
SignalListenID lid = cast(SignalListenID)fd;
// drain the signalfd - note that multiple signals can be combined into
// a single value, so that individual siginfo results are useless for
// us
signalfd_siginfo nfo;
while (() @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } () == nfo.sizeof)
{
}
// instead, use waitpid to determine all exited processes
ProcessID[] allprocs;
() @trusted {
try synchronized (StaticProcesses.m_mutex)
allprocs = StaticProcesses.m_processes.keys;
catch (Exception e) assert(false, e.msg);
} ();
foreach (pid; allprocs) {
int status;
auto ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
if (ret == cast(int)pid)
onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ());
}
}
private void onProcessExit(int system_pid, int exitCode)
{
auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) @safe {
// We get notified of any child exiting, so ignore the ones we're
// not aware of
if (info is null) {
return null;
}
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exitCode;
return info.driver;
})(pid);
// Need to call callbacks in the owner thread as this function can be
// called from any thread. Without extra threads this is always the main
// thread.
if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) {
onLocalProcessExit(cast(intptr_t)pid);
} else if (driver) {
auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } ();
sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid);
}
} }
private static void onLocalProcessExit(intptr_t system_pid) private static void onLocalProcessExit(intptr_t system_pid)