From 7d091ed5044b9f949b9914605381b0c2c4bb3217 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Tue, 15 Jan 2019 19:58:01 +1100 Subject: [PATCH] Add APIs for working with Subprocesses and Pipes with an implementation for Posix --- source/eventcore/driver.d | 189 ++++++++++- source/eventcore/drivers/posix/driver.d | 19 +- source/eventcore/drivers/posix/pipes.d | 337 ++++++++++++++++++++ source/eventcore/drivers/posix/processes.d | 292 +++++++++++++++++ source/eventcore/drivers/winapi/driver.d | 10 + source/eventcore/drivers/winapi/pipes.d | 59 ++++ source/eventcore/drivers/winapi/processes.d | 54 ++++ 7 files changed, 955 insertions(+), 5 deletions(-) create mode 100644 source/eventcore/drivers/posix/pipes.d create mode 100644 source/eventcore/drivers/posix/processes.d create mode 100644 source/eventcore/drivers/winapi/pipes.d create mode 100644 source/eventcore/drivers/winapi/processes.d diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 61a24f6..2bb05e6 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -19,8 +19,10 @@ module eventcore.driver; @safe: /*@nogc:*/ nothrow: import core.time : Duration; +import std.process : StdProcessConfig = Config; import std.socket : Address; import std.stdint : intptr_t; +import std.variant : Algebraic; /** Encapsulates a full event driver. @@ -51,6 +53,10 @@ interface EventDriver { @property inout(EventDriverFiles) files() inout; /// Directory change watching @property inout(EventDriverWatchers) watchers() inout; + /// Sub-process handling + @property inout(EventDriverProcesses) processes() inout; + /// Pipes + @property inout(EventDriverPipes) pipes() inout; /** Releases all resources associated with the driver. @@ -646,6 +652,154 @@ interface EventDriverWatchers { protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; } +interface EventDriverProcesses { +@safe: /*@nogc:*/ nothrow: + /** Adopt an existing process. + */ + ProcessID adopt(int system_pid); + + /** Spawn a child process. + + Note that if a default signal handler exists for the signal, it will be + disabled by using this function. + + Params: + args = The program arguments. First one must be an executable. + stdin = What should be done for stdin. Allows inheritance, piping, + nothing or any specific fd. If this results in a Pipe, + the PipeFD will be set in the stdin result. + stdout = See stdin, but also allows redirecting to stderr. + stderr = See stdin, but also allows redirecting to stdout. + env = The environment variables to spawn the process with. + config = Special process configurations. + working_dir = What to set the working dir in the process. + + Returns: + Returns a Process struct containing the ProcessID and whatever + pipes have been adopted for stdin, stdout and stderr. + */ + Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env = null, ProcessConfig config = ProcessConfig.none, string working_dir = null); + + /** Returns whether the process has exited yet. + */ + bool hasExited(ProcessID pid); + + /** Kill the process using the given signal. Has different effects on different platforms. + */ + void kill(ProcessID pid, int signal); + + /** Wait for the process to exit. Returns an identifier that can be used to cancel the wait. + */ + size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit); + + /** Cancel a wait for the given identifier returned by wait. + */ + void cancelWait(ProcessID pid, size_t waitId); + + /** Increments the reference count of the given resource. + */ + void addRef(ProcessID pid); + + /** Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. This will not kill + the sub-process, nor "detach" it. + + Returns: + Returns `false` $(I iff) the last reference was removed by this call. + */ + bool releaseRef(ProcessID pid); + + /** Retrieves a reference to a user-defined value associated with a descriptor. + */ + @property final ref T userData(T)(ProcessID descriptor) + @trusted { + import std.conv : emplace; + static void init(void* ptr) { emplace(cast(T*)ptr); } + static void destr(void* ptr) { destroy(*cast(T*)ptr); } + return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + } + + /// Low-level user data access. Use `userData` instead. + protected void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; +} + +interface EventDriverPipes { +@safe: /*@nogc:*/ nothrow: + /** Adopt an existing pipe. This will modify the pipe to be non-blocking. + + Note that pipes generally only allow either reads or writes but not + both, it is up to you to only call valid functions. + */ + PipeFD adopt(int system_pipe_handle); + + /** Reads data from a stream socket. + + Note that only a single read operation is allowed at once. The caller + needs to make sure that either `on_read_finish` got called, or + `cancelRead` was called before issuing the next call to `read`. + */ + void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish); + + /** Cancels an ongoing read operation. + + After this function has been called, the `PipeIOCallback` specified in + the call to `read` is guaranteed to not be called. + */ + void cancelRead(PipeFD pipe); + + /** Writes data from a stream socket. + + Note that only a single write operation is allowed at once. The caller + needs to make sure that either `on_write_finish` got called, or + `cancelWrite` was called before issuing the next call to `write`. + */ + void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish); + + /** Cancels an ongoing write operation. + + After this function has been called, the `PipeIOCallback` specified in + the call to `write` is guaranteed to not be called. + */ + void cancelWrite(PipeFD pipe); + + /** Waits for incoming data without actually reading it. + */ + void waitForData(PipeFD pipe, PipeIOCallback on_data_available); + + /** Immediately close the pipe. Future read or write operations may fail. + */ + void close(PipeFD pipe); + + /** Increments the reference count of the given resource. + */ + void addRef(PipeFD pid); + + /** Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + + Returns: + Returns `false` $(I iff) the last reference was removed by this call. + */ + bool releaseRef(PipeFD pid); + + /** Retrieves a reference to a user-defined value associated with a descriptor. + */ + @property final ref T userData(T)(PipeFD descriptor) + @trusted { + import std.conv : emplace; + static void init(void* ptr) { emplace(cast(T*)ptr); } + static void destr(void* ptr) { destroy(*cast(T*)ptr); } + return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + } + + /// Low-level user data access. Use `userData` instead. + protected void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; + +} // Helper class to enable fully stack allocated `std.socket.Address` instances. final class RefAddress : Address { @@ -680,13 +834,22 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); +alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t); alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias TimerCallback = void delegate(TimerID); alias TimerCallback2 = void delegate(TimerID, bool fired); alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change); +alias ProcessWaitCallback = void delegate(ProcessID, int); @system alias DataInitializer = void function(void*) @nogc; +enum ProcessRedirect { inherit, pipe, none } +alias ProcessStdinFile = Algebraic!(int, ProcessRedirect); +enum ProcessStdoutRedirect { toStderr } +alias ProcessStdoutFile = Algebraic!(int, ProcessRedirect, ProcessStdoutRedirect); +enum ProcessStderrRedirect { toStdout } +alias ProcessStderrFile = Algebraic!(int, ProcessRedirect, ProcessStderrRedirect); + enum ExitReason { timeout, idle, @@ -775,6 +938,13 @@ enum SignalStatus { error } +/// See std.process.Config +enum ProcessConfig { + none = StdProcessConfig.none, + detached = StdProcessConfig.detached, + newEnv = StdProcessConfig.newEnv, + suppressConsole = StdProcessConfig.suppressConsole, +} /** Describes a single change in a watched directory. */ @@ -799,6 +969,17 @@ struct FileChange { bool isDirectory; } +/** Describes a spawned process +*/ +struct Process { + ProcessID pid; + + // TODO: Convert these to PipeFD once dmd is fixed + PipeFD stdin; + PipeFD stdout; + PipeFD stderr; +} + mixin template Handle(string NAME, T, T invalid_value = T.init) { static if (is(T.BaseType)) alias BaseType = T.BaseType; else alias BaseType = T; @@ -813,13 +994,14 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) { this(BaseType value) { this.value = T(value); } - U opCast(U : Handle!(V, M), V, int M)() { + U opCast(U : Handle!(V, M), V, int M)() + const { // TODO: verify that U derives from typeof(this)! return U(value); } U opCast(U : BaseType)() - { + const { return cast(U)value; } @@ -834,9 +1016,12 @@ struct StreamSocketFD { mixin Handle!("streamSocket", SocketFD); } struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); } struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); } struct FileFD { mixin Handle!("file", FD); } +// FD.init is required here due to https://issues.dlang.org/show_bug.cgi?id=19585 +struct PipeFD { mixin Handle!("pipe", FD, FD.init); } struct EventID { mixin Handle!("event", FD); } struct TimerID { mixin Handle!("timer", size_t, size_t.max); } struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); } struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); } struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); } struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); } +struct ProcessID { mixin Handle!("process", size_t, size_t.max); } diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index bd41642..15b8abe 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -12,6 +12,8 @@ import eventcore.drivers.posix.events; import eventcore.drivers.posix.signals; import eventcore.drivers.posix.sockets; import eventcore.drivers.posix.watchers; +import eventcore.drivers.posix.processes; +import eventcore.drivers.posix.pipes; import eventcore.drivers.timer; import eventcore.drivers.threadedfile; import eventcore.internal.consumablequeue : ConsumableQueue; @@ -48,9 +50,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); alias FileDriver = ThreadedFileEventDriver!EventsDriver; + alias PipeDriver = PosixEventDriverPipes!Loop; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; + alias ProcessDriver = SignalEventDriverProcesses!Loop; Loop m_loop; CoreDriver m_core; @@ -60,7 +64,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { SocketsDriver m_sockets; DNSDriver m_dns; FileDriver m_files; + PipeDriver m_pipes; WatcherDriver m_watchers; + ProcessDriver m_processes; } this() @@ -73,7 +79,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); m_dns = mallocT!DNSDriver(m_events, m_signals); m_files = mallocT!FileDriver(m_events); + m_pipes = mallocT!PipeDriver(m_loop); m_watchers = mallocT!WatcherDriver(m_events); + m_processes = mallocT!ProcessDriver(m_loop, m_pipes); } // force overriding these in the (final) sub classes to avoid virtual calls @@ -86,7 +94,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { final override @property inout(SocketsDriver) sockets() inout { return m_sockets; } final override @property inout(DNSDriver) dns() inout { return m_dns; } final override @property inout(FileDriver) files() inout { return m_files; } + final override @property inout(PipeDriver) pipes() inout { return m_pipes; } final override @property inout(WatcherDriver) watchers() inout { return m_watchers; } + final override @property inout(ProcessDriver) processes() inout { return m_processes; } final override bool dispose() { @@ -111,13 +121,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { return false; } + m_processes.dispose(); m_files.dispose(); m_dns.dispose(); m_core.dispose(); m_loop.dispose(); try () @trusted { + freeT(m_processes); freeT(m_watchers); + freeT(m_pipes); freeT(m_files); freeT(m_dns); freeT(m_core); @@ -300,7 +313,7 @@ package class PosixEventLoop { import core.time : Duration; package { - AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds; + AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot, PipeSlot) m_fds; size_t m_handleCount = 0; size_t m_waiterCount = 0; } @@ -342,11 +355,11 @@ package class PosixEventLoop { // ensure that the FD doesn't get closed before the callback gets called. with (m_fds[fd.value]) { if (callback !is null) { - if (!(common.flags & FDFlags.internal)) m_waiterCount++; + m_waiterCount++; common.refCount++; } else { common.refCount--; - if (!(common.flags & FDFlags.internal)) m_waiterCount--; + m_waiterCount--; } common.callback[evt] = callback; } diff --git a/source/eventcore/drivers/posix/pipes.d b/source/eventcore/drivers/posix/pipes.d new file mode 100644 index 0000000..55812df --- /dev/null +++ b/source/eventcore/drivers/posix/pipes.d @@ -0,0 +1,337 @@ +module eventcore.drivers.posix.pipes; +@safe: + +import eventcore.driver; +import eventcore.drivers.posix.driver; +import eventcore.internal.utils : nogc_assert, print; + +import std.algorithm : min, max; + + +final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { +@safe: /*@nogc:*/ nothrow: + import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.sys.posix.signal; + import core.sys.posix.unistd : close, read, write; + import core.sys.posix.fcntl; + import core.sys.posix.poll; + import core.sys.linux.sys.signalfd; + + private Loop m_loop; + + this(Loop loop) + @nogc { + m_loop = loop; + } + + final override PipeFD adopt(int system_fd) + { + auto fd = PipeFD(system_fd); + if (m_loop.m_fds[fd].common.refCount) // FD already in use? + return PipeFD.invalid; + + () @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK); } (); + + m_loop.initFD(fd, FDFlags.none, PipeSlot.init); + m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + return fd; + } + + final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) + { + auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); + + // Read failed + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + print("Pipe error %s!", err); + on_read_finish(pipe, IOStatus.error, 0); + return; + } + } + + // EOF + if (ret == 0 && buffer.length > 0) { + on_read_finish(pipe, IOStatus.disconnected, 0); + return; + } + + // Handle immediate mode + if (ret < 0 && mode == IOMode.immediate) { + on_read_finish(pipe, IOStatus.wouldBlock, 0); + return; + } + + // Handle successful read + if (ret >= 0) { + buffer = buffer[ret .. $]; + + // Handle completed read + if (mode != IOMode.all || buffer.length == 0) { + on_read_finish(pipe, IOStatus.ok, ret); + return; + } + } + + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.readCallback is null, "Concurrent reads are not allowed."); + slot.readCallback = on_read_finish; + slot.readMode = mode; + slot.bytesRead = max(ret, 0); + slot.readBuffer = buffer; + + // Need to use EventType.status as well, as pipes don't otherwise notify + // of closes + m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead); + m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead); + } + + private void onPipeRead(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; + + void finalize(IOStatus status) + { + addRef(pipe); + scope(exit) releaseRef(pipe); + + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + m_loop.setNotifyCallback!(EventType.status)(pipe, null); + auto cb = slot.readCallback; + slot.readCallback = null; + slot.readBuffer = null; + cb(pipe, status, slot.bytesRead); + } + + ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } (); + + // Read failed + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + print("Pipe error %s!", err); + finalize(IOStatus.error); + return; + } + } + + // EOF + if (ret == 0 && slot.readBuffer.length > 0) { + finalize(IOStatus.disconnected); + return; + } + + // Successful read + if (ret > 0 || !slot.readBuffer.length) { + slot.readBuffer = slot.readBuffer[ret .. $]; + slot.bytesRead += ret; + + // Handle completed read + if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) { + finalize(IOStatus.ok); + return; + } + } + } + + final override void cancelRead(PipeFD pipe) + { + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.readCallback !is null, "Cancelling read when there is no read in progress."); + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + slot.readBuffer = null; + } + + final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) + { + if (buffer.length == 0) { + on_write_finish(pipe, IOStatus.ok, 0); + return; + } + + ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + on_write_finish(pipe, IOStatus.error, 0); + return; + } + + if (mode == IOMode.immediate) { + on_write_finish(pipe, IOStatus.wouldBlock, 0); + return; + } + } else { + buffer = buffer[ret .. $]; + + if (mode != IOMode.all || buffer.length == 0) { + on_write_finish(pipe, IOStatus.ok, ret); + return; + } + } + + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.writeCallback is null, "Concurrent writes not allowed."); + slot.writeCallback = on_write_finish; + slot.writeMode = mode; + slot.bytesWritten = max(ret, 0); + slot.writeBuffer = buffer; + + m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite); + } + + private void onPipeWrite(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; + + void finalize(IOStatus status) + { + addRef(pipe); + scope(exit) releaseRef(pipe); + + m_loop.setNotifyCallback!(EventType.write)(pipe, null); + auto cb = slot.writeCallback; + slot.writeCallback = null; + slot.writeBuffer = null; + cb(pipe, IOStatus.error, slot.bytesRead); + } + + ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + finalize(IOStatus.error); + } + } else { + slot.bytesWritten += ret; + slot.writeBuffer = slot.writeBuffer[ret .. $]; + + if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { + finalize(IOStatus.ok); + } + } + + } + + final override void cancelWrite(PipeFD pipe) + { + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress."); + m_loop.setNotifyCallback!(EventType.write)(pipe, null); + slot.writeCallback = null; + slot.writeBuffer = null; + } + + final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) + { + if (pollPipe(pipe, on_data_available)) + { + return; + } + + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + + assert(slot.readCallback is null, "Concurrent reads are not allowed."); + slot.readCallback = on_data_available; + slot.readMode = IOMode.once; // currently meaningless + slot.bytesRead = 0; // currently meaningless + slot.readBuffer = null; + m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable); + } + + private void onPipeDataAvailable(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; + + auto callback = (PipeFD f, IOStatus s, size_t m) { + addRef(f); + scope(exit) releaseRef(f); + + auto cb = slot.readCallback; + slot.readCallback = null; + slot.readBuffer = null; + cb(f, s, m); + }; + + if (pollPipe(pipe, callback)) + { + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + } + } + + private bool pollPipe(PipeFD pipe, PipeIOCallback callback) + @trusted { + // Use poll to check if any data is available + pollfd pfd; + pfd.fd = cast(int)pipe; + pfd.events = POLLIN; + int ret = poll(&pfd, 1, 0); + + if (ret == -1) { + print("Error polling pipe: %s!", errno); + callback(pipe, IOStatus.error, 0); + return true; + } + + if (ret == 1) { + callback(pipe, IOStatus.error, 0); + return true; + } + + return false; + } + + final override void close(PipeFD pipe) + { + // TODO: Maybe actually close here instead of waiting for releaseRef? + close(cast(int)pipe); + } + + final override void addRef(PipeFD pipe) + { + auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); + assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD."); + slot.common.refCount++; + } + + final override bool releaseRef(PipeFD pipe) + { + import taggedalgebraic : hasType; + auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); + nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); + + if (--slot.common.refCount == 0) { + m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!PipeSlot(pipe); + + close(cast(int)pipe); + return false; + } + return true; + } + + final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + return m_loop.rawUserDataImpl(fd, size, initialize, destroy); + } +} + +package struct PipeSlot { + alias Handle = PipeFD; + + size_t bytesRead; + ubyte[] readBuffer; + IOMode readMode; + PipeIOCallback readCallback; + + size_t bytesWritten; + const(ubyte)[] writeBuffer; + IOMode writeMode; + PipeIOCallback writeCallback; +} diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d new file mode 100644 index 0000000..e70d7fc --- /dev/null +++ b/source/eventcore/drivers/posix/processes.d @@ -0,0 +1,292 @@ +module eventcore.drivers.posix.processes; +@safe: + +import eventcore.driver; +import eventcore.drivers.posix.driver; +import eventcore.drivers.posix.signals; +import eventcore.internal.utils : nogc_assert, print; + +import std.algorithm.comparison : among; +import std.variant : visit; + +import core.stdc.errno : errno, EAGAIN, EINPROGRESS; +import core.sys.linux.sys.signalfd; +import core.sys.posix.signal; +import core.sys.posix.unistd : close, read, write, dup ; + + +private enum SIGCHLD = 17; + +final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { +@safe: /*@nogc:*/ nothrow: + + private { + static struct ProcessInfo { + bool exited = true; + int exitCode; + ProcessWaitCallback[] callbacks; + size_t refCount = 0; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + } + + Loop m_loop; + EventDriverPipes m_pipes; + ProcessInfo[ProcessID] m_processes; + SignalListenID m_sighandle; + } + + this(Loop loop, EventDriverPipes pipes) + { + m_loop = loop; + m_pipes = pipes; + + // Listen for child process exits using SIGCHLD + m_sighandle = () @trusted { + sigset_t sset; + sigemptyset(&sset); + sigaddset(&sset, SIGCHLD); + + assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); + + return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); + } (); + + m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); + m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); + m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); + + onSignal(cast(FD)m_sighandle); + } + + void dispose() + { + FD sighandle = cast(FD)m_sighandle; + m_loop.m_fds[sighandle].common.refCount--; + m_loop.setNotifyCallback!(EventType.read)(sighandle, null); + m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!(SignalSlot)(sighandle); + close(cast(int)sighandle); + } + + final override ProcessID adopt(int system_pid) + { + auto pid = cast(ProcessID)system_pid; + assert(pid !in m_processes, "Process is already adopted"); + + ProcessInfo info; + info.exited = false; + info.refCount = 1; + m_processes[pid] = info; + + return pid; + } + + final override Process spawn( + string[] args, + ProcessStdinFile stdin, + ProcessStdoutFile stdout, + ProcessStderrFile stderr, + const string[string] env, + ProcessConfig config, + string working_dir) + @trusted { + // Use std.process to spawn processes + import std.process : pipe, Pid, spawnProcess; + import std.stdio : File; + static import std.stdio; + + static File fdToFile(int fd, scope const(char)[] mode) + { + try { + File f; + f.fdopen(fd, mode); + return f; + } catch (Exception e) { + assert(0); + } + } + + try { + Process process; + File stdinFile, stdoutFile, stderrFile; + + stdinFile = stdin.visit!( + (int handle) => fdToFile(handle, "r"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdin; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdin = m_pipes.adopt(dup(p.writeEnd.fileno)); + return p.readEnd; + } + }); + + stdoutFile = stdout.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stdout; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stdout = m_pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); + + stderrFile = stderr.visit!( + (int handle) => fdToFile(handle, "w"), + (ProcessRedirect redirect) { + final switch (redirect) { + case ProcessRedirect.inherit: return std.stdio.stderr; + case ProcessRedirect.none: return File.init; + case ProcessRedirect.pipe: + auto p = pipe(); + process.stderr = m_pipes.adopt(dup(p.readEnd.fileno)); + return p.writeEnd; + } + }, + (_) => File.init); + + const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; + const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; + + if (redirectStdout) { + assert(!redirectStderr, "Can't redirect both stdout and stderr"); + + stdoutFile = stderrFile; + } else if (redirectStderr) { + stderrFile = stdoutFile; + } + + Pid stdPid = spawnProcess( + args, + stdinFile, + stdoutFile, + stderrFile, + env, + cast(std.process.Config)config, + working_dir); + process.pid = adopt(stdPid.osHandle); + stdPid.destroy(); + + return process; + } catch (Exception e) { + return Process.init; + } + } + + final override void kill(ProcessID pid, int signal) + @trusted { + .kill(cast(int)pid, signal); + } + + final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + auto info = () @trusted { return pid in m_processes; } (); + assert(info !is null, "Unknown process ID"); + + if (info.exited) { + on_process_exit(pid, info.exitCode); + return 0; + } else { + info.callbacks ~= on_process_exit; + return info.callbacks.length - 1; + } + } + + final override void cancelWait(ProcessID pid, size_t waitId) + { + auto info = () @trusted { return pid in m_processes; } (); + assert(info !is null, "Unknown process ID"); + assert(!info.exited, "Cannot cancel wait when none are pending"); + assert(info.callbacks.length > waitId, "Invalid process wait ID"); + + info.callbacks[waitId] = null; + } + + private void onSignal(FD fd) + { + SignalListenID lid = cast(SignalListenID)fd; + + signalfd_siginfo nfo; + do { + auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); + + if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) + return; + + onProcessExit(nfo.ssi_pid, nfo.ssi_status); + } while (true); + } + + private void onProcessExit(int system_pid, int exitCode) + { + auto pid = cast(ProcessID)system_pid; + auto info = () @trusted { return pid in m_processes; } (); + + // We get notified of any child exiting, so ignore the ones we're not + // aware of + if (info is null) { + return; + } + + info.exited = true; + info.exitCode = exitCode; + + foreach (cb; info.callbacks) { + if (cb) + cb(pid, exitCode); + } + info.callbacks = null; + } + + final override bool hasExited(ProcessID pid) + { + auto info = () @trusted { return pid in m_processes; } (); + assert(info !is null, "Unknown process ID"); + + return info.exited; + } + + final override void addRef(ProcessID pid) + { + auto info = () @trusted { return &m_processes[pid]; } (); + nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); + info.refCount++; + } + + final override bool releaseRef(ProcessID pid) + { + auto info = () @trusted { return &m_processes[pid]; } (); + nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); + if (--info.refCount == 0) { + // Remove/deallocate process + if (info.userDataDestructor) + () @trusted { info.userDataDestructor(info.userData.ptr); } (); + + m_processes.remove(pid); + return false; + } + return true; + } + + final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + auto info = () @trusted { return &m_processes[pid]; } (); + assert(info.userDataDestructor is null || info.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); + + if (!info.userDataDestructor) { + initialize(info.userData.ptr); + info.userDataDestructor = destroy; + } + return info.userData.ptr; + } +} diff --git a/source/eventcore/drivers/winapi/driver.d b/source/eventcore/drivers/winapi/driver.d index 47dd1f8..5e30235 100644 --- a/source/eventcore/drivers/winapi/driver.d +++ b/source/eventcore/drivers/winapi/driver.d @@ -15,6 +15,8 @@ import eventcore.drivers.winapi.core; import eventcore.drivers.winapi.dns; import eventcore.drivers.winapi.events; import eventcore.drivers.winapi.files; +import eventcore.drivers.winapi.pipes; +import eventcore.drivers.winapi.processes; import eventcore.drivers.winapi.signals; import eventcore.drivers.winapi.sockets; import eventcore.drivers.winapi.watchers; @@ -35,6 +37,8 @@ final class WinAPIEventDriver : EventDriver { WinAPIEventDriverEvents m_events; WinAPIEventDriverSignals m_signals; WinAPIEventDriverWatchers m_watchers; + WinAPIEventDriverProcesses m_processes; + WinAPIEventDriverPipes m_pipes; } static WinAPIEventDriver threadInstance; @@ -57,8 +61,10 @@ final class WinAPIEventDriver : EventDriver { m_events = mallocT!WinAPIEventDriverEvents(m_core); m_files = mallocT!WinAPIEventDriverFiles(m_core); m_sockets = mallocT!WinAPIEventDriverSockets(m_core); + m_pipes = mallocT!WinAPIEventDriverPipes(); m_dns = mallocT!WinAPIEventDriverDNS(); m_watchers = mallocT!WinAPIEventDriverWatchers(m_core); + m_processes = mallocT!WinAPIEventDriverProcesses(); } @safe: /*@nogc:*/ nothrow: @@ -73,6 +79,8 @@ final class WinAPIEventDriver : EventDriver { override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; } override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; } override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; } + override @property inout(WinAPIEventDriverProcesses) processes() inout { return m_processes; } + override @property inout(WinAPIEventDriverPipes) pipes() inout { return m_pipes; } override bool dispose() { @@ -88,8 +96,10 @@ final class WinAPIEventDriver : EventDriver { threadInstance = null; try () @trusted { + freeT(m_processes); freeT(m_watchers); freeT(m_dns); + freeT(m_pipes); freeT(m_sockets); freeT(m_files); freeT(m_events); diff --git a/source/eventcore/drivers/winapi/pipes.d b/source/eventcore/drivers/winapi/pipes.d new file mode 100644 index 0000000..ecc9226 --- /dev/null +++ b/source/eventcore/drivers/winapi/pipes.d @@ -0,0 +1,59 @@ +module eventcore.drivers.winapi.pipes; + +version (Windows): + +import eventcore.driver; +import eventcore.internal.win32; + +final class WinAPIEventDriverPipes : EventDriverPipes { +@safe: /*@nogc:*/ nothrow: + override PipeFD adopt(int system_pipe_handle) + { + assert(false, "TODO!"); + } + + override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) + { + assert(false, "TODO!"); + } + + override void cancelRead(PipeFD pipe) + { + assert(false, "TODO!"); + } + + override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) + { + assert(false, "TODO!"); + } + + override void cancelWrite(PipeFD pipe) + { + assert(false, "TODO!"); + } + + override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) + { + assert(false, "TODO!"); + } + + override void close(PipeFD pipe) + { + assert(false, "TODO!"); + } + + override void addRef(PipeID pid) + { + assert(false, "TODO!"); + } + + override bool releaseRef(PipeID pid) + { + assert(false, "TODO!"); + } + + protected override void* rawUserData(PipeID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } +} diff --git a/source/eventcore/drivers/winapi/processes.d b/source/eventcore/drivers/winapi/processes.d new file mode 100644 index 0000000..19816b0 --- /dev/null +++ b/source/eventcore/drivers/winapi/processes.d @@ -0,0 +1,54 @@ +module eventcore.drivers.winapi.processes; + +version (Windows): + +import eventcore.driver; +import eventcore.internal.win32; + +final class WinAPIEventDriverProcesses : EventDriverProcesses { +@safe: /*@nogc:*/ nothrow: + override ProcessID adopt(int system_pid) + { + assert(false, "TODO!"); + } + + override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) + { + assert(false, "TODO!"); + } + + override bool hasExited(ProcessID pid) + { + assert(false, "TODO!"); + } + + override void kill(ProcessID pid, int signal) + { + assert(false, "TODO!"); + } + + override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + assert(false, "TODO!"); + } + + override void cancelWait(ProcessID pid, size_t waitId) + { + assert(false, "TODO!"); + } + + override void addRef(ProcessID pid) + { + assert(false, "TODO!"); + } + + override bool releaseRef(ProcessID pid) + { + assert(false, "TODO!"); + } + + protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } +}