From e1c6d99798dcb90429351fe83e806e68b27a6054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 16:10:57 +0200 Subject: [PATCH 01/11] Fix indentation style. --- source/eventcore/drivers/posix/processes.d | 656 ++++++++++----------- 1 file changed, 328 insertions(+), 328 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 2dfca88..becd969 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -11,45 +11,45 @@ import std.variant : visit; import std.stdint; private struct ProcessInfo { - bool exited = true; - int exitCode; - ProcessWaitCallback[] callbacks; - size_t refCount = 0; - EventDriverProcesses driver; + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + EventDriverProcesses driver; - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; } private struct StaticProcesses { @safe: nothrow: - import core.sync.mutex : Mutex; + import core.sync.mutex : Mutex; - private { - static shared Mutex m_mutex; - static __gshared ProcessInfo[ProcessID] m_processes; - } + private { + static shared Mutex m_mutex; + static __gshared ProcessInfo[ProcessID] m_processes; + } - shared static this() - { - m_mutex = new shared Mutex; - } + shared static this() + { + m_mutex = new shared Mutex; + } - static void add(ProcessID pid, ProcessInfo info) @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); + static void add(ProcessID pid, ProcessInfo info) @trusted { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); - assert(pid !in m_processes, "Process adopted twice"); - m_processes[pid] = info; - } + assert(pid !in m_processes, "Process adopted twice"); + m_processes[pid] = info; + } } private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { - StaticProcesses.m_mutex.lock_nothrow(); - scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); - auto info = pid in StaticProcesses.m_processes; + StaticProcesses.m_mutex.lock_nothrow(); + scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); + auto info = pid in StaticProcesses.m_processes; - return fn(info); + return fn(info); } @@ -57,380 +57,380 @@ private enum SIGCHLD = 17; final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN, EINPROGRESS; - import core.sys.linux.sys.signalfd; - import core.sys.posix.unistd : close, read, write, dup; + import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.sys.linux.sys.signalfd; + import core.sys.posix.unistd : close, read, write, dup; - private { - Loop m_loop; - EventDriver m_driver; - SignalListenID m_sighandle; - } + private { + Loop m_loop; + EventDriver m_driver; + SignalListenID m_sighandle; + } - this(Loop loop, EventDriver driver) - { - import core.sys.posix.signal; + this(Loop loop, EventDriver driver) + { + import core.sys.posix.signal; - m_loop = loop; - m_driver = driver; + m_loop = loop; + m_driver = driver; - // Listen for child process exits using SIGCHLD - m_sighandle = () @trusted { - sigset_t sset; - sigemptyset(&sset); - sigaddset(&sset, SIGCHLD); + // Listen for child process exits using SIGCHLD + m_sighandle = () @trusted { + sigset_t sset; + sigemptyset(&sset); + sigaddset(&sset, SIGCHLD); - assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); + assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); - return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); - } (); + return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); + } (); - m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); - m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); - m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); + m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); + m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); + m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); - onSignal(cast(FD)m_sighandle); - } + onSignal(cast(FD)m_sighandle); + } - void dispose() - { - FD sighandle = cast(FD)m_sighandle; - m_loop.m_fds[sighandle].common.refCount--; - m_loop.setNotifyCallback!(EventType.read)(sighandle, null); - m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!(SignalSlot)(sighandle); - close(cast(int)sighandle); - } + void dispose() + { + FD sighandle = cast(FD)m_sighandle; + m_loop.m_fds[sighandle].common.refCount--; + m_loop.setNotifyCallback!(EventType.read)(sighandle, null); + m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!(SignalSlot)(sighandle); + close(cast(int)sighandle); + } - final override ProcessID adopt(int system_pid) - { - auto pid = cast(ProcessID)system_pid; + final override ProcessID adopt(int system_pid) + { + auto pid = cast(ProcessID)system_pid; - ProcessInfo info; - info.exited = false; - info.refCount = 1; - info.driver = this; - StaticProcesses.add(pid, info); + ProcessInfo info; + info.exited = false; + info.refCount = 1; + info.driver = this; + StaticProcesses.add(pid, info); - return pid; - } + return pid; + } - final override Process spawn( - string[] args, - ProcessStdinFile stdin, - ProcessStdoutFile stdout, - ProcessStderrFile stderr, - const string[string] env, - ProcessConfig config, - string working_dir) - @trusted { - // Use std.process to spawn processes - import std.process : pipe, Pid, spawnProcess; - import std.stdio : File; - static import std.stdio; + final override Process spawn( + string[] args, + ProcessStdinFile stdin, + ProcessStdoutFile stdout, + ProcessStderrFile stderr, + const string[string] env, + ProcessConfig config, + string working_dir) + @trusted { + // Use std.process to spawn processes + import std.process : pipe, Pid, spawnProcess; + import std.stdio : File; + static import std.stdio; - static File fdToFile(int fd, scope const(char)[] mode) - { - try { - File f; - f.fdopen(fd, mode); - return f; - } catch (Exception e) { - assert(0); - } - } + static File fdToFile(int fd, scope const(char)[] mode) + { + try { + File f; + f.fdopen(fd, mode); + return f; + } catch (Exception e) { + assert(0); + } + } - try { - Process process; - File stdinFile, stdoutFile, stderrFile; + try { + Process process; + File stdinFile, stdoutFile, stderrFile; - stdinFile = stdin.visit!( - (int handle) => fdToFile(handle, "r"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stdin; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); - return p.readEnd; - } - }); + stdinFile = stdin.visit!( + (int handle) => fdToFile(handle, "r"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdin; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); + return p.readEnd; + } + }); - stdoutFile = stdout.visit!( - (int handle) => fdToFile(handle, "w"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stdout; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); - return p.writeEnd; - } - }, - (_) => File.init); + stdoutFile = stdout.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdout; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); - stderrFile = stderr.visit!( - (int handle) => fdToFile(handle, "w"), - (ProcessRedirect redirect) { - final switch (redirect) { - case ProcessRedirect.inherit: return std.stdio.stderr; - case ProcessRedirect.none: return File.init; - case ProcessRedirect.pipe: - auto p = pipe(); - process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); - return p.writeEnd; - } - }, - (_) => File.init); + stderrFile = stderr.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stderr; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); - const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; - const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; + const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; + const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; - if (redirectStdout) { - assert(!redirectStderr, "Can't redirect both stdout and stderr"); + if (redirectStdout) { + assert(!redirectStderr, "Can't redirect both stdout and stderr"); - stdoutFile = stderrFile; - } else if (redirectStderr) { - stderrFile = stdoutFile; - } + stdoutFile = stderrFile; + } else if (redirectStderr) { + stderrFile = stdoutFile; + } - Pid stdPid = spawnProcess( - args, - stdinFile, - stdoutFile, - stderrFile, - env, - cast(std.process.Config)config, - working_dir); - process.pid = adopt(stdPid.osHandle); - stdPid.destroy(); + Pid stdPid = spawnProcess( + args, + stdinFile, + stdoutFile, + stderrFile, + env, + cast(std.process.Config)config, + working_dir); + process.pid = adopt(stdPid.osHandle); + stdPid.destroy(); - return process; - } catch (Exception e) { - return Process.init; - } - } + return process; + } catch (Exception e) { + return Process.init; + } + } - final override void kill(ProcessID pid, int signal) - @trusted { - import core.sys.posix.signal : pkill = kill; + final override void kill(ProcessID pid, int signal) + @trusted { + import core.sys.posix.signal : pkill = kill; - pkill(cast(int)pid, signal); - } + pkill(cast(int)pid, signal); + } - final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) - { - bool exited; - int exitCode; + final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + bool exited; + int exitCode; - size_t id = lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); + size_t id = lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); - if (info.exited) { - exited = true; - exitCode = info.exitCode; - return 0; - } else { - info.callbacks ~= on_process_exit; - return info.callbacks.length - 1; - } - })(pid); + if (info.exited) { + exited = true; + exitCode = info.exitCode; + return 0; + } else { + info.callbacks ~= on_process_exit; + return info.callbacks.length - 1; + } + })(pid); - if (exited) { - on_process_exit(pid, exitCode); - } + if (exited) { + on_process_exit(pid, exitCode); + } - return id; - } + return id; + } - final override void cancelWait(ProcessID pid, size_t waitId) - { - lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); - assert(!info.exited, "Cannot cancel wait when none are pending"); - assert(info.callbacks.length > waitId, "Invalid process wait ID"); + final override void cancelWait(ProcessID pid, size_t waitId) + { + lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); + assert(!info.exited, "Cannot cancel wait when none are pending"); + assert(info.callbacks.length > waitId, "Invalid process wait ID"); - info.callbacks[waitId] = null; - })(pid); - } + info.callbacks[waitId] = null; + })(pid); + } - private void onSignal(FD fd) - { - SignalListenID lid = cast(SignalListenID)fd; + private void onSignal(FD fd) + { + SignalListenID lid = cast(SignalListenID)fd; - signalfd_siginfo nfo; - do { - auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); + signalfd_siginfo nfo; + do { + auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); - if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) - return; + if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) + return; - onProcessExit(nfo.ssi_pid, nfo.ssi_status); - } while (true); - } + onProcessExit(nfo.ssi_pid, nfo.ssi_status); + } while (true); + } - private void onProcessExit(int system_pid, int exitCode) - { - auto pid = cast(ProcessID)system_pid; + private void onProcessExit(int system_pid, int exitCode) + { + auto pid = cast(ProcessID)system_pid; - ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } + ProcessWaitCallback[] callbacks; + auto driver = lockedProcessInfo!((info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) { + return null; + } - // Increment the ref count to make sure it doesn't get removed - info.refCount++; + // Increment the ref count to make sure it doesn't get removed + info.refCount++; - info.exited = true; - info.exitCode = exitCode; - return info.driver; - })(pid); + info.exited = true; + info.exitCode = exitCode; + return info.driver; + })(pid); - // Need to call callbacks in the owner thread as this function can be - // called from any thread. Without extra threads this is always the main - // thread. - if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { - onLocalProcessExit(cast(intptr_t)pid); - } else if (driver) { - auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); + // Need to call callbacks in the owner thread as this function can be + // called from any thread. Without extra threads this is always the main + // thread. + if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { + onLocalProcessExit(cast(intptr_t)pid); + } else if (driver) { + auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); - sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); - } - } + sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); + } + } - private static void onLocalProcessExit(intptr_t system_pid) - { - auto pid = cast(ProcessID)system_pid; + private static void onLocalProcessExit(intptr_t system_pid) + { + auto pid = cast(ProcessID)system_pid; - int exitCode; - ProcessWaitCallback[] callbacks; + int exitCode; + ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) { - assert(info !is null); + auto driver = lockedProcessInfo!((info) { + assert(info !is null); - exitCode = info.exitCode; + exitCode = info.exitCode; - callbacks = info.callbacks; - info.callbacks = null; + callbacks = info.callbacks; + info.callbacks = null; - return info.driver; - })(pid); + return info.driver; + })(pid); - foreach (cb; callbacks) { - if (cb) - cb(pid, exitCode); - } + foreach (cb; callbacks) { + if (cb) + cb(pid, exitCode); + } - driver.releaseRef(pid); - } + driver.releaseRef(pid); + } - final override bool hasExited(ProcessID pid) - { - return lockedProcessInfo!((info) { - assert(info !is null, "Unknown process ID"); + final override bool hasExited(ProcessID pid) + { + return lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); - return info.exited; - })(pid); - } + return info.exited; + })(pid); + } - final override void addRef(ProcessID pid) - { - lockedProcessInfo!((info) { - nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); - info.refCount++; - })(pid); - } + final override void addRef(ProcessID pid) + { + lockedProcessInfo!((info) { + nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); + info.refCount++; + })(pid); + } - final override bool releaseRef(ProcessID pid) - { - return lockedProcessInfo!((info) { - nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); - if (--info.refCount == 0) { - // Remove/deallocate process - if (info.userDataDestructor) - () @trusted { info.userDataDestructor(info.userData.ptr); } (); + final override bool releaseRef(ProcessID pid) + { + return lockedProcessInfo!((info) { + nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); + if (--info.refCount == 0) { + // Remove/deallocate process + if (info.userDataDestructor) + () @trusted { info.userDataDestructor(info.userData.ptr); } (); - StaticProcesses.m_processes.remove(pid); - return false; - } - return true; - })(pid); - } + StaticProcesses.m_processes.remove(pid); + return false; + } + return true; + })(pid); + } - final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - return lockedProcessInfo!((info) { - assert(info.userDataDestructor is null || info.userDataDestructor is destroy, - "Requesting user data with differing type (destructor)."); - assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); + final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + return lockedProcessInfo!((info) { + assert(info.userDataDestructor is null || info.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); - if (!info.userDataDestructor) { - initialize(info.userData.ptr); - info.userDataDestructor = destroy; - } - return info.userData.ptr; - })(pid); - } + if (!info.userDataDestructor) { + initialize(info.userData.ptr); + info.userDataDestructor = destroy; + } + return info.userData.ptr; + })(pid); + } - package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } + package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } } final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - this(Loop loop, EventDriver driver) {} + this(Loop loop, EventDriver driver) {} - void dispose() {} + void dispose() {} - override ProcessID adopt(int system_pid) - { - assert(false, "TODO!"); - } + override ProcessID adopt(int system_pid) + { + assert(false, "TODO!"); + } - override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) - { - assert(false, "TODO!"); - } + override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) + { + assert(false, "TODO!"); + } - override bool hasExited(ProcessID pid) - { - assert(false, "TODO!"); - } + override bool hasExited(ProcessID pid) + { + assert(false, "TODO!"); + } - override void kill(ProcessID pid, int signal) - { - assert(false, "TODO!"); - } + override void kill(ProcessID pid, int signal) + { + assert(false, "TODO!"); + } - override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) - { - assert(false, "TODO!"); - } + override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + assert(false, "TODO!"); + } - override void cancelWait(ProcessID pid, size_t waitId) - { - assert(false, "TODO!"); - } + override void cancelWait(ProcessID pid, size_t waitId) + { + assert(false, "TODO!"); + } - override void addRef(ProcessID pid) - { - assert(false, "TODO!"); - } + override void addRef(ProcessID pid) + { + assert(false, "TODO!"); + } - override bool releaseRef(ProcessID pid) - { - assert(false, "TODO!"); - } + override bool releaseRef(ProcessID pid) + { + assert(false, "TODO!"); + } - protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - assert(false, "TODO!"); - } + protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } - package final @property size_t pendingCount() const nothrow { return 0; } + package final @property size_t pendingCount() const nothrow { return 0; } } From 507fb5a0c95502f03ccd7ba3a1f2552a2d5b7e4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 16:18:30 +0200 Subject: [PATCH 02/11] Use waitpid to iterate over all exited child processes. Fixes #116. Closes #117. --- source/eventcore/drivers/posix/processes.d | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index becd969..faa806b 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -258,17 +258,26 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce private void onSignal(FD fd) { + import core.sys.posix.sys.wait : WNOHANG, WEXITSTATUS, waitpid; + SignalListenID lid = cast(SignalListenID)fd; + // drain the signalfd - note that multiple signals can be combined into + // a single value, so that individual siginfo results are useless for + // us signalfd_siginfo nfo; - do { - auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); + while (() @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } () == nfo.sizeof) + { + } - if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) - return; + // instead, use waitpid to determine all exited processes + while (true) { + int status; + auto ret = () @trusted { return waitpid(-1, &status, WNOHANG); } (); + if (ret <= 0) break; - onProcessExit(nfo.ssi_pid, nfo.ssi_status); - } while (true); + onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ()); + } } private void onProcessExit(int system_pid, int exitCode) From de199d3410003251ad461a0e05a0b9d0fce025ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 22:45:06 +0200 Subject: [PATCH 03/11] Add test for SIGCHLD coalescing. --- tests/issue-122-coalesced-sigchld.d | 76 +++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 tests/issue-122-coalesced-sigchld.d diff --git a/tests/issue-122-coalesced-sigchld.d b/tests/issue-122-coalesced-sigchld.d new file mode 100644 index 0000000..ef2822d --- /dev/null +++ b/tests/issue-122-coalesced-sigchld.d @@ -0,0 +1,76 @@ +#!/usr/bin/env dub +/+ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ + +module test; + +import core.time : Duration, msecs; +import eventcore.core; +import std.conv; +import std.datetime; +import std.process : thisProcessID; +import std.stdio; + +version (Windows) { + void main() + { + writefln("Skipping SIGCHLD coalesce test on Windows."); + } +} else: + +import core.sys.posix.sys.wait : waitpid, WNOHANG; + +int numProc; + +void main(string[] args) +{ + // child mode + if (args.length == 2) + { + import core.thread : Thread; + writefln("Child: %s (%s) from %s", args[1], (args[1].to!long - Clock.currStdTime).hnsecs, thisProcessID); + Thread.sleep((args[1].to!long - Clock.currStdTime).hnsecs); + return; + } + + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, 5.seconds, 0.msecs); + eventDriver.timers.wait(tm, (tm) @trusted { + assert(false, "Test hung."); + }); + + // attempt to let all child processes finish in exactly 1 second to force + // signal coalescing + auto targettime = Clock.currTime(UTC()) + 1.seconds; + + auto procs = new Process[](20); + foreach (i, ref p; procs) { + p = eventDriver.processes.spawn( + ["./test", targettime.stdTime.to!string], + ProcessStdinFile(ProcessRedirect.inherit), + ProcessStdoutFile(ProcessRedirect.inherit), + ProcessStderrFile(ProcessRedirect.inherit), + null, ProcessConfig.none, null + ); + assert(p != Process.init); + + writeln("Started child: ", p.pid); + numProc++; + } + + foreach (p; procs) { + eventDriver.processes.wait(p.pid, (ProcessID pid, int res) nothrow + { + numProc--; + try writefln("Child %s exited with %s", pid, res); + catch(Exception){} + }); + } + + do eventDriver.core.processEvents(Duration.max); + while (numProc); + + foreach (p; procs) assert(waitpid(cast(int)p.pid, null, WNOHANG) == -1); +} From 72234fc0a7eaec4d09dc7d8f83a6143f659d96d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 23:05:52 +0200 Subject: [PATCH 04/11] Return an invalid wait ID for processes.wait() if the process has already exited. Avoids overlap with valid wait IDs, so that a paired cancelWait() doesn't cancel a different wait. --- source/eventcore/drivers/posix/processes.d | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index faa806b..0b4724c 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -231,7 +231,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce if (info.exited) { exited = true; exitCode = info.exitCode; - return 0; + return size_t.max; } else { info.callbacks ~= on_process_exit; return info.callbacks.length - 1; @@ -245,14 +245,16 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce return id; } - final override void cancelWait(ProcessID pid, size_t waitId) + final override void cancelWait(ProcessID pid, size_t wait_id) { + if (wait_id == size_t.max) return; + lockedProcessInfo!((info) { assert(info !is null, "Unknown process ID"); assert(!info.exited, "Cannot cancel wait when none are pending"); - assert(info.callbacks.length > waitId, "Invalid process wait ID"); + assert(info.callbacks.length > wait_id, "Invalid process wait ID"); - info.callbacks[waitId] = null; + info.callbacks[wait_id] = null; })(pid); } From 4724f14145eba57d71c08cc89b6a6a1ae78d9dba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 23:24:02 +0200 Subject: [PATCH 05/11] Avoid interference with other users of waitpid. Instead of using waitpid(-1), explicitly waits on all known processes. This is inefficient for large numbers of child processes, but seems to be the only way to ensure to not interfere with other code that uses waitpid(). --- source/eventcore/drivers/posix/processes.d | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 0b4724c..9b82a06 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -273,12 +273,18 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce } // instead, use waitpid to determine all exited processes - while (true) { - int status; - auto ret = () @trusted { return waitpid(-1, &status, WNOHANG); } (); - if (ret <= 0) break; + ProcessID[] allprocs; + () @trusted { + try synchronized (StaticProcesses.m_mutex) + allprocs = StaticProcesses.m_processes.keys; + catch (Exception e) assert(false, e.msg); + } (); - onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ()); + foreach (pid; allprocs) { + int status; + auto ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) + onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ()); } } From 1ef320c329ebc4895060e7682547c7fcfb423e81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 22 Aug 2019 09:40:21 +0200 Subject: [PATCH 06/11] Use a more robust way to self-execute the test binary. --- tests/issue-122-coalesced-sigchld.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/issue-122-coalesced-sigchld.d b/tests/issue-122-coalesced-sigchld.d index ef2822d..625cbd4 100644 --- a/tests/issue-122-coalesced-sigchld.d +++ b/tests/issue-122-coalesced-sigchld.d @@ -48,7 +48,7 @@ void main(string[] args) auto procs = new Process[](20); foreach (i, ref p; procs) { p = eventDriver.processes.spawn( - ["./test", targettime.stdTime.to!string], + [args[0], targettime.stdTime.to!string], ProcessStdinFile(ProcessRedirect.inherit), ProcessStdoutFile(ProcessRedirect.inherit), ProcessStderrFile(ProcessRedirect.inherit), From 7ebad49ed02a69dfd396485b6e069be606d6febf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 22 Aug 2019 12:32:54 +0200 Subject: [PATCH 07/11] Rework the child process exit code to not rely in SIGCHLD. It turns out that in a heterogeneous process where other parts of the code may start processes or threads and may be waiting for those to finish, it is not realistic to rely on signalfd or even SIGCHLD in general to get notified about child process exits. The only solid way appears to be to start a separate waiter thread that uses waitid/waitpid to wait for exited child processes in a blocking way. This also fixes the hanging vibe.core.process test in vibe-core with DMD 2.087.x. --- source/eventcore/drivers/posix/processes.d | 170 ++++++++++----------- 1 file changed, 85 insertions(+), 85 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 9b82a06..b382ec7 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -15,6 +15,7 @@ private struct ProcessInfo { int exitCode; ProcessWaitCallback[] callbacks; size_t refCount = 0; + void delegate(int pid) shared nothrow onExit; EventDriverProcesses driver; DataInitializer userDataDestructor; @@ -24,10 +25,12 @@ private struct ProcessInfo { private struct StaticProcesses { @safe: nothrow: import core.sync.mutex : Mutex; + import core.thread : Thread; private { static shared Mutex m_mutex; static __gshared ProcessInfo[ProcessID] m_processes; + static __gshared Thread m_waitThread; } shared static this() @@ -39,9 +42,84 @@ private struct StaticProcesses { m_mutex.lock_nothrow(); scope (exit) m_mutex.unlock_nothrow(); + if (!m_waitThread) { + m_waitThread = new Thread(&waitForProcesses); + m_waitThread.start(); + } + assert(pid !in m_processes, "Process adopted twice"); m_processes[pid] = info; } + + private static void waitForProcesses() + @system { + import core.stdc.errno : ECHILD, errno; + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; + + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + m_waitThread = null; + } + break; + } + + ProcessID[] allprocs; + + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + + () @trusted { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + foreach (ref entry; StaticProcesses.m_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; + } + } (); + } + + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExit(ret, exitstatus); + } + } + } + } + + private static void onProcessExit(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; + + ProcessWaitCallback[] callbacks; + auto onexit = lockedProcessInfo!((info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) { + return null; + } + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exit_status; + return info.onExit; + })(pid); + + onexit(cast(int)pid); + } } private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { @@ -63,8 +141,8 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce private { Loop m_loop; + // FIXME: avoid virtual funciton calls and use the final type instead EventDriver m_driver; - SignalListenID m_sighandle; } this(Loop loop, EventDriver driver) @@ -73,45 +151,24 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce m_loop = loop; m_driver = driver; - - // Listen for child process exits using SIGCHLD - m_sighandle = () @trusted { - sigset_t sset; - sigemptyset(&sset); - sigaddset(&sset, SIGCHLD); - - assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); - - return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); - } (); - - m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); - m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); - m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); - - onSignal(cast(FD)m_sighandle); } void dispose() { - FD sighandle = cast(FD)m_sighandle; - m_loop.m_fds[sighandle].common.refCount--; - m_loop.setNotifyCallback!(EventType.read)(sighandle, null); - m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!(SignalSlot)(sighandle); - close(cast(int)sighandle); } final override ProcessID adopt(int system_pid) { auto pid = cast(ProcessID)system_pid; + auto sthis = () @trusted { return cast(shared)this; } (); + ProcessInfo info; info.exited = false; info.refCount = 1; + info.onExit = () @trusted { return &sthis.onProcessExit; } (); info.driver = this; StaticProcesses.add(pid, info); - return pid; } @@ -258,66 +315,9 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce })(pid); } - private void onSignal(FD fd) - { - import core.sys.posix.sys.wait : WNOHANG, WEXITSTATUS, waitpid; - - SignalListenID lid = cast(SignalListenID)fd; - - // drain the signalfd - note that multiple signals can be combined into - // a single value, so that individual siginfo results are useless for - // us - signalfd_siginfo nfo; - while (() @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } () == nfo.sizeof) - { - } - - // instead, use waitpid to determine all exited processes - ProcessID[] allprocs; - () @trusted { - try synchronized (StaticProcesses.m_mutex) - allprocs = StaticProcesses.m_processes.keys; - catch (Exception e) assert(false, e.msg); - } (); - - foreach (pid; allprocs) { - int status; - auto ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) - onProcessExit(ret, () @trusted { return WEXITSTATUS(status); } ()); - } - } - - private void onProcessExit(int system_pid, int exitCode) - { - auto pid = cast(ProcessID)system_pid; - - ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } - - // Increment the ref count to make sure it doesn't get removed - info.refCount++; - - info.exited = true; - info.exitCode = exitCode; - return info.driver; - })(pid); - - // Need to call callbacks in the owner thread as this function can be - // called from any thread. Without extra threads this is always the main - // thread. - if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { - onLocalProcessExit(cast(intptr_t)pid); - } else if (driver) { - auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); - - sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); - } + private void onProcessExit(int system_pid) + shared { + m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); } private static void onLocalProcessExit(intptr_t system_pid) From f1c2eb779f84462f94ca7752776d2419e6cbfe84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 22 Aug 2019 12:37:17 +0200 Subject: [PATCH 08/11] Use the Posix process driver on all Posix operating systems. --- source/eventcore/drivers/posix/driver.d | 2 +- source/eventcore/drivers/posix/processes.d | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index f3e93e2..0681464 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -55,7 +55,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; - version (linux) alias ProcessDriver = SignalEventDriverProcesses!Loop; + version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop; else alias ProcessDriver = DummyEventDriverProcesses!Loop; Loop m_loop; diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index b382ec7..cdf24e9 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -133,7 +133,7 @@ private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { private enum SIGCHLD = 17; -final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { +final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sys.linux.sys.signalfd; From 01c2c2696455af4c630ad04c68d4d1eb41f2b38c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 22 Aug 2019 13:59:43 +0200 Subject: [PATCH 09/11] Fix Windows compilation. Integrates the contents of StaticProcesses into PosixEventDriverProcesses to fully hide it form the Windows build. It also changes lockedProcessInfo to be a non-template function, as that lead to a linker error on macOS. --- source/eventcore/drivers/posix/processes.d | 292 ++++++++++----------- 1 file changed, 143 insertions(+), 149 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index cdf24e9..d2ac0cf 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -10,125 +10,6 @@ import std.algorithm.comparison : among; import std.variant : visit; import std.stdint; -private struct ProcessInfo { - bool exited = true; - int exitCode; - ProcessWaitCallback[] callbacks; - size_t refCount = 0; - void delegate(int pid) shared nothrow onExit; - EventDriverProcesses driver; - - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; -} - -private struct StaticProcesses { -@safe: nothrow: - import core.sync.mutex : Mutex; - import core.thread : Thread; - - private { - static shared Mutex m_mutex; - static __gshared ProcessInfo[ProcessID] m_processes; - static __gshared Thread m_waitThread; - } - - shared static this() - { - m_mutex = new shared Mutex; - } - - static void add(ProcessID pid, ProcessInfo info) @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!m_waitThread) { - m_waitThread = new Thread(&waitForProcesses); - m_waitThread.start(); - } - - assert(pid !in m_processes, "Process adopted twice"); - m_processes[pid] = info; - } - - private static void waitForProcesses() - @system { - import core.stdc.errno : ECHILD, errno; - import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; - import core.sys.posix.signal : siginfo_t; - - while (true) { - siginfo_t dummy; - auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); - if (ret == -1) { - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - m_waitThread = null; - } - break; - } - - ProcessID[] allprocs; - - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - - () @trusted { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - foreach (ref entry; StaticProcesses.m_processes.byKeyValue) { - if (!entry.value.exited) - allprocs ~= entry.key; - } - } (); - } - - foreach (pid; allprocs) { - int status; - ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) { - int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); - onProcessExit(ret, exitstatus); - } - } - } - } - - private static void onProcessExit(int system_pid, int exit_status) - { - auto pid = cast(ProcessID)system_pid; - - ProcessWaitCallback[] callbacks; - auto onexit = lockedProcessInfo!((info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) { - return null; - } - - // Increment the ref count to make sure it doesn't get removed - info.refCount++; - - info.exited = true; - info.exitCode = exit_status; - return info.onExit; - })(pid); - - onexit(cast(int)pid); - } -} - -private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { - StaticProcesses.m_mutex.lock_nothrow(); - scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); - auto info = pid in StaticProcesses.m_processes; - - return fn(info); -} private enum SIGCHLD = 17; @@ -136,10 +17,16 @@ private enum SIGCHLD = 17; final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.sync.mutex : Mutex; import core.sys.linux.sys.signalfd; import core.sys.posix.unistd : close, read, write, dup; + import core.thread : Thread; private { + static shared Mutex s_mutex; + static __gshared ProcessInfo[ProcessID] s_processes; + static __gshared Thread s_waitThread; + Loop m_loop; // FIXME: avoid virtual funciton calls and use the final type instead EventDriver m_driver; @@ -161,14 +48,11 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { auto pid = cast(ProcessID)system_pid; - auto sthis = () @trusted { return cast(shared)this; } (); - ProcessInfo info; info.exited = false; info.refCount = 1; - info.onExit = () @trusted { return &sthis.onProcessExit; } (); info.driver = this; - StaticProcesses.add(pid, info); + add(pid, info); return pid; } @@ -282,18 +166,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces bool exited; int exitCode; - size_t id = lockedProcessInfo!((info) { + size_t id = size_t.max; + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); if (info.exited) { exited = true; exitCode = info.exitCode; - return size_t.max; } else { info.callbacks ~= on_process_exit; - return info.callbacks.length - 1; + id = info.callbacks.length - 1; } - })(pid); + }); if (exited) { on_process_exit(pid, exitCode); @@ -306,13 +190,13 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { if (wait_id == size_t.max) return; - lockedProcessInfo!((info) { + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); assert(!info.exited, "Cannot cancel wait when none are pending"); assert(info.callbacks.length > wait_id, "Invalid process wait ID"); info.callbacks[wait_id] = null; - })(pid); + }); } private void onProcessExit(int system_pid) @@ -327,7 +211,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces int exitCode; ProcessWaitCallback[] callbacks; - auto driver = lockedProcessInfo!((info) { + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (info) { assert(info !is null); exitCode = info.exitCode; @@ -335,8 +220,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces callbacks = info.callbacks; info.callbacks = null; - return info.driver; - })(pid); + driver = info.driver; + }); foreach (cb; callbacks) { if (cb) @@ -348,53 +233,162 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces final override bool hasExited(ProcessID pid) { - return lockedProcessInfo!((info) { + bool ret; + lockedProcessInfo(pid, (info) { assert(info !is null, "Unknown process ID"); - - return info.exited; - })(pid); + ret = info.exited; + }); + return ret; } final override void addRef(ProcessID pid) { - lockedProcessInfo!((info) { + lockedProcessInfo(pid, (info) { nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); info.refCount++; - })(pid); + }); } final override bool releaseRef(ProcessID pid) { - return lockedProcessInfo!((info) { + bool ret; + lockedProcessInfo(pid, (info) { nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); if (--info.refCount == 0) { // Remove/deallocate process if (info.userDataDestructor) () @trusted { info.userDataDestructor(info.userData.ptr); } (); - StaticProcesses.m_processes.remove(pid); - return false; - } - return true; - })(pid); + () @trusted { s_processes.remove(pid); } (); + ret = false; + } else ret = true; + }); + return ret; } final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) @system { - return lockedProcessInfo!((info) { + void* ret; + lockedProcessInfo(pid, (info) @safe nothrow { assert(info.userDataDestructor is null || info.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); if (!info.userDataDestructor) { - initialize(info.userData.ptr); + () @trusted { initialize(info.userData.ptr); } (); info.userDataDestructor = destroy; } - return info.userData.ptr; - })(pid); + ret = () @trusted { return info.userData.ptr; } (); + }); + return ret; } - package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } + package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; } + + + shared static this() + { + s_mutex = new shared Mutex; + } + + private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + auto info = () @trusted { return pid in s_processes; } (); + fn(info); + } + + private static void add(ProcessID pid, ProcessInfo info) @trusted { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + + if (!s_waitThread) { + s_waitThread = new Thread(&waitForProcesses); + s_waitThread.start(); + } + + assert(pid !in s_processes, "Process adopted twice"); + s_processes[pid] = info; + } + + private static void waitForProcesses() + @system { + import core.stdc.errno : ECHILD, errno; + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; + + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + s_waitThread = null; + } + break; + } + + ProcessID[] allprocs; + + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + + + () @trusted { + foreach (ref entry; s_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; + } + } (); + } + + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExitStatic(ret, exitstatus); + } + } + } + } + + private static void onProcessExitStatic(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; + + ProcessWaitCallback[] callbacks; + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (ProcessInfo* info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) return; + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exit_status; + driver = info.driver; + }); + + if (driver) + () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); + } + + private static struct ProcessInfo { + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + PosixEventDriverProcesses driver; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + } } final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { From 5c3afcc175d59e5410d3354037aba6164d1b02bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 22 Aug 2019 14:35:37 +0200 Subject: [PATCH 10/11] Ensure that a valid PID is passed to kill(). --- source/eventcore/drivers/posix/processes.d | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index d2ac0cf..2781b1f 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -158,7 +158,10 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces @trusted { import core.sys.posix.signal : pkill = kill; - pkill(cast(int)pid, signal); + assert(cast(int)pid > 0, "Invalid PID passed to kill."); + + if (cast(int)pid > 0) + pkill(cast(int)pid, signal); } final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) From 20373d10db0c9e2e9c13eae73263425605d7f847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 23 Aug 2019 19:54:55 +0200 Subject: [PATCH 11/11] Fix indentation and remove unused imports/variables. --- source/eventcore/drivers/posix/processes.d | 124 ++++++++++----------- 1 file changed, 59 insertions(+), 65 deletions(-) diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 2781b1f..23cf4ec 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -16,10 +16,8 @@ private enum SIGCHLD = 17; final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sync.mutex : Mutex; - import core.sys.linux.sys.signalfd; - import core.sys.posix.unistd : close, read, write, dup; + import core.sys.posix.unistd : dup; import core.thread : Thread; private { @@ -34,8 +32,6 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces this(Loop loop, EventDriver driver) { - import core.sys.posix.signal; - m_loop = loop; m_driver = driver; } @@ -302,85 +298,83 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces fn(info); } - private static void add(ProcessID pid, ProcessInfo info) @trusted { - s_mutex.lock_nothrow(); - scope (exit) s_mutex.unlock_nothrow(); + private static void add(ProcessID pid, ProcessInfo info) @trusted { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); - if (!s_waitThread) { - s_waitThread = new Thread(&waitForProcesses); - s_waitThread.start(); - } - - assert(pid !in s_processes, "Process adopted twice"); - s_processes[pid] = info; + if (!s_waitThread) { + s_waitThread = new Thread(&waitForProcesses); + s_waitThread.start(); } - private static void waitForProcesses() - @system { - import core.stdc.errno : ECHILD, errno; - import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; - import core.sys.posix.signal : siginfo_t; + assert(pid !in s_processes, "Process adopted twice"); + s_processes[pid] = info; + } - while (true) { - siginfo_t dummy; - auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); - if (ret == -1) { - { - s_mutex.lock_nothrow(); - scope (exit) s_mutex.unlock_nothrow(); - s_waitThread = null; - } - break; - } - - ProcessID[] allprocs; + private static void waitForProcesses() + @system { + import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; + import core.sys.posix.signal : siginfo_t; + while (true) { + siginfo_t dummy; + auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); + if (ret == -1) { { s_mutex.lock_nothrow(); scope (exit) s_mutex.unlock_nothrow(); - - - () @trusted { - foreach (ref entry; s_processes.byKeyValue) { - if (!entry.value.exited) - allprocs ~= entry.key; - } - } (); + s_waitThread = null; } + break; + } - foreach (pid; allprocs) { - int status; - ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) { - int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); - onProcessExitStatic(ret, exitstatus); + ProcessID[] allprocs; + + { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + + + () @trusted { + foreach (ref entry; s_processes.byKeyValue) { + if (!entry.value.exited) + allprocs ~= entry.key; } + } (); + } + + foreach (pid; allprocs) { + int status; + ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); + if (ret == cast(int)pid) { + int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); + onProcessExitStatic(ret, exitstatus); } } } + } - private static void onProcessExitStatic(int system_pid, int exit_status) - { - auto pid = cast(ProcessID)system_pid; + private static void onProcessExitStatic(int system_pid, int exit_status) + { + auto pid = cast(ProcessID)system_pid; - ProcessWaitCallback[] callbacks; - PosixEventDriverProcesses driver; - lockedProcessInfo(pid, (ProcessInfo* info) @safe { - // We get notified of any child exiting, so ignore the ones we're - // not aware of - if (info is null) return; + PosixEventDriverProcesses driver; + lockedProcessInfo(pid, (ProcessInfo* info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) return; - // Increment the ref count to make sure it doesn't get removed - info.refCount++; + // Increment the ref count to make sure it doesn't get removed + info.refCount++; - info.exited = true; - info.exitCode = exit_status; - driver = info.driver; - }); + info.exited = true; + info.exitCode = exit_status; + driver = info.driver; + }); - if (driver) - () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); - } + if (driver) + () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); + } private static struct ProcessInfo { bool exited = true;