Fix indentation style.

This commit is contained in:
Sönke Ludwig 2019-08-21 16:10:57 +02:00
parent e23090b513
commit e1c6d99798

View file

@ -11,45 +11,45 @@ import std.variant : visit;
import std.stdint; import std.stdint;
private struct ProcessInfo { private struct ProcessInfo {
bool exited = true; bool exited = true;
int exitCode; int exitCode;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
size_t refCount = 0; size_t refCount = 0;
EventDriverProcesses driver; EventDriverProcesses driver;
DataInitializer userDataDestructor; DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData; ubyte[16*size_t.sizeof] userData;
} }
private struct StaticProcesses { private struct StaticProcesses {
@safe: nothrow: @safe: nothrow:
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
private { private {
static shared Mutex m_mutex; static shared Mutex m_mutex;
static __gshared ProcessInfo[ProcessID] m_processes; static __gshared ProcessInfo[ProcessID] m_processes;
} }
shared static this() shared static this()
{ {
m_mutex = new shared Mutex; m_mutex = new shared Mutex;
} }
static void add(ProcessID pid, ProcessInfo info) @trusted { static void add(ProcessID pid, ProcessInfo info) @trusted {
m_mutex.lock_nothrow(); m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow(); scope (exit) m_mutex.unlock_nothrow();
assert(pid !in m_processes, "Process adopted twice"); assert(pid !in m_processes, "Process adopted twice");
m_processes[pid] = info; m_processes[pid] = info;
} }
} }
private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted { private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted {
StaticProcesses.m_mutex.lock_nothrow(); StaticProcesses.m_mutex.lock_nothrow();
scope (exit) StaticProcesses.m_mutex.unlock_nothrow(); scope (exit) StaticProcesses.m_mutex.unlock_nothrow();
auto info = pid in StaticProcesses.m_processes; auto info = pid in StaticProcesses.m_processes;
return fn(info); return fn(info);
} }
@ -57,380 +57,380 @@ private enum SIGCHLD = 17;
final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.linux.sys.signalfd; import core.sys.linux.sys.signalfd;
import core.sys.posix.unistd : close, read, write, dup; import core.sys.posix.unistd : close, read, write, dup;
private { private {
Loop m_loop; Loop m_loop;
EventDriver m_driver; EventDriver m_driver;
SignalListenID m_sighandle; SignalListenID m_sighandle;
} }
this(Loop loop, EventDriver driver) this(Loop loop, EventDriver driver)
{ {
import core.sys.posix.signal; import core.sys.posix.signal;
m_loop = loop; m_loop = loop;
m_driver = driver; m_driver = driver;
// Listen for child process exits using SIGCHLD // Listen for child process exits using SIGCHLD
m_sighandle = () @trusted { m_sighandle = () @trusted {
sigset_t sset; sigset_t sset;
sigemptyset(&sset); sigemptyset(&sset);
sigaddset(&sset, SIGCHLD); sigaddset(&sset, SIGCHLD);
assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); assert(sigprocmask(SIG_BLOCK, &sset, null) == 0);
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC));
} (); } ();
m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null));
m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); m_loop.registerFD(cast(FD)m_sighandle, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal);
onSignal(cast(FD)m_sighandle); onSignal(cast(FD)m_sighandle);
} }
void dispose() void dispose()
{ {
FD sighandle = cast(FD)m_sighandle; FD sighandle = cast(FD)m_sighandle;
m_loop.m_fds[sighandle].common.refCount--; m_loop.m_fds[sighandle].common.refCount--;
m_loop.setNotifyCallback!(EventType.read)(sighandle, null); m_loop.setNotifyCallback!(EventType.read)(sighandle, null);
m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status); m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!(SignalSlot)(sighandle); m_loop.clearFD!(SignalSlot)(sighandle);
close(cast(int)sighandle); close(cast(int)sighandle);
} }
final override ProcessID adopt(int system_pid) final override ProcessID adopt(int system_pid)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
ProcessInfo info; ProcessInfo info;
info.exited = false; info.exited = false;
info.refCount = 1; info.refCount = 1;
info.driver = this; info.driver = this;
StaticProcesses.add(pid, info); StaticProcesses.add(pid, info);
return pid; return pid;
} }
final override Process spawn( final override Process spawn(
string[] args, string[] args,
ProcessStdinFile stdin, ProcessStdinFile stdin,
ProcessStdoutFile stdout, ProcessStdoutFile stdout,
ProcessStderrFile stderr, ProcessStderrFile stderr,
const string[string] env, const string[string] env,
ProcessConfig config, ProcessConfig config,
string working_dir) string working_dir)
@trusted { @trusted {
// Use std.process to spawn processes // Use std.process to spawn processes
import std.process : pipe, Pid, spawnProcess; import std.process : pipe, Pid, spawnProcess;
import std.stdio : File; import std.stdio : File;
static import std.stdio; static import std.stdio;
static File fdToFile(int fd, scope const(char)[] mode) static File fdToFile(int fd, scope const(char)[] mode)
{ {
try { try {
File f; File f;
f.fdopen(fd, mode); f.fdopen(fd, mode);
return f; return f;
} catch (Exception e) { } catch (Exception e) {
assert(0); assert(0);
} }
} }
try { try {
Process process; Process process;
File stdinFile, stdoutFile, stderrFile; File stdinFile, stdoutFile, stderrFile;
stdinFile = stdin.visit!( stdinFile = stdin.visit!(
(int handle) => fdToFile(handle, "r"), (int handle) => fdToFile(handle, "r"),
(ProcessRedirect redirect) { (ProcessRedirect redirect) {
final switch (redirect) { final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdin; case ProcessRedirect.inherit: return std.stdio.stdin;
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno));
return p.readEnd; return p.readEnd;
} }
}); });
stdoutFile = stdout.visit!( stdoutFile = stdout.visit!(
(int handle) => fdToFile(handle, "w"), (int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) { (ProcessRedirect redirect) {
final switch (redirect) { final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdout; case ProcessRedirect.inherit: return std.stdio.stdout;
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd; return p.writeEnd;
} }
}, },
(_) => File.init); (_) => File.init);
stderrFile = stderr.visit!( stderrFile = stderr.visit!(
(int handle) => fdToFile(handle, "w"), (int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) { (ProcessRedirect redirect) {
final switch (redirect) { final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stderr; case ProcessRedirect.inherit: return std.stdio.stderr;
case ProcessRedirect.none: return File.init; case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe: case ProcessRedirect.pipe:
auto p = pipe(); auto p = pipe();
process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd; return p.writeEnd;
} }
}, },
(_) => File.init); (_) => File.init);
const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect;
const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
if (redirectStdout) { if (redirectStdout) {
assert(!redirectStderr, "Can't redirect both stdout and stderr"); assert(!redirectStderr, "Can't redirect both stdout and stderr");
stdoutFile = stderrFile; stdoutFile = stderrFile;
} else if (redirectStderr) { } else if (redirectStderr) {
stderrFile = stdoutFile; stderrFile = stdoutFile;
} }
Pid stdPid = spawnProcess( Pid stdPid = spawnProcess(
args, args,
stdinFile, stdinFile,
stdoutFile, stdoutFile,
stderrFile, stderrFile,
env, env,
cast(std.process.Config)config, cast(std.process.Config)config,
working_dir); working_dir);
process.pid = adopt(stdPid.osHandle); process.pid = adopt(stdPid.osHandle);
stdPid.destroy(); stdPid.destroy();
return process; return process;
} catch (Exception e) { } catch (Exception e) {
return Process.init; return Process.init;
} }
} }
final override void kill(ProcessID pid, int signal) final override void kill(ProcessID pid, int signal)
@trusted { @trusted {
import core.sys.posix.signal : pkill = kill; import core.sys.posix.signal : pkill = kill;
pkill(cast(int)pid, signal); pkill(cast(int)pid, signal);
} }
final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{ {
bool exited; bool exited;
int exitCode; int exitCode;
size_t id = lockedProcessInfo!((info) { size_t id = lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
if (info.exited) { if (info.exited) {
exited = true; exited = true;
exitCode = info.exitCode; exitCode = info.exitCode;
return 0; return 0;
} else { } else {
info.callbacks ~= on_process_exit; info.callbacks ~= on_process_exit;
return info.callbacks.length - 1; return info.callbacks.length - 1;
} }
})(pid); })(pid);
if (exited) { if (exited) {
on_process_exit(pid, exitCode); on_process_exit(pid, exitCode);
} }
return id; return id;
} }
final override void cancelWait(ProcessID pid, size_t waitId) final override void cancelWait(ProcessID pid, size_t waitId)
{ {
lockedProcessInfo!((info) { lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending"); assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > waitId, "Invalid process wait ID"); assert(info.callbacks.length > waitId, "Invalid process wait ID");
info.callbacks[waitId] = null; info.callbacks[waitId] = null;
})(pid); })(pid);
} }
private void onSignal(FD fd) private void onSignal(FD fd)
{ {
SignalListenID lid = cast(SignalListenID)fd; SignalListenID lid = cast(SignalListenID)fd;
signalfd_siginfo nfo; signalfd_siginfo nfo;
do { do {
auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } (); auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof)
return; return;
onProcessExit(nfo.ssi_pid, nfo.ssi_status); onProcessExit(nfo.ssi_pid, nfo.ssi_status);
} while (true); } while (true);
} }
private void onProcessExit(int system_pid, int exitCode) private void onProcessExit(int system_pid, int exitCode)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) @safe { auto driver = lockedProcessInfo!((info) @safe {
// We get notified of any child exiting, so ignore the ones we're // We get notified of any child exiting, so ignore the ones we're
// not aware of // not aware of
if (info is null) { if (info is null) {
return null; return null;
} }
// Increment the ref count to make sure it doesn't get removed // Increment the ref count to make sure it doesn't get removed
info.refCount++; info.refCount++;
info.exited = true; info.exited = true;
info.exitCode = exitCode; info.exitCode = exitCode;
return info.driver; return info.driver;
})(pid); })(pid);
// Need to call callbacks in the owner thread as this function can be // Need to call callbacks in the owner thread as this function can be
// called from any thread. Without extra threads this is always the main // called from any thread. Without extra threads this is always the main
// thread. // thread.
if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) {
onLocalProcessExit(cast(intptr_t)pid); onLocalProcessExit(cast(intptr_t)pid);
} else if (driver) { } else if (driver) {
auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } (); auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } ();
sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid);
} }
} }
private static void onLocalProcessExit(intptr_t system_pid) private static void onLocalProcessExit(intptr_t system_pid)
{ {
auto pid = cast(ProcessID)system_pid; auto pid = cast(ProcessID)system_pid;
int exitCode; int exitCode;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) { auto driver = lockedProcessInfo!((info) {
assert(info !is null); assert(info !is null);
exitCode = info.exitCode; exitCode = info.exitCode;
callbacks = info.callbacks; callbacks = info.callbacks;
info.callbacks = null; info.callbacks = null;
return info.driver; return info.driver;
})(pid); })(pid);
foreach (cb; callbacks) { foreach (cb; callbacks) {
if (cb) if (cb)
cb(pid, exitCode); cb(pid, exitCode);
} }
driver.releaseRef(pid); driver.releaseRef(pid);
} }
final override bool hasExited(ProcessID pid) final override bool hasExited(ProcessID pid)
{ {
return lockedProcessInfo!((info) { return lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
return info.exited; return info.exited;
})(pid); })(pid);
} }
final override void addRef(ProcessID pid) final override void addRef(ProcessID pid)
{ {
lockedProcessInfo!((info) { lockedProcessInfo!((info) {
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++; info.refCount++;
})(pid); })(pid);
} }
final override bool releaseRef(ProcessID pid) final override bool releaseRef(ProcessID pid)
{ {
return lockedProcessInfo!((info) { return lockedProcessInfo!((info) {
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) { if (--info.refCount == 0) {
// Remove/deallocate process // Remove/deallocate process
if (info.userDataDestructor) if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } (); () @trusted { info.userDataDestructor(info.userData.ptr); } ();
StaticProcesses.m_processes.remove(pid); StaticProcesses.m_processes.remove(pid);
return false; return false;
} }
return true; return true;
})(pid); })(pid);
} }
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
return lockedProcessInfo!((info) { return lockedProcessInfo!((info) {
assert(info.userDataDestructor is null || info.userDataDestructor is destroy, assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
if (!info.userDataDestructor) { if (!info.userDataDestructor) {
initialize(info.userData.ptr); initialize(info.userData.ptr);
info.userDataDestructor = destroy; info.userDataDestructor = destroy;
} }
return info.userData.ptr; return info.userData.ptr;
})(pid); })(pid);
} }
package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; }
} }
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
this(Loop loop, EventDriver driver) {} this(Loop loop, EventDriver driver) {}
void dispose() {} void dispose() {}
override ProcessID adopt(int system_pid) override ProcessID adopt(int system_pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool hasExited(ProcessID pid) override bool hasExited(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void kill(ProcessID pid, int signal) override void kill(ProcessID pid, int signal)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void cancelWait(ProcessID pid, size_t waitId) override void cancelWait(ProcessID pid, size_t waitId)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void addRef(ProcessID pid) override void addRef(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool releaseRef(ProcessID pid) override bool releaseRef(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
assert(false, "TODO!"); assert(false, "TODO!");
} }
package final @property size_t pendingCount() const nothrow { return 0; } package final @property size_t pendingCount() const nothrow { return 0; }
} }