diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 9b82a06..b382ec7 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -15,6 +15,7 @@ private struct ProcessInfo { int exitCode; ProcessWaitCallback[] callbacks; size_t refCount = 0; + void delegate(int pid) shared nothrow onExit; EventDriverProcesses driver; DataInitializer userDataDestructor; @@ -24,10 +25,12 @@ private struct ProcessInfo { private struct StaticProcesses { @safe: nothrow: import core.sync.mutex : Mutex; + import core.thread : Thread; private { static shared Mutex m_mutex; static __gshared ProcessInfo[ProcessID] m_processes; + static __gshared Thread m_waitThread; } shared static this() @@ -39,9 +42,84 @@ private struct StaticProcesses { m_mutex.lock_nothrow(); scope (exit) m_mutex.unlock_nothrow(); + if (!m_waitThread) { + m_waitThread = new Thread(&waitForProcesses); + m_waitThread.start(); + } + assert(pid !in m_processes, "Process adopted twice"); m_processes[pid] = info; } + + private static void waitForProcesses() + @system { + import core.stdc.errno : ECHILD, errno; + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; + + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + m_waitThread = null; + } + break; + } + + ProcessID[] allprocs; + + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + + () @trusted { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + foreach (ref entry; StaticProcesses.m_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; + } + } (); + } + + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExit(ret, exitstatus); + } + } + } + } + + private static void onProcessExit(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; + + ProcessWaitCallback[] callbacks; + auto onexit = lockedProcessInfo!((info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) { + return null; + } + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exit_status; + return info.onExit; + })(pid); + + onexit(cast(int)pid); + } } private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { @@ -63,8 +141,8 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce private { Loop m_loop; + // FIXME: avoid virtual funciton calls and use the final type instead EventDriver m_driver; - SignalListenID m_sighandle; } this(Loop loop, EventDriver driver) @@ -73,45 +151,24 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce m_loop = loop; m_driver = driver; - - // Listen for child process exits using SIGCHLD - m_sighandle = () @trusted { - sigset_t sset; - sigemptyset(&sset); - sigaddset(&sset, SIGCHLD); - - assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); - - return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); - } (); - - m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); - m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); - m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); - - onSignal(cast(FD)m_sighandle); } void dispose() { - FD sighandle = cast(FD)m_sighandle; - m_loop.m_fds[sighandle].common.refCount--; - m_loop.setNotifyCallback!(EventType.read)(sighandle, null); - m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!(SignalSlot)(sighandle); - close(cast(int)sighandle); } final override ProcessID adopt(int system_pid) { auto pid = cast(ProcessID)system_pid; + auto sthis = () @trusted { return cast(shared)this; } (); + ProcessInfo info; info.exited = false; info.refCount = 1; + info.onExit = () @trusted { return &sthis.onProcessExit; } (); info.driver = this; StaticProcesses.add(pid, info); - return pid; } @@ -258,66 +315,9 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce })(pid); } - private void onSignal(FD fd) - { - import core.sys.posix.sys.wait : WNOHANG, WEXITSTATUS, waitpid; - - SignalListenID lid = cast(SignalListenID)fd; - - // drain the signalfd - note that multiple signals can be combined into - // a single value, so that individual siginfo results are useless for - // us - signalfd_siginfo nfo; - while (() @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } () == nfo.sizeof) - { - } - - // instead, use waitpid to determine all exited processes - ProcessID[] allprocs; - () @trusted { - try synchronized (StaticProcesses.m_mutex) - allprocs = StaticProcesses.m_processes.keys; - catch (Exception e) assert(false, e.msg); - } (); - - foreach (pid; allprocs) { - int status; - auto ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) - onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ()); - } - } - - private void onProcessExit(int system_pid, int exitCode) - { - auto pid = cast(ProcessID)system_pid; - - ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } - - // Increment the ref count to make sure it doesn't get removed - info.refCount++; - - info.exited = true; - info.exitCode = exitCode; - return info.driver; - })(pid); - - // Need to call callbacks in the owner thread as this function can be - // called from any thread. Without extra threads this is always the main - // thread. - if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { - onLocalProcessExit(cast(intptr_t)pid); - } else if (driver) { - auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); - - sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); - } + private void onProcessExit(int system_pid) + shared { + m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); } private static void onLocalProcessExit(intptr_t system_pid)