Fix Windows compilation.

Integrates the contents of StaticProcesses into PosixEventDriverProcesses to fully hide it form the Windows build. It also changes lockedProcessInfo to be a non-template function, as that lead to a linker error on macOS.
This commit is contained in:
Sönke Ludwig 2019-08-22 13:59:43 +02:00
parent f1c2eb779f
commit 01c2c26964

View file

@ -10,125 +10,6 @@ import std.algorithm.comparison : among;
import std.variant : visit; import std.variant : visit;
import std.stdint; import std.stdint;
private struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
void delegate(int pid) shared nothrow onExit;
EventDriverProcesses driver;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
private struct StaticProcesses {
@safe: nothrow:
import core.sync.mutex : Mutex;
import core.thread : Thread;
private {
static shared Mutex m_mutex;
static __gshared ProcessInfo[ProcessID] m_processes;
static __gshared Thread m_waitThread;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static void add(ProcessID pid, ProcessInfo info) @trusted {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_waitThread) {
m_waitThread = new Thread(&waitForProcesses);
m_waitThread.start();
}
assert(pid !in m_processes, "Process adopted twice");
m_processes[pid] = info;
}
private static void waitForProcesses()
@system {
import core.stdc.errno : ECHILD, errno;
import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid;
import core.sys.posix.signal : siginfo_t;
while (true) {
siginfo_t dummy;
auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT);
if (ret == -1) {
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
m_waitThread = null;
}
break;
}
ProcessID[] allprocs;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
() @trusted {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
foreach (ref entry; StaticProcesses.m_processes.byKeyValue) {
if (!entry.value.exited)
allprocs ~= entry.key;
}
} ();
}
foreach (pid; allprocs) {
int status;
ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
if (ret == cast(int)pid) {
int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
onProcessExit(ret, exitstatus);
}
}
}
}
private static void onProcessExit(int system_pid, int exit_status)
{
auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks;
auto onexit = lockedProcessInfo!((info) @safe {
// We get notified of any child exiting, so ignore the ones we're
// not aware of
if (info is null) {
return null;
}
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exit_status;
return info.onExit;
})(pid);
onexit(cast(int)pid);
}
}
private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted {
StaticProcesses.m_mutex.lock_nothrow();
scope (exit) StaticProcesses.m_mutex.unlock_nothrow();
auto info = pid in StaticProcesses.m_processes;
return fn(info);
}
private enum SIGCHLD = 17; private enum SIGCHLD = 17;
@ -136,10 +17,16 @@ private enum SIGCHLD = 17;
final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sync.mutex : Mutex;
import core.sys.linux.sys.signalfd; import core.sys.linux.sys.signalfd;
import core.sys.posix.unistd : close, read, write, dup; import core.sys.posix.unistd : close, read, write, dup;
import core.thread : Thread;
private { private {
static shared Mutex s_mutex;
static __gshared ProcessInfo[ProcessID] s_processes;
static __gshared Thread s_waitThread;
Loop m_loop; Loop m_loop;
// FIXME: avoid virtual funciton calls and use the final type instead // FIXME: avoid virtual funciton calls and use the final type instead
EventDriver m_driver; EventDriver m_driver;
@ -161,14 +48,11 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
auto sthis = () @trusted { return cast(shared)this; } ();
ProcessInfo info; ProcessInfo info;
info.exited = false; info.exited = false;
info.refCount = 1; info.refCount = 1;
info.onExit = () @trusted { return &sthis.onProcessExit; } ();
info.driver = this; info.driver = this;
StaticProcesses.add(pid, info); add(pid, info);
return pid; return pid;
} }
@ -282,18 +166,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
bool exited; bool exited;
int exitCode; int exitCode;
size_t id = lockedProcessInfo!((info) { size_t id = size_t.max;
lockedProcessInfo(pid, (info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
if (info.exited) { if (info.exited) {
exited = true; exited = true;
exitCode = info.exitCode; exitCode = info.exitCode;
return size_t.max;
} else { } else {
info.callbacks ~= on_process_exit; info.callbacks ~= on_process_exit;
return info.callbacks.length - 1; id = info.callbacks.length - 1;
} }
})(pid); });
if (exited) { if (exited) {
on_process_exit(pid, exitCode); on_process_exit(pid, exitCode);
@ -306,13 +190,13 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
{ {
if (wait_id == size_t.max) return; if (wait_id == size_t.max) return;
lockedProcessInfo!((info) { lockedProcessInfo(pid, (info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending"); assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > wait_id, "Invalid process wait ID"); assert(info.callbacks.length > wait_id, "Invalid process wait ID");
info.callbacks[wait_id] = null; info.callbacks[wait_id] = null;
})(pid); });
} }
private void onProcessExit(int system_pid) private void onProcessExit(int system_pid)
@ -327,7 +211,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
int exitCode; int exitCode;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) { PosixEventDriverProcesses driver;
lockedProcessInfo(pid, (info) {
assert(info !is null); assert(info !is null);
exitCode = info.exitCode; exitCode = info.exitCode;
@ -335,8 +220,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
callbacks = info.callbacks; callbacks = info.callbacks;
info.callbacks = null; info.callbacks = null;
return info.driver; driver = info.driver;
})(pid); });
foreach (cb; callbacks) { foreach (cb; callbacks) {
if (cb) if (cb)
@ -348,53 +233,162 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
final override bool hasExited(ProcessID pid) final override bool hasExited(ProcessID pid)
{ {
return lockedProcessInfo!((info) { bool ret;
lockedProcessInfo(pid, (info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
ret = info.exited;
return info.exited; });
})(pid); return ret;
} }
final override void addRef(ProcessID pid) final override void addRef(ProcessID pid)
{ {
lockedProcessInfo!((info) { lockedProcessInfo(pid, (info) {
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++; info.refCount++;
})(pid); });
} }
final override bool releaseRef(ProcessID pid) final override bool releaseRef(ProcessID pid)
{ {
return lockedProcessInfo!((info) { bool ret;
lockedProcessInfo(pid, (info) {
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) { if (--info.refCount == 0) {
// Remove/deallocate process // Remove/deallocate process
if (info.userDataDestructor) if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } (); () @trusted { info.userDataDestructor(info.userData.ptr); } ();
StaticProcesses.m_processes.remove(pid); () @trusted { s_processes.remove(pid); } ();
return false; ret = false;
} } else ret = true;
return true; });
})(pid); return ret;
} }
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
return lockedProcessInfo!((info) { void* ret;
lockedProcessInfo(pid, (info) @safe nothrow {
assert(info.userDataDestructor is null || info.userDataDestructor is destroy, assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
if (!info.userDataDestructor) { if (!info.userDataDestructor) {
initialize(info.userData.ptr); () @trusted { initialize(info.userData.ptr); } ();
info.userDataDestructor = destroy; info.userDataDestructor = destroy;
} }
return info.userData.ptr; ret = () @trusted { return info.userData.ptr; } ();
})(pid); });
return ret;
} }
package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; }
shared static this()
{
s_mutex = new shared Mutex;
}
private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
{
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
auto info = () @trusted { return pid in s_processes; } ();
fn(info);
}
private static void add(ProcessID pid, ProcessInfo info) @trusted {
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
if (!s_waitThread) {
s_waitThread = new Thread(&waitForProcesses);
s_waitThread.start();
}
assert(pid !in s_processes, "Process adopted twice");
s_processes[pid] = info;
}
private static void waitForProcesses()
@system {
import core.stdc.errno : ECHILD, errno;
import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid;
import core.sys.posix.signal : siginfo_t;
while (true) {
siginfo_t dummy;
auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT);
if (ret == -1) {
{
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
s_waitThread = null;
}
break;
}
ProcessID[] allprocs;
{
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
() @trusted {
foreach (ref entry; s_processes.byKeyValue) {
if (!entry.value.exited)
allprocs ~= entry.key;
}
} ();
}
foreach (pid; allprocs) {
int status;
ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
if (ret == cast(int)pid) {
int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
onProcessExitStatic(ret, exitstatus);
}
}
}
}
private static void onProcessExitStatic(int system_pid, int exit_status)
{
auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks;
PosixEventDriverProcesses driver;
lockedProcessInfo(pid, (ProcessInfo* info) @safe {
// We get notified of any child exiting, so ignore the ones we're
// not aware of
if (info is null) return;
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exit_status;
driver = info.driver;
});
if (driver)
() @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid);
}
private static struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
PosixEventDriverProcesses driver;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
} }
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {