diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index cdf24e9..d2ac0cf 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -10,125 +10,6 @@ import std.algorithm.comparison : among; import std.variant : visit; import std.stdint; -private struct ProcessInfo { - bool exited = true; - int exitCode; - ProcessWaitCallback[] callbacks; - size_t refCount = 0; - void delegate(int pid) shared nothrow onExit; - EventDriverProcesses driver; - - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; -} - -private struct StaticProcesses { -@safe: nothrow: - import core.sync.mutex : Mutex; - import core.thread : Thread; - - private { - static shared Mutex m_mutex; - static __gshared ProcessInfo[ProcessID] m_processes; - static __gshared Thread m_waitThread; - } - - shared static this() - { - m_mutex = new shared Mutex; - } - - static void add(ProcessID pid, ProcessInfo info) @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!m_waitThread) { - m_waitThread = new Thread(&waitForProcesses); - m_waitThread.start(); - } - - assert(pid !in m_processes, "Process adopted twice"); - m_processes[pid] = info; - } - - private static void waitForProcesses() - @system { - import core.stdc.errno : ECHILD, errno; - import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; - import core.sys.posix.signal : siginfo_t; - - while (true) { - siginfo_t dummy; - auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); - if (ret == -1) { - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - m_waitThread = null; - } - break; - } - - ProcessID[] allprocs; - - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - - () @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - foreach (ref entry; StaticProcesses.m_processes.byKeyValue) { - if (!entry.value.exited) - allprocs ~= entry.key; - } - } (); - } - - foreach (pid; allprocs) { - int status; - ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) { - int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); - onProcessExit(ret, exitstatus); - } - } - } - } - - private static void onProcessExit(int system_pid, int exit_status) - { - auto pid = cast(ProcessID)system_pid; - - ProcessWaitCallback[] callbacks; - auto onexit = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } - - // Increment the ref count to make sure it doesn't get removed - info.refCount++; - - info.exited = true; - info.exitCode = exit_status; - return info.onExit; - })(pid); - - onexit(cast(int)pid); - } -} - -private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { - StaticProcesses.m_mutex.lock_nothrow(); - scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); - auto info = pid in StaticProcesses.m_processes; - - return fn(info); -} private enum SIGCHLD = 17; @@ -136,10 +17,16 @@ private enum SIGCHLD = 17; final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.sync.mutex : Mutex; import core.sys.linux.sys.signalfd; import core.sys.posix.unistd : close, read, write, dup; + import core.thread : Thread; private { + static shared Mutex s_mutex; + static __gshared ProcessInfo[ProcessID] s_processes; + static __gshared Thread s_waitThread; + Loop m_loop; // FIXME: avoid virtual funciton calls and use the final type instead EventDriver m_driver; @@ -161,14 +48,11 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { auto pid = cast(ProcessID)system_pid; - auto sthis = () @trusted { return cast(shared)this; } (); - ProcessInfo info; info.exited = false; info.refCount = 1; - info.onExit = () @trusted { return &sthis.onProcessExit; } (); info.driver = this; - StaticProcesses.add(pid, info); + add(pid, info); return pid; } @@ -282,18 +166,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces bool exited; int exitCode; - size_t id = lockedProcessInfo!((info) { + size_t id = size_t.max; + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); if (info.exited) { exited = true; exitCode = info.exitCode; - return size_t.max; } else { info.callbacks ~= on_process_exit; - return info.callbacks.length - 1; + id = info.callbacks.length - 1; } - })(pid); + }); if (exited) { on_process_exit(pid, exitCode); @@ -306,13 +190,13 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { if (wait_id == size_t.max) return; - lockedProcessInfo!((info) { + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); assert(!info.exited, "Cannot cancel wait when none are pending"); assert(info.callbacks.length > wait_id, "Invalid process wait ID"); info.callbacks[wait_id] = null; - })(pid); + }); } private void onProcessExit(int system_pid) @@ -327,7 +211,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces int exitCode; ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) { + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (info) { assert(info !is null); exitCode = info.exitCode; @@ -335,8 +220,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces callbacks = info.callbacks; info.callbacks = null; - return info.driver; - })(pid); + driver = info.driver; + }); foreach (cb; callbacks) { if (cb) @@ -348,53 +233,162 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces final override bool hasExited(ProcessID pid) { - return lockedProcessInfo!((info) { + bool ret; + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); - - return info.exited; - })(pid); + ret = info.exited; + }); + return ret; } final override void addRef(ProcessID pid) { - lockedProcessInfo!((info) { + lockedProcessInfo(pid, (info) { nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); info.refCount++; - })(pid); + }); } final override bool releaseRef(ProcessID pid) { - return lockedProcessInfo!((info) { + bool ret; + lockedProcessInfo(pid, (info) { nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); if (--info.refCount == 0) { // Remove/deallocate process if (info.userDataDestructor) () @trusted { info.userDataDestructor(info.userData.ptr); } (); - StaticProcesses.m_processes.remove(pid); - return false; - } - return true; - })(pid); + () @trusted { s_processes.remove(pid); } (); + ret = false; + } else ret = true; + }); + return ret; } final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) @system { - return lockedProcessInfo!((info) { + void* ret; + lockedProcessInfo(pid, (info) @safe nothrow { assert(info.userDataDestructor is null || info.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); if (!info.userDataDestructor) { - initialize(info.userData.ptr); + () @trusted { initialize(info.userData.ptr); } (); info.userDataDestructor = destroy; } - return info.userData.ptr; - })(pid); + ret = () @trusted { return info.userData.ptr; } (); + }); + return ret; } - package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } + package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; } + + + shared static this() + { + s_mutex = new shared Mutex; + } + + private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + auto info = () @trusted { return pid in s_processes; } (); + fn(info); + } + + private static void add(ProcessID pid, ProcessInfo info) @trusted { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + + if (!s_waitThread) { + s_waitThread = new Thread(&waitForProcesses); + s_waitThread.start(); + } + + assert(pid !in s_processes, "Process adopted twice"); + s_processes[pid] = info; + } + + private static void waitForProcesses() + @system { + import core.stdc.errno : ECHILD, errno; + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; + + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + s_waitThread = null; + } + break; + } + + ProcessID[] allprocs; + + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + + + () @trusted { + foreach (ref entry; s_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; + } + } (); + } + + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExitStatic(ret, exitstatus); + } + } + } + } + + private static void onProcessExitStatic(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; + + ProcessWaitCallback[] callbacks; + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (ProcessInfo* info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) return; + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exit_status; + driver = info.driver; + }); + + if (driver) + () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); + } + + private static struct ProcessInfo { + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + PosixEventDriverProcesses driver; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + } } final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {