From a8cd897df7cbc01286196c3d0f960e2f625730a6 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 21 Jul 2019 20:26:21 +1000 Subject: [PATCH] Proposed fix for processes in multi-threaded environments --- source/eventcore/drivers/posix/driver.d | 2 +- source/eventcore/drivers/posix/processes.d | 220 ++++++++++++++------- 2 files changed, 153 insertions(+), 69 deletions(-) diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index de8acd1..f3e93e2 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -79,7 +79,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_signals = mallocT!SignalsDriver(m_loop); m_timers = mallocT!TimerDriver; m_pipes = mallocT!PipeDriver(m_loop); - m_processes = mallocT!ProcessDriver(m_loop, m_pipes); + m_processes = mallocT!ProcessDriver(m_loop, this); m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes); m_dns = mallocT!DNSDriver(m_events, m_signals); m_files = mallocT!FileDriver(m_events); diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index c647b7c..658477c 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -9,6 +9,48 @@ import eventcore.internal.utils : nogc_assert, print; import std.algorithm.comparison : among; import std.variant : visit; +private struct ProcessInfo { + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + EventDriverProcesses driver; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; +} + +private struct StaticProcesses { +@safe: nothrow: + import core.sync.mutex : Mutex; + + private { + static shared Mutex m_mutex; + static __gshared ProcessInfo[ProcessID] m_processes; + } + + shared static this() + { + m_mutex = new shared Mutex; + } + + static void add(ProcessID pid, ProcessInfo info) @trusted { + 
m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + assert(pid !in m_processes, "Process adopted twice"); + m_processes[pid] = info; + } +} + +private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { + StaticProcesses.m_mutex.lock_nothrow(); + scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); + auto info = pid in StaticProcesses.m_processes; + + return fn(info); +} + private enum SIGCHLD = 17; @@ -19,28 +61,17 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce import core.sys.posix.unistd : close, read, write, dup; private { - static struct ProcessInfo { - bool exited = true; - int exitCode; - ProcessWaitCallback[] callbacks; - size_t refCount = 0; - - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; - } - Loop m_loop; - EventDriverPipes m_pipes; - ProcessInfo[ProcessID] m_processes; + EventDriver m_driver; SignalListenID m_sighandle; } - this(Loop loop, EventDriverPipes pipes) + this(Loop loop, EventDriver driver) { import core.sys.posix.signal; m_loop = loop; - m_pipes = pipes; + m_driver = driver; // Listen for child process exits using SIGCHLD m_sighandle = () @trusted { @@ -73,12 +104,12 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce final override ProcessID adopt(int system_pid) { auto pid = cast(ProcessID)system_pid; - assert(pid !in m_processes, "Process is already adopted"); ProcessInfo info; info.exited = false; info.refCount = 1; - m_processes[pid] = info; + info.driver = this; + StaticProcesses.add(pid, info); return pid; } @@ -120,7 +151,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce case ProcessRedirect.none: return File.init; case ProcessRedirect.pipe: auto p = pipe(); - process.stdin = m_pipes.adopt(dup(p.writeEnd.fileno)); + process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); return p.readEnd; } }); @@ -133,7 +164,7 @@ final class SignalEventDriverProcesses(Loop : 
PosixEventLoop) : EventDriverProce case ProcessRedirect.none: return File.init; case ProcessRedirect.pipe: auto p = pipe(); - process.stdout = m_pipes.adopt(dup(p.readEnd.fileno)); + process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); return p.writeEnd; } }, @@ -147,7 +178,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce case ProcessRedirect.none: return File.init; case ProcessRedirect.pipe: auto p = pipe(); - process.stderr = m_pipes.adopt(dup(p.readEnd.fileno)); + process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); return p.writeEnd; } }, @@ -190,26 +221,38 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) { - auto info = () @trusted { return pid in m_processes; } (); - assert(info !is null, "Unknown process ID"); + bool exited; + int exitCode; - if (info.exited) { - on_process_exit(pid, info.exitCode); - return 0; - } else { - info.callbacks ~= on_process_exit; - return info.callbacks.length - 1; + size_t id = lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); + + if (info.exited) { + exited = true; + exitCode = info.exitCode; + return 0; + } else { + info.callbacks ~= on_process_exit; + return info.callbacks.length - 1; + } + })(pid); + + if (exited) { + on_process_exit(pid, exitCode); } + + return id; } final override void cancelWait(ProcessID pid, size_t waitId) { - auto info = () @trusted { return pid in m_processes; } (); - assert(info !is null, "Unknown process ID"); - assert(!info.exited, "Cannot cancel wait when none are pending"); - assert(info.callbacks.length > waitId, "Invalid process wait ID"); + lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); + assert(!info.exited, "Cannot cancel wait when none are pending"); + assert(info.callbacks.length > waitId, "Invalid process wait ID"); - info.callbacks[waitId] = null; + 
info.callbacks[waitId] = null; + })(pid); } private void onSignal(FD fd) @@ -230,75 +273,116 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce private void onProcessExit(int system_pid, int exitCode) { auto pid = cast(ProcessID)system_pid; - auto info = () @trusted { return pid in m_processes; } (); - // We get notified of any child exiting, so ignore the ones we're not - // aware of - if (info is null) { - return; + auto driver = lockedProcessInfo!((info) @safe { + // We get notified of any child exiting, so ignore the ones we're + // not aware of + if (info is null) { + return null; + } + + // Increment the ref count to make sure it doesn't get removed + info.refCount++; + + info.exited = true; + info.exitCode = exitCode; + return info.driver; + })(pid); + if (driver is null) return; // unknown child: the lookup returned null, so there is no owner driver to dispatch to + + // Need to call callbacks in the owner thread as this function can be + // called from any thread. Without extra threads this is always the main + // thread. + if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { + onLocalProcessExit(cast(intptr_t)pid); + } else { + auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); + + sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); } + } - info.exited = true; - info.exitCode = exitCode; + private static void onLocalProcessExit(intptr_t system_pid) + { + auto pid = cast(ProcessID)system_pid; - foreach (cb; info.callbacks) { + int exitCode; + ProcessWaitCallback[] callbacks; + + auto driver = lockedProcessInfo!((info) { + assert(info !is null); + + exitCode = info.exitCode; + + callbacks = info.callbacks; + info.callbacks = null; + + return info.driver; + })(pid); + + foreach (cb; callbacks) { if (cb) cb(pid, exitCode); } - info.callbacks = null; + + driver.releaseRef(pid); } final override bool hasExited(ProcessID pid) { - auto info = () @trusted { return pid in m_processes; } (); - assert(info !is null, "Unknown process 
ID"); + return lockedProcessInfo!((info) { + assert(info !is null, "Unknown process ID"); - return info.exited; + return info.exited; + })(pid); } final override void addRef(ProcessID pid) { - auto info = () @trusted { return &m_processes[pid]; } (); - nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); - info.refCount++; + lockedProcessInfo!((info) { + nogc_assert(info !is null && info.refCount > 0, "Adding reference to unreferenced process FD."); // info is null when pid is not in the table + info.refCount++; + })(pid); } final override bool releaseRef(ProcessID pid) { - auto info = () @trusted { return &m_processes[pid]; } (); - nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); - if (--info.refCount == 0) { - // Remove/deallocate process - if (info.userDataDestructor) - () @trusted { info.userDataDestructor(info.userData.ptr); } (); + return lockedProcessInfo!((info) { + nogc_assert(info !is null && info.refCount > 0, "Releasing reference to unreferenced process FD."); // info is null when pid is not in the table + if (--info.refCount == 0) { + // Remove/deallocate process + if (info.userDataDestructor) + () @trusted { info.userDataDestructor(info.userData.ptr); } (); - m_processes.remove(pid); - return false; - } - return true; + StaticProcesses.m_processes.remove(pid); + return false; + } + return true; + })(pid); } final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) @system { - auto info = () @trusted { return &m_processes[pid]; } (); - assert(info.userDataDestructor is null || info.userDataDestructor is destroy, - "Requesting user data with differing type (destructor)."); - assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); + return lockedProcessInfo!((info) { + assert(info.userDataDestructor is null || info.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); - if (!info.userDataDestructor) { - 
initialize(info.userData.ptr); - info.userDataDestructor = destroy; - } - return info.userData.ptr; + if (!info.userDataDestructor) { + initialize(info.userData.ptr); + info.userDataDestructor = destroy; + } + return info.userData.ptr; /* NOTE(review): this pointer is handed to the caller and used after the table lock is released - confirm the entry cannot be removed concurrently */ + })(pid); } - package final @property size_t pendingCount() const nothrow { return m_processes.length; } + package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; /* NOTE(review): reads the static table without taking m_mutex and counts entries added by every driver, not only this one - confirm intended */ } } final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - this(Loop loop, EventDriverPipes pipes) {} + this(Loop loop, EventDriver driver) {} void dispose() {}