diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index f3e93e2..0681464 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -55,7 +55,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; - version (linux) alias ProcessDriver = SignalEventDriverProcesses!Loop; + version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop; else alias ProcessDriver = DummyEventDriverProcesses!Loop; Loop m_loop; diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 2dfca88..23cf4ec 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -10,427 +10,435 @@ import std.algorithm.comparison : among; import std.variant : visit; import std.stdint; -private struct ProcessInfo { - bool exited = true; - int exitCode; - ProcessWaitCallback[] callbacks; - size_t refCount = 0; - EventDriverProcesses driver; - - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; -} - -private struct StaticProcesses { -@safe: nothrow: - import core.sync.mutex : Mutex; - - private { - static shared Mutex m_mutex; - static __gshared ProcessInfo[ProcessID] m_processes; - } - - shared static this() - { - m_mutex = new shared Mutex; - } - - static void add(ProcessID pid, ProcessInfo info) @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - assert(pid !in m_processes, "Process adopted twice"); - m_processes[pid] = info; - } -} - -private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { - StaticProcesses.m_mutex.lock_nothrow(); - scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); - auto info = pid in StaticProcesses.m_processes; - - return fn(info); -} private enum SIGCHLD = 17; -final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { +final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN, EINPROGRESS; - import core.sys.linux.sys.signalfd; - import core.sys.posix.unistd : close, read, write, dup; + import core.sync.mutex : Mutex; + import core.sys.posix.unistd : dup; + import core.thread : Thread; - private { - Loop m_loop; - EventDriver m_driver; - SignalListenID m_sighandle; - } + private { + static shared Mutex s_mutex; + static __gshared ProcessInfo[ProcessID] s_processes; + static __gshared Thread s_waitThread; - this(Loop loop, EventDriver driver) - { - import core.sys.posix.signal; + Loop m_loop; + // FIXME: avoid virtual funciton calls and use the final type instead + EventDriver m_driver; + } - m_loop = loop; - m_driver = driver; + this(Loop loop, EventDriver driver) + { + m_loop = loop; + m_driver = driver; + } - // Listen for child process exits using SIGCHLD - m_sighandle = () @trusted { - sigset_t sset; - sigemptyset(&sset); - sigaddset(&sset, SIGCHLD); + void dispose() + { + } - assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); + final override ProcessID adopt(int system_pid) + { + auto pid = cast(ProcessID)system_pid; - return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); - } (); + ProcessInfo info; + info.exited = false; + info.refCount = 1; + info.driver = this; + add(pid, info); + return pid; + } - m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); - m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); - m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); + final override Process spawn( + string[] args, + ProcessStdinFile stdin, + ProcessStdoutFile stdout, + ProcessStderrFile stderr, + const string[string] env, + ProcessConfig config, + string working_dir) + @trusted { + // Use std.process to spawn processes + import std.process : pipe, Pid, spawnProcess; + import std.stdio : File; + static import std.stdio; - onSignal(cast(FD)m_sighandle); - } + static File fdToFile(int fd, scope const(char)[] mode) + { + try { + File f; + f.fdopen(fd, mode); + return f; + } catch (Exception e) { + assert(0); + } + } - void dispose() - { - FD sighandle = cast(FD)m_sighandle; - m_loop.m_fds[sighandle].common.refCount--; - m_loop.setNotifyCallback!(EventType.read)(sighandle, null); - m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!(SignalSlot)(sighandle); - close(cast(int)sighandle); - } + try { + Process process; + File stdinFile, stdoutFile, stderrFile; - final override ProcessID adopt(int system_pid) - { - auto pid = cast(ProcessID)system_pid; + stdinFile = stdin.visit!( + (int handle) => fdToFile(handle, "r"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdin; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); + return p.readEnd; + } + }); - ProcessInfo info; - info.exited = false; - info.refCount = 1; - info.driver = this; - StaticProcesses.add(pid, info); + stdoutFile = stdout.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdout; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); - return pid; - } + stderrFile = stderr.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stderr; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); - final override Process spawn( - string[] args, - ProcessStdinFile stdin, - ProcessStdoutFile stdout, - ProcessStderrFile stderr, - const string[string] env, - ProcessConfig config, - string working_dir) - @trusted { - // Use std.process to spawn processes - import std.process : pipe, Pid, spawnProcess; - import std.stdio : File; - static import std.stdio; + const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; + const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; - static File fdToFile(int fd, scope const(char)[] mode) - { - try { - File f; - f.fdopen(fd, mode); - return f; - } catch (Exception e) { - assert(0); - } - } + if (redirectStdout) { + assert(!redirectStderr, "Can't redirect both stdout and stderr"); - try { - Process process; - File stdinFile, stdoutFile, stderrFile; + stdoutFile = stderrFile; + } else if (redirectStderr) { + stderrFile = stdoutFile; + } - stdinFile = stdin.visit!( - (int handle) => fdToFile(handle, "r"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stdin; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); - return p.readEnd; - } - }); + Pid stdPid = spawnProcess( + args, + stdinFile, + stdoutFile, + stderrFile, + env, + cast(std.process.Config)config, + working_dir); + process.pid = adopt(stdPid.osHandle); + stdPid.destroy(); - stdoutFile = stdout.visit!( - (int handle) => fdToFile(handle, "w"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stdout; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); - return p.writeEnd; - } - }, - (_) => File.init); + return process; + } catch (Exception e) { + return Process.init; + } + } - stderrFile = stderr.visit!( - (int handle) => fdToFile(handle, "w"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stderr; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); - return p.writeEnd; - } - }, - (_) => File.init); + final override void kill(ProcessID pid, int signal) + @trusted { + import core.sys.posix.signal : pkill = kill; - const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; - const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; + assert(cast(int)pid > 0, "Invalid PID passed to kill."); - if (redirectStdout) { - assert(!redirectStderr, "Can't redirect both stdout and stderr"); + if (cast(int)pid > 0) + pkill(cast(int)pid, signal); + } - stdoutFile = stderrFile; - } else if (redirectStderr) { - stderrFile = stdoutFile; - } + final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + bool exited; + int exitCode; - Pid stdPid = spawnProcess( - args, - stdinFile, - stdoutFile, - stderrFile, - env, - cast(std.process.Config)config, - working_dir); - process.pid = adopt(stdPid.osHandle); - stdPid.destroy(); + size_t id = size_t.max; + lockedProcessInfo(pid, (info) { + assert(info !is null, "Unknown process ID"); - return process; - } catch (Exception e) { - return Process.init; - } - } + if (info.exited) { + exited = true; + exitCode = info.exitCode; + } else { + info.callbacks ~= on_process_exit; + id = info.callbacks.length - 1; + } + }); - final override void kill(ProcessID pid, int signal) - @trusted { - import core.sys.posix.signal : pkill = kill; + if (exited) { + on_process_exit(pid, exitCode); + } - pkill(cast(int)pid, signal); - } + return id; + } - final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) - { - bool exited; - int exitCode; + final override void cancelWait(ProcessID pid, size_t wait_id) + { + if (wait_id == size_t.max) return; - size_t id = lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); + lockedProcessInfo(pid, (info) { + assert(info !is null, "Unknown process ID"); + assert(!info.exited, "Cannot cancel wait when none are pending"); + assert(info.callbacks.length > wait_id, "Invalid process wait ID"); - if (info.exited) { - exited = true; - exitCode = info.exitCode; - return 0; - } else { - info.callbacks ~= on_process_exit; - return info.callbacks.length - 1; - } - })(pid); + info.callbacks[wait_id] = null; + }); + } - if (exited) { - on_process_exit(pid, exitCode); - } + private void onProcessExit(int system_pid) + shared { + m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); + } - return id; - } + private static void onLocalProcessExit(intptr_t system_pid) + { + auto pid = cast(ProcessID)system_pid; - final override void cancelWait(ProcessID pid, size_t waitId) - { - lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); - assert(!info.exited, "Cannot cancel wait when none are pending"); - assert(info.callbacks.length > waitId, "Invalid process wait ID"); + int exitCode; + ProcessWaitCallback[] callbacks; - info.callbacks[waitId] = null; - })(pid); - } + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (info) { + assert(info !is null); - private void onSignal(FD fd) - { - SignalListenID lid = cast(SignalListenID)fd; + exitCode = info.exitCode; - signalfd_siginfo nfo; - do { - auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); + callbacks = info.callbacks; + info.callbacks = null; - if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) - return; + driver = info.driver; + }); - onProcessExit(nfo.ssi_pid, nfo.ssi_status); - } while (true); - } + foreach (cb; callbacks) { + if (cb) + cb(pid, exitCode); + } - private void onProcessExit(int system_pid, int exitCode) - { - auto pid = cast(ProcessID)system_pid; + driver.releaseRef(pid); + } - ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } + final override bool hasExited(ProcessID pid) + { + bool ret; + lockedProcessInfo(pid, (info) { + assert(info !is null, "Unknown process ID"); + ret = info.exited; + }); + return ret; + } - // Increment the ref count to make sure it doesn't get removed - info.refCount++; + final override void addRef(ProcessID pid) + { + lockedProcessInfo(pid, (info) { + nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); + info.refCount++; + }); + } - info.exited = true; - info.exitCode = exitCode; - return info.driver; - })(pid); + final override bool releaseRef(ProcessID pid) + { + bool ret; + lockedProcessInfo(pid, (info) { + nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); + if (--info.refCount == 0) { + // Remove/deallocate process + if (info.userDataDestructor) + () @trusted { info.userDataDestructor(info.userData.ptr); } (); - // Need to call callbacks in the owner thread as this function can be - // called from any thread. Without extra threads this is always the main - // thread. - if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { - onLocalProcessExit(cast(intptr_t)pid); - } else if (driver) { - auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); + () @trusted { s_processes.remove(pid); } (); + ret = false; + } else ret = true; + }); + return ret; + } - sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); - } - } + final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + void* ret; + lockedProcessInfo(pid, (info) @safe nothrow { + assert(info.userDataDestructor is null || info.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); - private static void onLocalProcessExit(intptr_t system_pid) - { - auto pid = cast(ProcessID)system_pid; + if (!info.userDataDestructor) { + () @trusted { initialize(info.userData.ptr); } (); + info.userDataDestructor = destroy; + } + ret = () @trusted { return info.userData.ptr; } (); + }); + return ret; + } - int exitCode; - ProcessWaitCallback[] callbacks; + package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; } - auto driver = lockedProcessInfo!((info) { - assert(info !is null); - exitCode = info.exitCode; + shared static this() + { + s_mutex = new shared Mutex; + } - callbacks = info.callbacks; - info.callbacks = null; + private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + auto info = () @trusted { return pid in s_processes; } (); + fn(info); + } - return info.driver; - })(pid); + private static void add(ProcessID pid, ProcessInfo info) @trusted { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); - foreach (cb; callbacks) { - if (cb) - cb(pid, exitCode); - } + if (!s_waitThread) { + s_waitThread = new Thread(&waitForProcesses); + s_waitThread.start(); + } - driver.releaseRef(pid); - } + assert(pid !in s_processes, "Process adopted twice"); + s_processes[pid] = info; + } - final override bool hasExited(ProcessID pid) - { - return lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); + private static void waitForProcesses() + @system { + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; - return info.exited; - })(pid); - } + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + s_waitThread = null; + } + break; + } - final override void addRef(ProcessID pid) - { - lockedProcessInfo!((info) { - nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); - info.refCount++; - })(pid); - } + ProcessID[] allprocs; - final override bool releaseRef(ProcessID pid) - { - return lockedProcessInfo!((info) { - nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); - if (--info.refCount == 0) { - // Remove/deallocate process - if (info.userDataDestructor) - () @trusted { info.userDataDestructor(info.userData.ptr); } (); + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); - StaticProcesses.m_processes.remove(pid); - return false; - } - return true; - })(pid); - } - final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - return lockedProcessInfo!((info) { - assert(info.userDataDestructor is null || info.userDataDestructor is destroy, - "Requesting user data with differing type (destructor)."); - assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); + () @trusted { + foreach (ref entry; s_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; + } + } (); + } - if (!info.userDataDestructor) { - initialize(info.userData.ptr); - info.userDataDestructor = destroy; - } - return info.userData.ptr; - })(pid); - } + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExitStatic(ret, exitstatus); + } + } + } + } - package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } + private static void onProcessExitStatic(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; + + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (ProcessInfo* info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) return; + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exit_status; + driver = info.driver; + }); + + if (driver) + () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); + } + + private static struct ProcessInfo { + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + PosixEventDriverProcesses driver; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + } } final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - this(Loop loop, EventDriver driver) {} + this(Loop loop, EventDriver driver) {} - void dispose() {} + void dispose() {} - override ProcessID adopt(int system_pid) - { - assert(false, "TODO!"); - } + override ProcessID adopt(int system_pid) + { + assert(false, "TODO!"); + } - override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) - { - assert(false, "TODO!"); - } + override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) + { + assert(false, "TODO!"); + } - override bool hasExited(ProcessID pid) - { - assert(false, "TODO!"); - } + override bool hasExited(ProcessID pid) + { + assert(false, "TODO!"); + } - override void kill(ProcessID pid, int signal) - { - assert(false, "TODO!"); - } + override void kill(ProcessID pid, int signal) + { + assert(false, "TODO!"); + } - override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) - { - assert(false, "TODO!"); - } + override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + assert(false, "TODO!"); + } - override void cancelWait(ProcessID pid, size_t waitId) - { - assert(false, "TODO!"); - } + override void cancelWait(ProcessID pid, size_t waitId) + { + assert(false, "TODO!"); + } - override void addRef(ProcessID pid) - { - assert(false, "TODO!"); - } + override void addRef(ProcessID pid) + { + assert(false, "TODO!"); + } - override bool releaseRef(ProcessID pid) - { - assert(false, "TODO!"); - } + override bool releaseRef(ProcessID pid) + { + assert(false, "TODO!"); + } - protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - assert(false, "TODO!"); - } + protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } - package final @property size_t pendingCount() const nothrow { return 0; } + package final @property size_t pendingCount() const nothrow { return 0; } } diff --git a/tests/issue-122-coalesced-sigchld.d b/tests/issue-122-coalesced-sigchld.d new file mode 100644 index 0000000..625cbd4 --- /dev/null +++ b/tests/issue-122-coalesced-sigchld.d @@ -0,0 +1,76 @@ +#!/usr/bin/env dub +/+ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ + +module test; + +import core.time : Duration, msecs; +import eventcore.core; +import std.conv; +import std.datetime; +import std.process : thisProcessID; +import std.stdio; + +version (Windows) { + void main() + { + writefln("Skipping SIGCHLD coalesce test on Windows."); + } +} else: + +import core.sys.posix.sys.wait : waitpid, WNOHANG; + +int numProc; + +void main(string[] args) +{ + // child mode + if (args.length == 2) + { + import core.thread : Thread; + writefln("Child: %s (%s) from %s", args[1], (args[1].to!long - Clock.currStdTime).hnsecs, thisProcessID); + Thread.sleep((args[1].to!long - Clock.currStdTime).hnsecs); + return; + } + + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, 5.seconds, 0.msecs); + eventDriver.timers.wait(tm, (tm) @trusted { + assert(false, "Test hung."); + }); + + // attempt to let all child processes finish in exactly 1 second to force + // signal coalescing + auto targettime = Clock.currTime(UTC()) + 1.seconds; + + auto procs = new Process[](20); + foreach (i, ref p; procs) { + p = eventDriver.processes.spawn( + [args[0], targettime.stdTime.to!string], + ProcessStdinFile(ProcessRedirect.inherit), + ProcessStdoutFile(ProcessRedirect.inherit), + ProcessStderrFile(ProcessRedirect.inherit), + null, ProcessConfig.none, null + ); + assert(p != Process.init); + + writeln("Started child: ", p.pid); + numProc++; + } + + foreach (p; procs) { + eventDriver.processes.wait(p.pid, (ProcessID pid, int res) nothrow + { + numProc--; + try writefln("Child %s exited with %s", pid, res); + catch(Exception){} + }); + } + + do eventDriver.core.processEvents(Duration.max); + while (numProc); + + foreach (p; procs) assert(waitpid(cast(int)p.pid, null, WNOHANG) == -1); +}