Add APIs for working with Subprocesses and Pipes with an implementation for Posix

This commit is contained in:
Benjamin Schaaf 2019-01-15 19:58:01 +11:00
parent 36f43690c6
commit 7d091ed504
7 changed files with 955 additions and 5 deletions

View file

@ -19,8 +19,10 @@ module eventcore.driver;
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.time : Duration; import core.time : Duration;
import std.process : StdProcessConfig = Config;
import std.socket : Address; import std.socket : Address;
import std.stdint : intptr_t; import std.stdint : intptr_t;
import std.variant : Algebraic;
/** Encapsulates a full event driver. /** Encapsulates a full event driver.
@ -51,6 +53,10 @@ interface EventDriver {
@property inout(EventDriverFiles) files() inout; @property inout(EventDriverFiles) files() inout;
/// Directory change watching /// Directory change watching
@property inout(EventDriverWatchers) watchers() inout; @property inout(EventDriverWatchers) watchers() inout;
/// Sub-process handling
@property inout(EventDriverProcesses) processes() inout;
/// Pipes
@property inout(EventDriverPipes) pipes() inout;
/** Releases all resources associated with the driver. /** Releases all resources associated with the driver.
@ -646,6 +652,154 @@ interface EventDriverWatchers {
protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
} }
interface EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
/** Adopt an existing process.
*/
ProcessID adopt(int system_pid);
/** Spawn a child process.
Note that if a default signal handler exists for the signal, it will be
disabled by using this function.
Params:
args = The program arguments. First one must be an executable.
stdin = What should be done for stdin. Allows inheritance, piping,
nothing or any specific fd. If this results in a Pipe,
the PipeFD will be set in the stdin result.
stdout = See stdin, but also allows redirecting to stderr.
stderr = See stdin, but also allows redirecting to stdout.
env = The environment variables to spawn the process with.
config = Special process configurations.
working_dir = What to set the working dir in the process.
Returns:
Returns a Process struct containing the ProcessID and whatever
pipes have been adopted for stdin, stdout and stderr.
*/
Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env = null, ProcessConfig config = ProcessConfig.none, string working_dir = null);
/** Returns whether the process has exited yet.
*/
bool hasExited(ProcessID pid);
/** Kill the process using the given signal. Has different effects on different platforms.
*/
void kill(ProcessID pid, int signal);
/** Wait for the process to exit. Returns an identifier that can be used to cancel the wait.
*/
size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit);
/** Cancel a wait for the given identifier returned by wait.
*/
void cancelWait(ProcessID pid, size_t waitId);
/** Increments the reference count of the given resource.
*/
void addRef(ProcessID pid);
/** Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated. This will not kill
the sub-process, nor "detach" it.
Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(ProcessID pid);
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T)(ProcessID descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// Low-level user data access. Use `userData` instead.
protected void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}
interface EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
/** Adopt an existing pipe. This will modify the pipe to be non-blocking.
Note that pipes generally only allow either reads or writes but not
both, it is up to you to only call valid functions.
*/
PipeFD adopt(int system_pipe_handle);
/** Reads data from a stream socket.
Note that only a single read operation is allowed at once. The caller
needs to make sure that either `on_read_finish` got called, or
`cancelRead` was called before issuing the next call to `read`.
*/
void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish);
/** Cancels an ongoing read operation.
After this function has been called, the `PipeIOCallback` specified in
the call to `read` is guaranteed to not be called.
*/
void cancelRead(PipeFD pipe);
/** Writes data from a stream socket.
Note that only a single write operation is allowed at once. The caller
needs to make sure that either `on_write_finish` got called, or
`cancelWrite` was called before issuing the next call to `write`.
*/
void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish);
/** Cancels an ongoing write operation.
After this function has been called, the `PipeIOCallback` specified in
the call to `write` is guaranteed to not be called.
*/
void cancelWrite(PipeFD pipe);
/** Waits for incoming data without actually reading it.
*/
void waitForData(PipeFD pipe, PipeIOCallback on_data_available);
/** Immediately close the pipe. Future read or write operations may fail.
*/
void close(PipeFD pipe);
/** Increments the reference count of the given resource.
*/
void addRef(PipeFD pid);
/** Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(PipeFD pid);
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T)(PipeFD descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// Low-level user data access. Use `userData` instead.
protected void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}
// Helper class to enable fully stack allocated `std.socket.Address` instances. // Helper class to enable fully stack allocated `std.socket.Address` instances.
final class RefAddress : Address { final class RefAddress : Address {
@ -680,13 +834,22 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
alias TimerCallback2 = void delegate(TimerID, bool fired); alias TimerCallback2 = void delegate(TimerID, bool fired);
alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change); alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change);
alias ProcessWaitCallback = void delegate(ProcessID, int);
@system alias DataInitializer = void function(void*) @nogc; @system alias DataInitializer = void function(void*) @nogc;
enum ProcessRedirect { inherit, pipe, none }
alias ProcessStdinFile = Algebraic!(int, ProcessRedirect);
enum ProcessStdoutRedirect { toStderr }
alias ProcessStdoutFile = Algebraic!(int, ProcessRedirect, ProcessStdoutRedirect);
enum ProcessStderrRedirect { toStdout }
alias ProcessStderrFile = Algebraic!(int, ProcessRedirect, ProcessStderrRedirect);
enum ExitReason { enum ExitReason {
timeout, timeout,
idle, idle,
@ -775,6 +938,13 @@ enum SignalStatus {
error error
} }
/// See std.process.Config
enum ProcessConfig {
none = StdProcessConfig.none,
detached = StdProcessConfig.detached,
newEnv = StdProcessConfig.newEnv,
suppressConsole = StdProcessConfig.suppressConsole,
}
/** Describes a single change in a watched directory. /** Describes a single change in a watched directory.
*/ */
@ -799,6 +969,17 @@ struct FileChange {
bool isDirectory; bool isDirectory;
} }
/** Describes a spawned process
*/
struct Process {
ProcessID pid;
// TODO: Convert these to PipeFD once dmd is fixed
PipeFD stdin;
PipeFD stdout;
PipeFD stderr;
}
mixin template Handle(string NAME, T, T invalid_value = T.init) { mixin template Handle(string NAME, T, T invalid_value = T.init) {
static if (is(T.BaseType)) alias BaseType = T.BaseType; static if (is(T.BaseType)) alias BaseType = T.BaseType;
else alias BaseType = T; else alias BaseType = T;
@ -813,13 +994,14 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) {
this(BaseType value) { this.value = T(value); } this(BaseType value) { this.value = T(value); }
U opCast(U : Handle!(V, M), V, int M)() { U opCast(U : Handle!(V, M), V, int M)()
const {
// TODO: verify that U derives from typeof(this)! // TODO: verify that U derives from typeof(this)!
return U(value); return U(value);
} }
U opCast(U : BaseType)() U opCast(U : BaseType)()
{ const {
return cast(U)value; return cast(U)value;
} }
@ -834,9 +1016,12 @@ struct StreamSocketFD { mixin Handle!("streamSocket", SocketFD); }
struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); } struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); }
struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); } struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); }
struct FileFD { mixin Handle!("file", FD); } struct FileFD { mixin Handle!("file", FD); }
// FD.init is required here due to https://issues.dlang.org/show_bug.cgi?id=19585
struct PipeFD { mixin Handle!("pipe", FD, FD.init); }
struct EventID { mixin Handle!("event", FD); } struct EventID { mixin Handle!("event", FD); }
struct TimerID { mixin Handle!("timer", size_t, size_t.max); } struct TimerID { mixin Handle!("timer", size_t, size_t.max); }
struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); } struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); }
struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); } struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); }
struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); } struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); }
struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); } struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); }
struct ProcessID { mixin Handle!("process", size_t, size_t.max); }

View file

@ -12,6 +12,8 @@ import eventcore.drivers.posix.events;
import eventcore.drivers.posix.signals; import eventcore.drivers.posix.signals;
import eventcore.drivers.posix.sockets; import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers; import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.timer; import eventcore.drivers.timer;
import eventcore.drivers.threadedfile; import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.consumablequeue : ConsumableQueue;
@ -48,9 +50,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver; alias FileDriver = ThreadedFileEventDriver!EventsDriver;
alias PipeDriver = PosixEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
//else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
alias ProcessDriver = SignalEventDriverProcesses!Loop;
Loop m_loop; Loop m_loop;
CoreDriver m_core; CoreDriver m_core;
@ -60,7 +64,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
SocketsDriver m_sockets; SocketsDriver m_sockets;
DNSDriver m_dns; DNSDriver m_dns;
FileDriver m_files; FileDriver m_files;
PipeDriver m_pipes;
WatcherDriver m_watchers; WatcherDriver m_watchers;
ProcessDriver m_processes;
} }
this() this()
@ -73,7 +79,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); m_core = mallocT!CoreDriver(m_loop, m_timers, m_events);
m_dns = mallocT!DNSDriver(m_events, m_signals); m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events); m_files = mallocT!FileDriver(m_events);
m_pipes = mallocT!PipeDriver(m_loop);
m_watchers = mallocT!WatcherDriver(m_events); m_watchers = mallocT!WatcherDriver(m_events);
m_processes = mallocT!ProcessDriver(m_loop, m_pipes);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls
@ -86,7 +94,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
final override @property inout(SocketsDriver) sockets() inout { return m_sockets; } final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
final override @property inout(DNSDriver) dns() inout { return m_dns; } final override @property inout(DNSDriver) dns() inout { return m_dns; }
final override @property inout(FileDriver) files() inout { return m_files; } final override @property inout(FileDriver) files() inout { return m_files; }
final override @property inout(PipeDriver) pipes() inout { return m_pipes; }
final override @property inout(WatcherDriver) watchers() inout { return m_watchers; } final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
final override @property inout(ProcessDriver) processes() inout { return m_processes; }
final override bool dispose() final override bool dispose()
{ {
@ -111,13 +121,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
return false; return false;
} }
m_processes.dispose();
m_files.dispose(); m_files.dispose();
m_dns.dispose(); m_dns.dispose();
m_core.dispose(); m_core.dispose();
m_loop.dispose(); m_loop.dispose();
try () @trusted { try () @trusted {
freeT(m_processes);
freeT(m_watchers); freeT(m_watchers);
freeT(m_pipes);
freeT(m_files); freeT(m_files);
freeT(m_dns); freeT(m_dns);
freeT(m_core); freeT(m_core);
@ -300,7 +313,7 @@ package class PosixEventLoop {
import core.time : Duration; import core.time : Duration;
package { package {
AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds; AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot, PipeSlot) m_fds;
size_t m_handleCount = 0; size_t m_handleCount = 0;
size_t m_waiterCount = 0; size_t m_waiterCount = 0;
} }
@ -342,11 +355,11 @@ package class PosixEventLoop {
// ensure that the FD doesn't get closed before the callback gets called. // ensure that the FD doesn't get closed before the callback gets called.
with (m_fds[fd.value]) { with (m_fds[fd.value]) {
if (callback !is null) { if (callback !is null) {
if (!(common.flags & FDFlags.internal)) m_waiterCount++; m_waiterCount++;
common.refCount++; common.refCount++;
} else { } else {
common.refCount--; common.refCount--;
if (!(common.flags & FDFlags.internal)) m_waiterCount--; m_waiterCount--;
} }
common.callback[evt] = callback; common.callback[evt] = callback;
} }

View file

@ -0,0 +1,337 @@
module eventcore.drivers.posix.pipes;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.utils : nogc_assert, print;
import std.algorithm : min, max;
final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.signal;
import core.sys.posix.unistd : close, read, write;
import core.sys.posix.fcntl;
import core.sys.posix.poll;
import core.sys.linux.sys.signalfd;
private Loop m_loop;
this(Loop loop)
@nogc {
m_loop = loop;
}
final override PipeFD adopt(int system_fd)
{
auto fd = PipeFD(system_fd);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return PipeFD.invalid;
() @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK); } ();
m_loop.initFD(fd, FDFlags.none, PipeSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
return fd;
}
final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
{
auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
// Read failed
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
print("Pipe error %s!", err);
on_read_finish(pipe, IOStatus.error, 0);
return;
}
}
// EOF
if (ret == 0 && buffer.length > 0) {
on_read_finish(pipe, IOStatus.disconnected, 0);
return;
}
// Handle immediate mode
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(pipe, IOStatus.wouldBlock, 0);
return;
}
// Handle successful read
if (ret >= 0) {
buffer = buffer[ret .. $];
// Handle completed read
if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(pipe, IOStatus.ok, ret);
return;
}
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
slot.readCallback = on_read_finish;
slot.readMode = mode;
slot.bytesRead = max(ret, 0);
slot.readBuffer = buffer;
// Need to use EventType.status as well, as pipes don't otherwise notify
// of closes
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
}
private void onPipeRead(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
void finalize(IOStatus status)
{
addRef(pipe);
scope(exit) releaseRef(pipe);
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
m_loop.setNotifyCallback!(EventType.status)(pipe, null);
auto cb = slot.readCallback;
slot.readCallback = null;
slot.readBuffer = null;
cb(pipe, status, slot.bytesRead);
}
ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();
// Read failed
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
print("Pipe error %s!", err);
finalize(IOStatus.error);
return;
}
}
// EOF
if (ret == 0 && slot.readBuffer.length > 0) {
finalize(IOStatus.disconnected);
return;
}
// Successful read
if (ret > 0 || !slot.readBuffer.length) {
slot.readBuffer = slot.readBuffer[ret .. $];
slot.bytesRead += ret;
// Handle completed read
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
finalize(IOStatus.ok);
return;
}
}
}
final override void cancelRead(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
slot.readBuffer = null;
}
final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
{
if (buffer.length == 0) {
on_write_finish(pipe, IOStatus.ok, 0);
return;
}
ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
on_write_finish(pipe, IOStatus.error, 0);
return;
}
if (mode == IOMode.immediate) {
on_write_finish(pipe, IOStatus.wouldBlock, 0);
return;
}
} else {
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_write_finish(pipe, IOStatus.ok, ret);
return;
}
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.writeCallback is null, "Concurrent writes not allowed.");
slot.writeCallback = on_write_finish;
slot.writeMode = mode;
slot.bytesWritten = max(ret, 0);
slot.writeBuffer = buffer;
m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
}
private void onPipeWrite(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
void finalize(IOStatus status)
{
addRef(pipe);
scope(exit) releaseRef(pipe);
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
auto cb = slot.writeCallback;
slot.writeCallback = null;
slot.writeBuffer = null;
cb(pipe, IOStatus.error, slot.bytesRead);
}
ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
finalize(IOStatus.error);
}
} else {
slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
finalize(IOStatus.ok);
}
}
}
final override void cancelWrite(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
slot.writeCallback = null;
slot.writeBuffer = null;
}
final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
{
if (pollPipe(pipe, on_data_available))
{
return;
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
slot.readCallback = on_data_available;
slot.readMode = IOMode.once; // currently meaningless
slot.bytesRead = 0; // currently meaningless
slot.readBuffer = null;
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
}
private void onPipeDataAvailable(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
auto callback = (PipeFD f, IOStatus s, size_t m) {
addRef(f);
scope(exit) releaseRef(f);
auto cb = slot.readCallback;
slot.readCallback = null;
slot.readBuffer = null;
cb(f, s, m);
};
if (pollPipe(pipe, callback))
{
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
}
}
private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
@trusted {
// Use poll to check if any data is available
pollfd pfd;
pfd.fd = cast(int)pipe;
pfd.events = POLLIN;
int ret = poll(&pfd, 1, 0);
if (ret == -1) {
print("Error polling pipe: %s!", errno);
callback(pipe, IOStatus.error, 0);
return true;
}
if (ret == 1) {
callback(pipe, IOStatus.error, 0);
return true;
}
return false;
}
final override void close(PipeFD pipe)
{
// TODO: Maybe actually close here instead of waiting for releaseRef?
close(cast(int)pipe);
}
final override void addRef(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
slot.common.refCount++;
}
final override bool releaseRef(PipeFD pipe)
{
import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
if (--slot.common.refCount == 0) {
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!PipeSlot(pipe);
close(cast(int)pipe);
return false;
}
return true;
}
final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
}
}
package struct PipeSlot {
alias Handle = PipeFD;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
PipeIOCallback readCallback;
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
PipeIOCallback writeCallback;
}

View file

@ -0,0 +1,292 @@
module eventcore.drivers.posix.processes;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.drivers.posix.signals;
import eventcore.internal.utils : nogc_assert, print;
import std.algorithm.comparison : among;
import std.variant : visit;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.linux.sys.signalfd;
import core.sys.posix.signal;
import core.sys.posix.unistd : close, read, write, dup ;
private enum SIGCHLD = 17;
final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
private {
static struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
Loop m_loop;
EventDriverPipes m_pipes;
ProcessInfo[ProcessID] m_processes;
SignalListenID m_sighandle;
}
this(Loop loop, EventDriverPipes pipes)
{
m_loop = loop;
m_pipes = pipes;
// Listen for child process exits using SIGCHLD
m_sighandle = () @trusted {
sigset_t sset;
sigemptyset(&sset);
sigaddset(&sset, SIGCHLD);
assert(sigprocmask(SIG_BLOCK, &sset, null) == 0);
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC));
} ();
m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null));
m_loop.registerFD(cast(FD)m_sighandle, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal);
onSignal(cast(FD)m_sighandle);
}
void dispose()
{
FD sighandle = cast(FD)m_sighandle;
m_loop.m_fds[sighandle].common.refCount--;
m_loop.setNotifyCallback!(EventType.read)(sighandle, null);
m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!(SignalSlot)(sighandle);
close(cast(int)sighandle);
}
final override ProcessID adopt(int system_pid)
{
auto pid = cast(ProcessID)system_pid;
assert(pid !in m_processes, "Process is already adopted");
ProcessInfo info;
info.exited = false;
info.refCount = 1;
m_processes[pid] = info;
return pid;
}
final override Process spawn(
string[] args,
ProcessStdinFile stdin,
ProcessStdoutFile stdout,
ProcessStderrFile stderr,
const string[string] env,
ProcessConfig config,
string working_dir)
@trusted {
// Use std.process to spawn processes
import std.process : pipe, Pid, spawnProcess;
import std.stdio : File;
static import std.stdio;
static File fdToFile(int fd, scope const(char)[] mode)
{
try {
File f;
f.fdopen(fd, mode);
return f;
} catch (Exception e) {
assert(0);
}
}
try {
Process process;
File stdinFile, stdoutFile, stderrFile;
stdinFile = stdin.visit!(
(int handle) => fdToFile(handle, "r"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdin;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdin = m_pipes.adopt(dup(p.writeEnd.fileno));
return p.readEnd;
}
});
stdoutFile = stdout.visit!(
(int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdout;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdout = m_pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
stderrFile = stderr.visit!(
(int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stderr;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stderr = m_pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect;
const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
if (redirectStdout) {
assert(!redirectStderr, "Can't redirect both stdout and stderr");
stdoutFile = stderrFile;
} else if (redirectStderr) {
stderrFile = stdoutFile;
}
Pid stdPid = spawnProcess(
args,
stdinFile,
stdoutFile,
stderrFile,
env,
cast(std.process.Config)config,
working_dir);
process.pid = adopt(stdPid.osHandle);
stdPid.destroy();
return process;
} catch (Exception e) {
return Process.init;
}
}
final override void kill(ProcessID pid, int signal)
@trusted {
.kill(cast(int)pid, signal);
}
final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
if (info.exited) {
on_process_exit(pid, info.exitCode);
return 0;
} else {
info.callbacks ~= on_process_exit;
return info.callbacks.length - 1;
}
}
final override void cancelWait(ProcessID pid, size_t waitId)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > waitId, "Invalid process wait ID");
info.callbacks[waitId] = null;
}
private void onSignal(FD fd)
{
SignalListenID lid = cast(SignalListenID)fd;
signalfd_siginfo nfo;
do {
auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof)
return;
onProcessExit(nfo.ssi_pid, nfo.ssi_status);
} while (true);
}
private void onProcessExit(int system_pid, int exitCode)
{
auto pid = cast(ProcessID)system_pid;
auto info = () @trusted { return pid in m_processes; } ();
// We get notified of any child exiting, so ignore the ones we're not
// aware of
if (info is null) {
return;
}
info.exited = true;
info.exitCode = exitCode;
foreach (cb; info.callbacks) {
if (cb)
cb(pid, exitCode);
}
info.callbacks = null;
}
final override bool hasExited(ProcessID pid)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
return info.exited;
}
final override void addRef(ProcessID pid)
{
auto info = () @trusted { return &m_processes[pid]; } ();
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++;
}
final override bool releaseRef(ProcessID pid)
{
auto info = () @trusted { return &m_processes[pid]; } ();
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) {
// Remove/deallocate process
if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } ();
m_processes.remove(pid);
return false;
}
return true;
}
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
auto info = () @trusted { return &m_processes[pid]; } ();
assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
if (!info.userDataDestructor) {
initialize(info.userData.ptr);
info.userDataDestructor = destroy;
}
return info.userData.ptr;
}
}

View file

@ -15,6 +15,8 @@ import eventcore.drivers.winapi.core;
import eventcore.drivers.winapi.dns; import eventcore.drivers.winapi.dns;
import eventcore.drivers.winapi.events; import eventcore.drivers.winapi.events;
import eventcore.drivers.winapi.files; import eventcore.drivers.winapi.files;
import eventcore.drivers.winapi.pipes;
import eventcore.drivers.winapi.processes;
import eventcore.drivers.winapi.signals; import eventcore.drivers.winapi.signals;
import eventcore.drivers.winapi.sockets; import eventcore.drivers.winapi.sockets;
import eventcore.drivers.winapi.watchers; import eventcore.drivers.winapi.watchers;
@ -35,6 +37,8 @@ final class WinAPIEventDriver : EventDriver {
WinAPIEventDriverEvents m_events; WinAPIEventDriverEvents m_events;
WinAPIEventDriverSignals m_signals; WinAPIEventDriverSignals m_signals;
WinAPIEventDriverWatchers m_watchers; WinAPIEventDriverWatchers m_watchers;
WinAPIEventDriverProcesses m_processes;
WinAPIEventDriverPipes m_pipes;
} }
static WinAPIEventDriver threadInstance; static WinAPIEventDriver threadInstance;
@ -57,8 +61,10 @@ final class WinAPIEventDriver : EventDriver {
m_events = mallocT!WinAPIEventDriverEvents(m_core); m_events = mallocT!WinAPIEventDriverEvents(m_core);
m_files = mallocT!WinAPIEventDriverFiles(m_core); m_files = mallocT!WinAPIEventDriverFiles(m_core);
m_sockets = mallocT!WinAPIEventDriverSockets(m_core); m_sockets = mallocT!WinAPIEventDriverSockets(m_core);
m_pipes = mallocT!WinAPIEventDriverPipes();
m_dns = mallocT!WinAPIEventDriverDNS(); m_dns = mallocT!WinAPIEventDriverDNS();
m_watchers = mallocT!WinAPIEventDriverWatchers(m_core); m_watchers = mallocT!WinAPIEventDriverWatchers(m_core);
m_processes = mallocT!WinAPIEventDriverProcesses();
} }
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
@ -73,6 +79,8 @@ final class WinAPIEventDriver : EventDriver {
override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; } override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; }
override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; } override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; }
override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; } override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; }
override @property inout(WinAPIEventDriverProcesses) processes() inout { return m_processes; }
override @property inout(WinAPIEventDriverPipes) pipes() inout { return m_pipes; }
override bool dispose() override bool dispose()
{ {
@ -88,8 +96,10 @@ final class WinAPIEventDriver : EventDriver {
threadInstance = null; threadInstance = null;
try () @trusted { try () @trusted {
freeT(m_processes);
freeT(m_watchers); freeT(m_watchers);
freeT(m_dns); freeT(m_dns);
freeT(m_pipes);
freeT(m_sockets); freeT(m_sockets);
freeT(m_files); freeT(m_files);
freeT(m_events); freeT(m_events);

View file

@ -0,0 +1,59 @@
module eventcore.drivers.winapi.pipes;
version (Windows):
import eventcore.driver;
import eventcore.internal.win32;
final class WinAPIEventDriverPipes : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
override PipeFD adopt(int system_pipe_handle)
{
assert(false, "TODO!");
}
override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
{
assert(false, "TODO!");
}
override void cancelRead(PipeFD pipe)
{
assert(false, "TODO!");
}
override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
{
assert(false, "TODO!");
}
override void cancelWrite(PipeFD pipe)
{
assert(false, "TODO!");
}
override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
{
assert(false, "TODO!");
}
override void close(PipeFD pipe)
{
assert(false, "TODO!");
}
override void addRef(PipeID pid)
{
assert(false, "TODO!");
}
override bool releaseRef(PipeID pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(PipeID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
}

View file

@ -0,0 +1,54 @@
module eventcore.drivers.winapi.processes;
version (Windows):
import eventcore.driver;
import eventcore.internal.win32;
final class WinAPIEventDriverProcesses : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
override ProcessID adopt(int system_pid)
{
assert(false, "TODO!");
}
override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
{
assert(false, "TODO!");
}
override bool hasExited(ProcessID pid)
{
assert(false, "TODO!");
}
override void kill(ProcessID pid, int signal)
{
assert(false, "TODO!");
}
override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{
assert(false, "TODO!");
}
override void cancelWait(ProcessID pid, size_t waitId)
{
assert(false, "TODO!");
}
override void addRef(ProcessID pid)
{
assert(false, "TODO!");
}
override bool releaseRef(ProcessID pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
}