Merge pull request #100 from BenjaminSchaaf/process

Add APIs for working with Subprocesses and Pipes
merged-on-behalf-of: Sönke Ludwig <s-ludwig@users.noreply.github.com>
This commit is contained in:
The Dlang Bot 2019-04-01 14:48:41 +02:00 committed by GitHub
commit b11936e4bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 1077 additions and 8 deletions

View file

@ -57,6 +57,8 @@ Unix Signals | yes² | yes | &mdash; | &mdash; | &mdash;
Files | yes | yes | yes | yes | &mdash; Files | yes | yes | yes | yes | &mdash;
UI Integration | yes¹ | yes¹ | yes | yes¹ | &mdash; UI Integration | yes¹ | yes¹ | yes | yes¹ | &mdash;
File watcher | yes² | yes | yes | yes² | &mdash; File watcher | yes² | yes | yes | yes² | &mdash;
Pipes | yes | yes | &mdash; | yes | &mdash;
Processes | yes | yes | &mdash; | yes | &mdash;
¹ Manually, by adopting the X11 display connection socket ¹ Manually, by adopting the X11 display connection socket

View file

@ -19,8 +19,10 @@ module eventcore.driver;
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.time : Duration; import core.time : Duration;
import std.process : StdProcessConfig = Config;
import std.socket : Address; import std.socket : Address;
import std.stdint : intptr_t; import std.stdint : intptr_t;
import std.variant : Algebraic;
/** Encapsulates a full event driver. /** Encapsulates a full event driver.
@ -51,6 +53,10 @@ interface EventDriver {
@property inout(EventDriverFiles) files() inout; @property inout(EventDriverFiles) files() inout;
/// Directory change watching /// Directory change watching
@property inout(EventDriverWatchers) watchers() inout; @property inout(EventDriverWatchers) watchers() inout;
/// Sub-process handling
@property inout(EventDriverProcesses) processes() inout;
/// Pipes
@property inout(EventDriverPipes) pipes() inout;
/** Releases all resources associated with the driver. /** Releases all resources associated with the driver.
@ -646,6 +652,154 @@ interface EventDriverWatchers {
protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
} }
interface EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
/** Adopt an existing process.
*/
ProcessID adopt(int system_pid);
/** Spawn a child process.
Note that if a default signal handler exists for the signal, it will be
disabled by using this function.
Params:
args = The program arguments. First one must be an executable.
stdin = What should be done for stdin. Allows inheritance, piping,
nothing or any specific fd. If this results in a Pipe,
the PipeFD will be set in the stdin result.
stdout = See stdin, but also allows redirecting to stderr.
stderr = See stdin, but also allows redirecting to stdout.
env = The environment variables to spawn the process with.
config = Special process configurations.
working_dir = What to set the working dir in the process.
Returns:
Returns a Process struct containing the ProcessID and whatever
pipes have been adopted for stdin, stdout and stderr.
*/
Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env = null, ProcessConfig config = ProcessConfig.none, string working_dir = null);
/** Returns whether the process has exited yet.
*/
bool hasExited(ProcessID pid);
/** Kill the process using the given signal. Has different effects on different platforms.
*/
void kill(ProcessID pid, int signal);
/** Wait for the process to exit. Returns an identifier that can be used to cancel the wait.
*/
size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit);
/** Cancel a wait for the given identifier returned by wait.
*/
void cancelWait(ProcessID pid, size_t waitId);
/** Increments the reference count of the given resource.
*/
void addRef(ProcessID pid);
/** Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated. This will not kill
the sub-process, nor "detach" it.
Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(ProcessID pid);
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T)(ProcessID descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// Low-level user data access. Use `userData` instead.
protected void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}
interface EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
/** Adopt an existing pipe. This will modify the pipe to be non-blocking.
Note that pipes generally only allow either reads or writes but not
both, it is up to you to only call valid functions.
*/
PipeFD adopt(int system_pipe_handle);
/** Reads data from a stream socket.
Note that only a single read operation is allowed at once. The caller
needs to make sure that either `on_read_finish` got called, or
`cancelRead` was called before issuing the next call to `read`.
*/
void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish);
/** Cancels an ongoing read operation.
After this function has been called, the `PipeIOCallback` specified in
the call to `read` is guaranteed to not be called.
*/
void cancelRead(PipeFD pipe);
/** Writes data from a stream socket.
Note that only a single write operation is allowed at once. The caller
needs to make sure that either `on_write_finish` got called, or
`cancelWrite` was called before issuing the next call to `write`.
*/
void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish);
/** Cancels an ongoing write operation.
After this function has been called, the `PipeIOCallback` specified in
the call to `write` is guaranteed to not be called.
*/
void cancelWrite(PipeFD pipe);
/** Waits for incoming data without actually reading it.
*/
void waitForData(PipeFD pipe, PipeIOCallback on_data_available);
/** Immediately close the pipe. Future read or write operations may fail.
*/
void close(PipeFD pipe);
/** Increments the reference count of the given resource.
*/
void addRef(PipeFD pid);
/** Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(PipeFD pid);
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T)(PipeFD descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// Low-level user data access. Use `userData` instead.
protected void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}
// Helper class to enable fully stack allocated `std.socket.Address` instances. // Helper class to enable fully stack allocated `std.socket.Address` instances.
final class RefAddress : Address { final class RefAddress : Address {
@ -680,13 +834,22 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
alias TimerCallback2 = void delegate(TimerID, bool fired); alias TimerCallback2 = void delegate(TimerID, bool fired);
alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change); alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change);
alias ProcessWaitCallback = void delegate(ProcessID, int);
@system alias DataInitializer = void function(void*) @nogc; @system alias DataInitializer = void function(void*) @nogc;
enum ProcessRedirect { inherit, pipe, none }
alias ProcessStdinFile = Algebraic!(int, ProcessRedirect);
enum ProcessStdoutRedirect { toStderr }
alias ProcessStdoutFile = Algebraic!(int, ProcessRedirect, ProcessStdoutRedirect);
enum ProcessStderrRedirect { toStdout }
alias ProcessStderrFile = Algebraic!(int, ProcessRedirect, ProcessStderrRedirect);
enum ExitReason { enum ExitReason {
timeout, timeout,
idle, idle,
@ -775,6 +938,13 @@ enum SignalStatus {
error error
} }
/// See std.process.Config
enum ProcessConfig {
none = StdProcessConfig.none,
detached = StdProcessConfig.detached,
newEnv = StdProcessConfig.newEnv,
suppressConsole = StdProcessConfig.suppressConsole,
}
/** Describes a single change in a watched directory. /** Describes a single change in a watched directory.
*/ */
@ -799,6 +969,17 @@ struct FileChange {
bool isDirectory; bool isDirectory;
} }
/** Describes a spawned process
*/
struct Process {
ProcessID pid;
// TODO: Convert these to PipeFD once dmd is fixed
PipeFD stdin;
PipeFD stdout;
PipeFD stderr;
}
mixin template Handle(string NAME, T, T invalid_value = T.init) { mixin template Handle(string NAME, T, T invalid_value = T.init) {
static if (is(T.BaseType)) alias BaseType = T.BaseType; static if (is(T.BaseType)) alias BaseType = T.BaseType;
else alias BaseType = T; else alias BaseType = T;
@ -813,13 +994,14 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) {
this(BaseType value) { this.value = T(value); } this(BaseType value) { this.value = T(value); }
U opCast(U : Handle!(V, M), V, int M)() { U opCast(U : Handle!(V, M), V, int M)()
const {
// TODO: verify that U derives from typeof(this)! // TODO: verify that U derives from typeof(this)!
return U(value); return U(value);
} }
U opCast(U : BaseType)() U opCast(U : BaseType)()
{ const {
return cast(U)value; return cast(U)value;
} }
@ -834,9 +1016,12 @@ struct StreamSocketFD { mixin Handle!("streamSocket", SocketFD); }
struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); } struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); }
struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); } struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); }
struct FileFD { mixin Handle!("file", FD); } struct FileFD { mixin Handle!("file", FD); }
// FD.init is required here due to https://issues.dlang.org/show_bug.cgi?id=19585
struct PipeFD { mixin Handle!("pipe", FD, FD.init); }
struct EventID { mixin Handle!("event", FD); } struct EventID { mixin Handle!("event", FD); }
struct TimerID { mixin Handle!("timer", size_t, size_t.max); } struct TimerID { mixin Handle!("timer", size_t, size_t.max); }
struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); } struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); }
struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); } struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); }
struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); } struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); }
struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); } struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); }
struct ProcessID { mixin Handle!("process", size_t, size_t.max); }

View file

@ -12,6 +12,8 @@ import eventcore.drivers.posix.events;
import eventcore.drivers.posix.signals; import eventcore.drivers.posix.signals;
import eventcore.drivers.posix.sockets; import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers; import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.timer; import eventcore.drivers.timer;
import eventcore.drivers.threadedfile; import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.consumablequeue : ConsumableQueue;
@ -38,7 +40,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
private { private {
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver); alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver); alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop; version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop; else alias SignalsDriver = DummyEventDriverSignals!Loop;
@ -48,9 +50,13 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver; alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
else alias PipeDriver = DummyEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
//else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
version (linux) alias ProcessDriver = SignalEventDriverProcesses!Loop;
else alias ProcessDriver = DummyEventDriverProcesses!Loop;
Loop m_loop; Loop m_loop;
CoreDriver m_core; CoreDriver m_core;
@ -60,7 +66,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
SocketsDriver m_sockets; SocketsDriver m_sockets;
DNSDriver m_dns; DNSDriver m_dns;
FileDriver m_files; FileDriver m_files;
PipeDriver m_pipes;
WatcherDriver m_watchers; WatcherDriver m_watchers;
ProcessDriver m_processes;
} }
this() this()
@ -70,7 +78,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_events = mallocT!EventsDriver(m_loop, m_sockets); m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop); m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver; m_timers = mallocT!TimerDriver;
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, m_pipes);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_dns = mallocT!DNSDriver(m_events, m_signals); m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events); m_files = mallocT!FileDriver(m_events);
m_watchers = mallocT!WatcherDriver(m_events); m_watchers = mallocT!WatcherDriver(m_events);
@ -86,7 +96,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
final override @property inout(SocketsDriver) sockets() inout { return m_sockets; } final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
final override @property inout(DNSDriver) dns() inout { return m_dns; } final override @property inout(DNSDriver) dns() inout { return m_dns; }
final override @property inout(FileDriver) files() inout { return m_files; } final override @property inout(FileDriver) files() inout { return m_files; }
final override @property inout(PipeDriver) pipes() inout { return m_pipes; }
final override @property inout(WatcherDriver) watchers() inout { return m_watchers; } final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
final override @property inout(ProcessDriver) processes() inout { return m_processes; }
final override bool dispose() final override bool dispose()
{ {
@ -111,13 +123,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
return false; return false;
} }
m_processes.dispose();
m_files.dispose(); m_files.dispose();
m_dns.dispose(); m_dns.dispose();
m_core.dispose(); m_core.dispose();
m_loop.dispose(); m_loop.dispose();
try () @trusted { try () @trusted {
freeT(m_processes);
freeT(m_watchers); freeT(m_watchers);
freeT(m_pipes);
freeT(m_files); freeT(m_files);
freeT(m_dns); freeT(m_dns);
freeT(m_core); freeT(m_core);
@ -134,7 +149,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
} }
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore { final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
@safe nothrow: @safe nothrow:
import core.atomic : atomicLoad, atomicStore; import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
@ -148,6 +163,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Loop m_loop; Loop m_loop;
Timers m_timers; Timers m_timers;
Events m_events; Events m_events;
Processes m_processes;
bool m_exit = false; bool m_exit = false;
EventID m_wakeupEvent; EventID m_wakeupEvent;
@ -155,11 +171,12 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
} }
this(Loop loop, Timers timers, Events events) this(Loop loop, Timers timers, Events events, Processes processes)
@nogc { @nogc {
m_loop = loop; m_loop = loop;
m_timers = timers; m_timers = timers;
m_events = events; m_events = events;
m_processes = processes;
m_wakeupEvent = events.createInternal(); m_wakeupEvent = events.createInternal();
static if (__VERSION__ >= 2074) static if (__VERSION__ >= 2074)
@ -183,7 +200,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
final override ExitReason processEvents(Duration timeout) final override ExitReason processEvents(Duration timeout)
{ {
@ -300,7 +317,7 @@ package class PosixEventLoop {
import core.time : Duration; import core.time : Duration;
package { package {
AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds; AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot, PipeSlot) m_fds;
size_t m_handleCount = 0; size_t m_handleCount = 0;
size_t m_waiterCount = 0; size_t m_waiterCount = 0;
} }

View file

@ -0,0 +1,391 @@
module eventcore.drivers.posix.pipes;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.utils : nogc_assert, print;
import std.algorithm : min, max;
final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN;
import core.sys.posix.unistd : close, read, write;
import core.sys.posix.fcntl;
import core.sys.posix.poll;
private Loop m_loop;
this(Loop loop)
@nogc {
m_loop = loop;
}
final override PipeFD adopt(int system_fd)
{
auto fd = PipeFD(system_fd);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return PipeFD.invalid;
() @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK); } ();
m_loop.initFD(fd, FDFlags.none, PipeSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
return fd;
}
final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
{
auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
// Read failed
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
print("Pipe error %s!", err);
on_read_finish(pipe, IOStatus.error, 0);
return;
}
}
// EOF
if (ret == 0 && buffer.length > 0) {
on_read_finish(pipe, IOStatus.disconnected, 0);
return;
}
// Handle immediate mode
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(pipe, IOStatus.wouldBlock, 0);
return;
}
// Handle successful read
if (ret >= 0) {
buffer = buffer[ret .. $];
// Handle completed read
if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(pipe, IOStatus.ok, ret);
return;
}
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
slot.readCallback = on_read_finish;
slot.readMode = mode;
slot.bytesRead = max(ret, 0);
slot.readBuffer = buffer;
// Need to use EventType.status as well, as pipes don't otherwise notify
// of closes
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
}
private void onPipeRead(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
void finalize(IOStatus status)
{
addRef(pipe);
scope(exit) releaseRef(pipe);
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
m_loop.setNotifyCallback!(EventType.status)(pipe, null);
auto cb = slot.readCallback;
slot.readCallback = null;
slot.readBuffer = null;
cb(pipe, status, slot.bytesRead);
}
ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();
// Read failed
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
print("Pipe error %s!", err);
finalize(IOStatus.error);
return;
}
}
// EOF
if (ret == 0 && slot.readBuffer.length > 0) {
finalize(IOStatus.disconnected);
return;
}
// Successful read
if (ret > 0 || !slot.readBuffer.length) {
slot.readBuffer = slot.readBuffer[ret .. $];
slot.bytesRead += ret;
// Handle completed read
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
finalize(IOStatus.ok);
return;
}
}
}
final override void cancelRead(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
slot.readBuffer = null;
}
final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
{
if (buffer.length == 0) {
on_write_finish(pipe, IOStatus.ok, 0);
return;
}
ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
on_write_finish(pipe, IOStatus.error, 0);
return;
}
if (mode == IOMode.immediate) {
on_write_finish(pipe, IOStatus.wouldBlock, 0);
return;
}
} else {
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_write_finish(pipe, IOStatus.ok, ret);
return;
}
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.writeCallback is null, "Concurrent writes not allowed.");
slot.writeCallback = on_write_finish;
slot.writeMode = mode;
slot.bytesWritten = max(ret, 0);
slot.writeBuffer = buffer;
m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
}
private void onPipeWrite(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
void finalize(IOStatus status)
{
addRef(pipe);
scope(exit) releaseRef(pipe);
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
auto cb = slot.writeCallback;
slot.writeCallback = null;
slot.writeBuffer = null;
cb(pipe, IOStatus.error, slot.bytesRead);
}
ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
finalize(IOStatus.error);
}
} else {
slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
finalize(IOStatus.ok);
}
}
}
final override void cancelWrite(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
slot.writeCallback = null;
slot.writeBuffer = null;
}
final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
{
if (pollPipe(pipe, on_data_available))
{
return;
}
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
slot.readCallback = on_data_available;
slot.readMode = IOMode.once; // currently meaningless
slot.bytesRead = 0; // currently meaningless
slot.readBuffer = null;
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
}
private void onPipeDataAvailable(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
auto pipe = cast(PipeFD)fd;
auto callback = (PipeFD f, IOStatus s, size_t m) {
addRef(f);
scope(exit) releaseRef(f);
auto cb = slot.readCallback;
slot.readCallback = null;
slot.readBuffer = null;
cb(f, s, m);
};
if (pollPipe(pipe, callback))
{
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
}
}
private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
@trusted {
// Use poll to check if any data is available
pollfd pfd;
pfd.fd = cast(int)pipe;
pfd.events = POLLIN;
int ret = poll(&pfd, 1, 0);
if (ret == -1) {
print("Error polling pipe: %s!", errno);
callback(pipe, IOStatus.error, 0);
return true;
}
if (ret == 1) {
callback(pipe, IOStatus.error, 0);
return true;
}
return false;
}
final override void close(PipeFD pipe)
{
// TODO: Maybe actually close here instead of waiting for releaseRef?
close(cast(int)pipe);
}
final override void addRef(PipeFD pipe)
{
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
slot.common.refCount++;
}
final override bool releaseRef(PipeFD pipe)
{
import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
if (--slot.common.refCount == 0) {
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!PipeSlot(pipe);
close(cast(int)pipe);
return false;
}
return true;
}
final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
}
}
final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
this(Loop loop) {}
override PipeFD adopt(int system_pipe_handle)
{
assert(false, "TODO!");
}
override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
{
assert(false, "TODO!");
}
override void cancelRead(PipeFD pipe)
{
assert(false, "TODO!");
}
override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
{
assert(false, "TODO!");
}
override void cancelWrite(PipeFD pipe)
{
assert(false, "TODO!");
}
override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
{
assert(false, "TODO!");
}
override void close(PipeFD pipe)
{
assert(false, "TODO!");
}
override void addRef(PipeFD pid)
{
assert(false, "TODO!");
}
override bool releaseRef(PipeFD pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
}
package struct PipeSlot {
alias Handle = PipeFD;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
PipeIOCallback readCallback;
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
PipeIOCallback writeCallback;
}

View file

@ -0,0 +1,351 @@
module eventcore.drivers.posix.processes;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.drivers.posix.signals;
import eventcore.internal.utils : nogc_assert, print;
import std.algorithm.comparison : among;
import std.variant : visit;
private enum SIGCHLD = 17;
final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.linux.sys.signalfd;
import core.sys.posix.unistd : close, read, write, dup;
private {
static struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
Loop m_loop;
EventDriverPipes m_pipes;
ProcessInfo[ProcessID] m_processes;
SignalListenID m_sighandle;
}
this(Loop loop, EventDriverPipes pipes)
{
import core.sys.posix.signal;
m_loop = loop;
m_pipes = pipes;
// Listen for child process exits using SIGCHLD
m_sighandle = () @trusted {
sigset_t sset;
sigemptyset(&sset);
sigaddset(&sset, SIGCHLD);
assert(sigprocmask(SIG_BLOCK, &sset, null) == 0);
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC));
} ();
m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null));
m_loop.registerFD(cast(FD)m_sighandle, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal);
onSignal(cast(FD)m_sighandle);
}
void dispose()
{
FD sighandle = cast(FD)m_sighandle;
m_loop.m_fds[sighandle].common.refCount--;
m_loop.setNotifyCallback!(EventType.read)(sighandle, null);
m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!(SignalSlot)(sighandle);
close(cast(int)sighandle);
}
final override ProcessID adopt(int system_pid)
{
auto pid = cast(ProcessID)system_pid;
assert(pid !in m_processes, "Process is already adopted");
ProcessInfo info;
info.exited = false;
info.refCount = 1;
m_processes[pid] = info;
return pid;
}
final override Process spawn(
string[] args,
ProcessStdinFile stdin,
ProcessStdoutFile stdout,
ProcessStderrFile stderr,
const string[string] env,
ProcessConfig config,
string working_dir)
@trusted {
// Use std.process to spawn processes
import std.process : pipe, Pid, spawnProcess;
import std.stdio : File;
static import std.stdio;
static File fdToFile(int fd, scope const(char)[] mode)
{
try {
File f;
f.fdopen(fd, mode);
return f;
} catch (Exception e) {
assert(0);
}
}
try {
Process process;
File stdinFile, stdoutFile, stderrFile;
stdinFile = stdin.visit!(
(int handle) => fdToFile(handle, "r"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdin;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdin = m_pipes.adopt(dup(p.writeEnd.fileno));
return p.readEnd;
}
});
stdoutFile = stdout.visit!(
(int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdout;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdout = m_pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
stderrFile = stderr.visit!(
(int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stderr;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stderr = m_pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect;
const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
if (redirectStdout) {
assert(!redirectStderr, "Can't redirect both stdout and stderr");
stdoutFile = stderrFile;
} else if (redirectStderr) {
stderrFile = stdoutFile;
}
Pid stdPid = spawnProcess(
args,
stdinFile,
stdoutFile,
stderrFile,
env,
cast(std.process.Config)config,
working_dir);
process.pid = adopt(stdPid.osHandle);
stdPid.destroy();
return process;
} catch (Exception e) {
return Process.init;
}
}
final override void kill(ProcessID pid, int signal)
@trusted {
import core.sys.posix.signal : pkill = kill;
pkill(cast(int)pid, signal);
}
final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
if (info.exited) {
on_process_exit(pid, info.exitCode);
return 0;
} else {
info.callbacks ~= on_process_exit;
return info.callbacks.length - 1;
}
}
final override void cancelWait(ProcessID pid, size_t waitId)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > waitId, "Invalid process wait ID");
info.callbacks[waitId] = null;
}
private void onSignal(FD fd)
{
SignalListenID lid = cast(SignalListenID)fd;
signalfd_siginfo nfo;
do {
auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof)
return;
onProcessExit(nfo.ssi_pid, nfo.ssi_status);
} while (true);
}
private void onProcessExit(int system_pid, int exitCode)
{
auto pid = cast(ProcessID)system_pid;
auto info = () @trusted { return pid in m_processes; } ();
// We get notified of any child exiting, so ignore the ones we're not
// aware of
if (info is null) {
return;
}
info.exited = true;
info.exitCode = exitCode;
foreach (cb; info.callbacks) {
if (cb)
cb(pid, exitCode);
}
info.callbacks = null;
}
final override bool hasExited(ProcessID pid)
{
auto info = () @trusted { return pid in m_processes; } ();
assert(info !is null, "Unknown process ID");
return info.exited;
}
final override void addRef(ProcessID pid)
{
auto info = () @trusted { return &m_processes[pid]; } ();
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++;
}
final override bool releaseRef(ProcessID pid)
{
auto info = () @trusted { return &m_processes[pid]; } ();
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) {
// Remove/deallocate process
if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } ();
m_processes.remove(pid);
return false;
}
return true;
}
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
auto info = () @trusted { return &m_processes[pid]; } ();
assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
if (!info.userDataDestructor) {
initialize(info.userData.ptr);
info.userDataDestructor = destroy;
}
return info.userData.ptr;
}
package final @property size_t pendingCount() const nothrow { return m_processes.length; }
}
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
this(Loop loop, EventDriverPipes pipes) {}
void dispose() {}
override ProcessID adopt(int system_pid)
{
assert(false, "TODO!");
}
override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
{
assert(false, "TODO!");
}
override bool hasExited(ProcessID pid)
{
assert(false, "TODO!");
}
override void kill(ProcessID pid, int signal)
{
assert(false, "TODO!");
}
override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{
assert(false, "TODO!");
}
override void cancelWait(ProcessID pid, size_t waitId)
{
assert(false, "TODO!");
}
override void addRef(ProcessID pid)
{
assert(false, "TODO!");
}
override bool releaseRef(ProcessID pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
package final @property size_t pendingCount() const nothrow { return 0; }
}

View file

@ -15,6 +15,8 @@ import eventcore.drivers.winapi.core;
import eventcore.drivers.winapi.dns; import eventcore.drivers.winapi.dns;
import eventcore.drivers.winapi.events; import eventcore.drivers.winapi.events;
import eventcore.drivers.winapi.files; import eventcore.drivers.winapi.files;
import eventcore.drivers.winapi.pipes;
import eventcore.drivers.winapi.processes;
import eventcore.drivers.winapi.signals; import eventcore.drivers.winapi.signals;
import eventcore.drivers.winapi.sockets; import eventcore.drivers.winapi.sockets;
import eventcore.drivers.winapi.watchers; import eventcore.drivers.winapi.watchers;
@ -35,6 +37,8 @@ final class WinAPIEventDriver : EventDriver {
WinAPIEventDriverEvents m_events; WinAPIEventDriverEvents m_events;
WinAPIEventDriverSignals m_signals; WinAPIEventDriverSignals m_signals;
WinAPIEventDriverWatchers m_watchers; WinAPIEventDriverWatchers m_watchers;
WinAPIEventDriverProcesses m_processes;
WinAPIEventDriverPipes m_pipes;
} }
static WinAPIEventDriver threadInstance; static WinAPIEventDriver threadInstance;
@ -57,8 +61,10 @@ final class WinAPIEventDriver : EventDriver {
m_events = mallocT!WinAPIEventDriverEvents(m_core); m_events = mallocT!WinAPIEventDriverEvents(m_core);
m_files = mallocT!WinAPIEventDriverFiles(m_core); m_files = mallocT!WinAPIEventDriverFiles(m_core);
m_sockets = mallocT!WinAPIEventDriverSockets(m_core); m_sockets = mallocT!WinAPIEventDriverSockets(m_core);
m_pipes = mallocT!WinAPIEventDriverPipes();
m_dns = mallocT!WinAPIEventDriverDNS(); m_dns = mallocT!WinAPIEventDriverDNS();
m_watchers = mallocT!WinAPIEventDriverWatchers(m_core); m_watchers = mallocT!WinAPIEventDriverWatchers(m_core);
m_processes = mallocT!WinAPIEventDriverProcesses();
} }
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
@ -73,6 +79,8 @@ final class WinAPIEventDriver : EventDriver {
override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; } override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; }
override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; } override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; }
override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; } override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; }
override @property inout(WinAPIEventDriverProcesses) processes() inout { return m_processes; }
override @property inout(WinAPIEventDriverPipes) pipes() inout { return m_pipes; }
override bool dispose() override bool dispose()
{ {
@ -88,8 +96,10 @@ final class WinAPIEventDriver : EventDriver {
threadInstance = null; threadInstance = null;
try () @trusted { try () @trusted {
freeT(m_processes);
freeT(m_watchers); freeT(m_watchers);
freeT(m_dns); freeT(m_dns);
freeT(m_pipes);
freeT(m_sockets); freeT(m_sockets);
freeT(m_files); freeT(m_files);
freeT(m_events); freeT(m_events);

View file

@ -0,0 +1,59 @@
module eventcore.drivers.winapi.pipes;
version (Windows):
import eventcore.driver;
import eventcore.internal.win32;
final class WinAPIEventDriverPipes : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
override PipeFD adopt(int system_pipe_handle)
{
assert(false, "TODO!");
}
override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
{
assert(false, "TODO!");
}
override void cancelRead(PipeFD pipe)
{
assert(false, "TODO!");
}
override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
{
assert(false, "TODO!");
}
override void cancelWrite(PipeFD pipe)
{
assert(false, "TODO!");
}
override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
{
assert(false, "TODO!");
}
override void close(PipeFD pipe)
{
assert(false, "TODO!");
}
override void addRef(PipeFD pid)
{
assert(false, "TODO!");
}
override bool releaseRef(PipeFD pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
}

View file

@ -0,0 +1,54 @@
module eventcore.drivers.winapi.processes;
version (Windows):
import eventcore.driver;
import eventcore.internal.win32;
final class WinAPIEventDriverProcesses : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
override ProcessID adopt(int system_pid)
{
assert(false, "TODO!");
}
override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
{
assert(false, "TODO!");
}
override bool hasExited(ProcessID pid)
{
assert(false, "TODO!");
}
override void kill(ProcessID pid, int signal)
{
assert(false, "TODO!");
}
override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{
assert(false, "TODO!");
}
override void cancelWait(ProcessID pid, size_t waitId)
{
assert(false, "TODO!");
}
override void addRef(ProcessID pid)
{
assert(false, "TODO!");
}
override bool releaseRef(ProcessID pid)
{
assert(false, "TODO!");
}
protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
assert(false, "TODO!");
}
}