Proposed fix for processes in multi-threaded environments

This commit is contained in:
Benjamin Schaaf 2019-07-21 20:26:21 +10:00
parent e7bf50ea2d
commit a8cd897df7
2 changed files with 153 additions and 69 deletions

View file

@ -79,7 +79,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_signals = mallocT!SignalsDriver(m_loop); m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver; m_timers = mallocT!TimerDriver;
m_pipes = mallocT!PipeDriver(m_loop); m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, m_pipes); m_processes = mallocT!ProcessDriver(m_loop, this);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes); m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_dns = mallocT!DNSDriver(m_events, m_signals); m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events); m_files = mallocT!FileDriver(m_events);

View file

@ -9,6 +9,48 @@ import eventcore.internal.utils : nogc_assert, print;
import std.algorithm.comparison : among; import std.algorithm.comparison : among;
import std.variant : visit; import std.variant : visit;
private struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
EventDriverProcesses driver;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
private struct StaticProcesses {
@safe: nothrow:
import core.sync.mutex : Mutex;
private {
static shared Mutex m_mutex;
static __gshared ProcessInfo[ProcessID] m_processes;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static void add(ProcessID pid, ProcessInfo info) @trusted {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
assert(pid !in m_processes, "Process adopted twice");
m_processes[pid] = info;
}
}
private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted {
StaticProcesses.m_mutex.lock_nothrow();
scope (exit) StaticProcesses.m_mutex.unlock_nothrow();
auto info = pid in StaticProcesses.m_processes;
return fn(info);
}
private enum SIGCHLD = 17; private enum SIGCHLD = 17;
@ -19,28 +61,17 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
import core.sys.posix.unistd : close, read, write, dup; import core.sys.posix.unistd : close, read, write, dup;
private { private {
static struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
Loop m_loop; Loop m_loop;
EventDriverPipes m_pipes; EventDriver m_driver;
ProcessInfo[ProcessID] m_processes;
SignalListenID m_sighandle; SignalListenID m_sighandle;
} }
this(Loop loop, EventDriverPipes pipes) this(Loop loop, EventDriver driver)
{ {
import core.sys.posix.signal; import core.sys.posix.signal;
m_loop = loop; m_loop = loop;
m_pipes = pipes; m_driver = driver;
// Listen for child process exits using SIGCHLD // Listen for child process exits using SIGCHLD
m_sighandle = () @trusted { m_sighandle = () @trusted {
@ -73,12 +104,12 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
final override ProcessID adopt(int system_pid) final override ProcessID adopt(int system_pid)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
assert(pid !in m_processes, "Process is already adopted");
ProcessInfo info; ProcessInfo info;
info.exited = false; info.exited = false;
info.refCount = 1; info.refCount = 1;
m_processes[pid] = info; info.driver = this;
StaticProcesses.add(pid, info);
return pid; return pid;
} }
@ -120,7 +151,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stdin = m_pipes.adopt(dup(p.writeEnd.fileno)); process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno));
return p.readEnd; return p.readEnd;
} }
}); });
@ -133,7 +164,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stdout = m_pipes.adopt(dup(p.readEnd.fileno)); process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd; return p.writeEnd;
} }
}, },
@ -147,7 +178,7 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stderr = m_pipes.adopt(dup(p.readEnd.fileno)); process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd; return p.writeEnd;
} }
}, },
@ -190,26 +221,38 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{ {
auto info = () @trusted { return pid in m_processes; } (); bool exited;
assert(info !is null, "Unknown process ID"); int exitCode;
if (info.exited) { size_t id = lockedProcessInfo!((info) {
on_process_exit(pid, info.exitCode); assert(info !is null, "Unknown process ID");
return 0;
} else { if (info.exited) {
info.callbacks ~= on_process_exit; exited = true;
return info.callbacks.length - 1; exitCode = info.exitCode;
return 0;
} else {
info.callbacks ~= on_process_exit;
return info.callbacks.length - 1;
}
})(pid);
if (exited) {
on_process_exit(pid, exitCode);
} }
return id;
} }
final override void cancelWait(ProcessID pid, size_t waitId) final override void cancelWait(ProcessID pid, size_t waitId)
{ {
auto info = () @trusted { return pid in m_processes; } (); lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending"); assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > waitId, "Invalid process wait ID"); assert(info.callbacks.length > waitId, "Invalid process wait ID");
info.callbacks[waitId] = null; info.callbacks[waitId] = null;
})(pid);
} }
private void onSignal(FD fd) private void onSignal(FD fd)
@ -230,75 +273,116 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
private void onProcessExit(int system_pid, int exitCode) private void onProcessExit(int system_pid, int exitCode)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
auto info = () @trusted { return pid in m_processes; } ();
// We get notified of any child exiting, so ignore the ones we're not ProcessWaitCallback[] callbacks;
// aware of auto driver = lockedProcessInfo!((info) @safe {
if (info is null) { // We get notified of any child exiting, so ignore the ones we're
return; // not aware of
if (info is null) {
return null;
}
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exitCode;
return info.driver;
})(pid);
// Need to call callbacks in the owner thread as this function can be
// called from any thread. Without extra threads this is always the main
// thread.
if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) {
onLocalProcessExit(cast(intptr_t)pid);
} else {
auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } ();
sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid);
} }
}
info.exited = true; private static void onLocalProcessExit(intptr_t system_pid)
info.exitCode = exitCode; {
auto pid = cast(ProcessID)system_pid;
foreach (cb; info.callbacks) { int exitCode;
ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) {
assert(info !is null);
exitCode = info.exitCode;
callbacks = info.callbacks;
info.callbacks = null;
return info.driver;
})(pid);
foreach (cb; callbacks) {
if (cb) if (cb)
cb(pid, exitCode); cb(pid, exitCode);
} }
info.callbacks = null;
driver.releaseRef(pid);
} }
final override bool hasExited(ProcessID pid) final override bool hasExited(ProcessID pid)
{ {
auto info = () @trusted { return pid in m_processes; } (); return lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
return info.exited; return info.exited;
})(pid);
} }
final override void addRef(ProcessID pid) final override void addRef(ProcessID pid)
{ {
auto info = () @trusted { return &m_processes[pid]; } (); lockedProcessInfo!((info) {
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++; info.refCount++;
})(pid);
} }
final override bool releaseRef(ProcessID pid) final override bool releaseRef(ProcessID pid)
{ {
auto info = () @trusted { return &m_processes[pid]; } (); return lockedProcessInfo!((info) {
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) { if (--info.refCount == 0) {
// Remove/deallocate process // Remove/deallocate process
if (info.userDataDestructor) if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } (); () @trusted { info.userDataDestructor(info.userData.ptr); } ();
m_processes.remove(pid); StaticProcesses.m_processes.remove(pid);
return false; return false;
} }
return true; return true;
})(pid);
} }
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
auto info = () @trusted { return &m_processes[pid]; } (); return lockedProcessInfo!((info) {
assert(info.userDataDestructor is null || info.userDataDestructor is destroy, assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
if (!info.userDataDestructor) { if (!info.userDataDestructor) {
initialize(info.userData.ptr); initialize(info.userData.ptr);
info.userDataDestructor = destroy; info.userDataDestructor = destroy;
} }
return info.userData.ptr; return info.userData.ptr;
})(pid);
} }
package final @property size_t pendingCount() const nothrow { return m_processes.length; } package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; }
} }
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
this(Loop loop, EventDriverPipes pipes) {} this(Loop loop, EventDriver driver) {}
void dispose() {} void dispose() {}