From c3f5ebb9fddc5397da4a64e3d560e6396eb2c860 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 5 May 2019 15:27:57 +1000 Subject: [PATCH 1/9] Implement vibe.core.process for subprocess handling similar to std.process --- source/vibe/core/process.d | 752 +++++++++++++++++++++++++++++++++++++ tests/vibe.core.process.d | 200 ++++++++++ 2 files changed, 952 insertions(+) create mode 100644 source/vibe/core/process.d create mode 100644 tests/vibe.core.process.d diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d new file mode 100644 index 0000000..0fb76e2 --- /dev/null +++ b/source/vibe/core/process.d @@ -0,0 +1,752 @@ +/** + Functions and structures for dealing with subprocesses and pipes. + + This module is modeled after std.process, but provides a fiber-aware + alternative to it. All blocking operations will yield the calling fiber + instead of blocking it. +*/ +module vibe.core.process; + +public import std.process : Pid, Redirect; +static import std.process; + +import core.time; +import std.array; +import std.typecons; +import std.exception : enforce; +import std.algorithm; +import eventcore.core; +import vibe.core.path; +import vibe.core.log; +import vibe.core.stream; +import vibe.internal.async; +import vibe.internal.array : BatchBuffer; +import vibe.core.internal.release; + +@safe: + +/** + Register a process with vibe for fibre-aware handling. This process can be + started from anywhere including external libraries or std.process. + + Params: + pid = A Pid or OS process handle +*/ +Process registerProcess(Pid pid) +@trusted { + return registerProcess(pid.osHandle); +} + +/// ditto +Process registerProcess(int pid) +{ + return Process(eventDriver.processes.adopt(pid)); +} + +/** + Path to the user's preferred command interpreter. + + See_Also: `nativeShell` +*/ +@property NativePath userShell() { return NativePath(std.process.userShell); } + +/** + The platform specific native shell path. + + See_Also: `userShell` +*/ +const NativePath nativeShell = NativePath(std.process.nativeShell); + +/** + Equivalent to `std.process.Config` except with less flag support +*/ +enum Config { + none = ProcessConfig.none, + newEnv = ProcessConfig.newEnv, + suppressConsole = ProcessConfig.suppressConsole, + detached = ProcessConfig.detached, +} + +/** + Equivalent to `std.process.spawnProcess`. + + Returns: + A reference to the running process. + + See_Also: `pipeProcess`, `execute` +*/ +Process spawnProcess( + scope string[] args, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) +@trusted { + return Process(eventDriver.processes.spawn( + args, + ProcessStdinFile(ProcessRedirect.inherit), + ProcessStdoutFile(ProcessRedirect.inherit), + ProcessStderrFile(ProcessRedirect.inherit), + env, + config, + workDir.toNativeString()).pid + ); +} + +/// ditto +Process spawnProcess( + scope string program, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) +{ + return spawnProcess( + [program], + env, + config, + workDir + ); +} + +/// ditto +Process spawnShell( + scope string command, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init, + scope NativePath shellPath = nativeShell) +{ + return spawnProcess( + shellCommand(command, shellPath), + env, + config, + workDir); +} + +private string[] shellCommand(string command, NativePath shellPath) +{ + version (Windows) + { + // CMD does not parse its arguments like other programs. + // It does not use CommandLineToArgvW. + // Instead, it treats the first and last quote specially. + // See CMD.EXE /? for details. + return [ + std.process.escapeShellFileName(shellPath.toNativeString()) + ~ ` /C "` ~ command ~ `"` + ]; + } + else version (Posix) + { + return [ + shellPath.toNativeString(), + "-c", + command, + ]; + } +} + +/** + Represents a running process. +*/ +struct Process { + private static struct Context { + //Duration waitTimeout; + shared(NativeEventDriver) driver; + } + + private { + ProcessID m_pid; + Context* m_context; + } + + private this(ProcessID p) + nothrow { + assert(p != ProcessID.invalid); + m_pid = p; + m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } + + this(this) + nothrow { + if (m_pid != ProcessID.invalid) + eventDriver.processes.addRef(m_pid); + } + + ~this() + nothrow { + if (m_pid != ProcessID.invalid) + releaseHandle!"processes"(m_pid, m_context.driver); + } + + /** + Check whether this is a valid process handle. The process may have + exited already. + */ + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; } + + /// + unittest { + Process p; + + assert(!p); + } + + /** + An operating system handle to the process. + */ + @property int osHandle() const nothrow @nogc { return cast(int)m_pid; } + + /** + Whether the process has exited. + */ + @property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); } + + /** + Wait for the process to exit, allowing other fibers to continue in the + meantime. + + Params: + timeout = Optionally wait until a timeout is reached. + + Returns: + The exit code of the process. If a timeout is given and reached, a + null value is returned. + */ + int wait() + @blocking { + return asyncAwaitUninterruptible!(ProcessWaitCallback, + cb => eventDriver.processes.wait(m_pid, cb) + )[1]; + } + + /// Ditto + Nullable!int wait(Duration timeout) + @blocking { + size_t waitId; + bool cancelled = false; + + int code = asyncAwaitUninterruptible!(ProcessWaitCallback, + (cb) nothrow @safe { + waitId = eventDriver.processes.wait(m_pid, cb); + }, + (cb) nothrow @safe { + eventDriver.processes.cancelWait(m_pid, waitId); + cancelled = true; + }, + )(timeout)[1]; + + if (cancelled) { + return Nullable!int.init; + } else { + return code.nullable; + } + } + + /** + Kill the process. + + By default on Linux this sends SIGTERM to the process. + + Params: + signal = Optional parameter for the signal to send to the process. + */ + void kill() + { + version (Posix) + { + import core.sys.posix.signal : SIGTERM; + eventDriver.processes.kill(m_pid, SIGTERM); + } + else static assert(0); + } + + /// ditto + void kill(int signal) + { + eventDriver.processes.kill(m_pid, signal); + } + + /** + Terminate the process immediately. + + On Linux this sends SIGKILL to the process. + */ + void forceKill() + { + version (Posix) + { + import core.sys.posix.signal : SIGKILL; + eventDriver.processes.kill(m_pid, SIGKILL); + } + else static assert(0); + } + + /** + Wait for the process to exit until a timeout is reached. If the process + doesn't exit before the timeout, force kill it. + + Returns: + The process exit code. + */ + int waitOrForceKill(Duration timeout) + @blocking { + auto code = wait(timeout); + + if (code.isNull) { + forceKill(); + return wait(); + } else { + return code.get; + } + } +} + +/** + A stream for tBatchBufferhe write end of a pipe. +*/ +struct PipeInputStream { + private static struct Context { + BatchBuffer!ubyte readBuffer; + shared(NativeEventDriver) driver; + } + + private { + PipeFD m_pipe; + Context* m_context; + } + + private this(PipeFD pipe) + nothrow { + m_pipe = pipe; + if (pipe != PipeFD.invalid) { + m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); + m_context.readBuffer.capacity = 4096; + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } + } + + this(this) + nothrow { + if (m_pipe != PipeFD.invalid) + eventDriver.pipes.addRef(m_pipe); + } + + ~this() + nothrow { + if (m_pipe != PipeFD.invalid) + releaseHandle!"pipes"(m_pipe, m_context.driver); + } + + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } + + @property bool empty() @blocking { return leastSize == 0; } + @property ulong leastSize() + @blocking { + waitForData(); + return m_context ? m_context.readBuffer.length : 0; + } + @property bool dataAvailableForRead() { return waitForData(0.seconds); } + + bool waitForData(Duration timeout = Duration.max) + @blocking { + if (!m_context) return false; + if (m_context.readBuffer.length > 0) return true; + auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; + + bool cancelled; + IOStatus status; + size_t nbytes; + + alias waiter = Waitable!(PipeIOCallback, + cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb), + (cb) { + cancelled = true; + eventDriver.pipes.cancelRead(m_pipe); + }, + (pipe, st, nb) { + // Handle closed pipes + if (m_pipe == PipeFD.invalid) { + cancelled = true; + return; + } + + assert(pipe == m_pipe); + status = st; + nbytes = nb; + } + ); + + asyncAwaitAny!(true, waiter)(timeout); + + if (cancelled || !m_context) return false; + + logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status); + + assert(m_context.readBuffer.length == 0); + m_context.readBuffer.putN(nbytes); + switch (status) { + case IOStatus.ok: break; + case IOStatus.disconnected: break; + case IOStatus.wouldBlock: + assert(mode == IOMode.immediate); + break; + default: + logDebug("Error status when waiting for data: %s", status); + break; + } + + return m_context.readBuffer.length > 0; + } + + const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } + + size_t read(scope ubyte[] dst, IOMode mode) + @blocking { + if (dst.empty) return 0; + + if (m_context.readBuffer.length >= dst.length) { + m_context.readBuffer.read(dst); + return dst.length; + } + + size_t nbytes = 0; + + while (true) { + if (m_context.readBuffer.length == 0) { + if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) + break; + + enforce(waitForData(), "Reached end of stream while reading data."); + } + + assert(m_context.readBuffer.length > 0); + auto l = min(dst.length, m_context.readBuffer.length); + m_context.readBuffer.read(dst[0 .. l]); + dst = dst[l .. $]; + nbytes += l; + if (dst.length == 0) + break; + } + + return nbytes; + } + + void read(scope ubyte[] dst) + @blocking { + auto r = read(dst, IOMode.all); + assert(r == dst.length); + } + + /** + Close the read end of the pipe immediately. + + Make sure that the pipe is not used after this is called and is released + as soon as possible. Due to implementation detail in eventcore this + reference could conflict with future pipes. + */ + void close() + nothrow { + eventDriver.pipes.close(m_pipe); + } +} + +mixin validateInputStream!PipeInputStream; + +/** + Stream for the read end of a pipe. +*/ +struct PipeOutputStream { + private static struct Context { + shared(NativeEventDriver) driver; + } + + private { + PipeFD m_pipe; + Context* m_context; + } + + private this(PipeFD pipe) + nothrow { + m_pipe = pipe; + if (pipe != PipeFD.invalid) { + m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } + } + + this(this) + nothrow { + if (m_pipe != PipeFD.invalid) + eventDriver.pipes.addRef(m_pipe); + } + + ~this() + nothrow { + if (m_pipe != PipeFD.invalid) + releaseHandle!"pipes"(m_pipe, m_context.driver); + } + + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } + + size_t write(in ubyte[] bytes, IOMode mode) + @blocking { + if (bytes.empty) return 0; + + auto res = asyncAwait!(PipeIOCallback, + cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb), + cb => eventDriver.pipes.cancelWrite(m_pipe)); + + switch (res[1]) { + case IOStatus.ok: break; + case IOStatus.disconnected: break; + default: + throw new Exception("Error writing data to pipe."); + } + + return res[2]; + } + + void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } + void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } + + void flush() {} + void finalize() {} + + /** + Close the write end of the pipe immediately. + + Make sure that the pipe is not used after this is called and is released + as soon as possible. Due to implementation detail in eventcore this + reference could conflict with future pipes. + */ + void close() + nothrow { + eventDriver.pipes.close(m_pipe); + } +} + +mixin validateOutputStream!PipeOutputStream; + +/** + A pipe created by `pipe`. +*/ +struct Pipe { + /// Read end of the pipe + PipeInputStream readEnd; + /// Write end of the pipe + PipeOutputStream writeEnd; + + /** + Close both ends of the pipe + */ + void close() + nothrow { + writeEnd.close(); + readEnd.close(); + } +} + +/** + Create a pipe, async equivalent of `std.process.pipe`. + + Returns: + A stream for each end of the pipe. +*/ +Pipe pipe() +{ + auto p = std.process.pipe(); + + auto read = eventDriver.pipes.adopt(p.readEnd.fileno); + auto write = eventDriver.pipes.adopt(p.writeEnd.fileno); + + return Pipe(PipeInputStream(read), PipeOutputStream(write)); +} + +/** + Returned from `pipeProcess`. + + See_Also: `pipeProcess`, `pipeShell` +*/ +struct ProcessPipes { + Process process; + PipeOutputStream stdin; + PipeInputStream stdout; + PipeInputStream stderr; +} + +/** + Equivalent to `std.process.pipeProcess`. + + Returns: + A struct containing the process and created pipes. + + See_Also: `spawnProcess`, `execute` +*/ +ProcessPipes pipeProcess( + scope string[] args, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) +@trusted { + auto stdin = ProcessStdinFile(ProcessRedirect.inherit); + if (Redirect.stdin & redirect) { + stdin = ProcessStdinFile(ProcessRedirect.pipe); + } + + auto stdout = ProcessStdoutFile(ProcessRedirect.inherit); + if (Redirect.stdoutToStderr & redirect) { + stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr); + } else if (Redirect.stdout & redirect) { + stdout = ProcessStdoutFile(ProcessRedirect.pipe); + } + + auto stderr = ProcessStderrFile(ProcessRedirect.inherit); + if (Redirect.stderrToStdout & redirect) { + stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout); + } else if (Redirect.stderr & redirect) { + stderr = ProcessStderrFile(ProcessRedirect.pipe); + } + + auto process = eventDriver.processes.spawn( + args, + stdin, + stdout, + stderr, + env, + config, + workDir.toNativeString()); + + return ProcessPipes( + Process(process.pid), + PipeOutputStream(cast(PipeFD)process.stdin), + PipeInputStream(cast(PipeFD)process.stdout), + PipeInputStream(cast(PipeFD)process.stderr) + ); +} + +/// ditto +ProcessPipes pipeProcess( + scope string program, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) +{ + return pipeProcess( + [program], + redirect, + env, + config, + workDir + ); +} + +/// ditto +ProcessPipes pipeShell( + scope string command, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init, + scope NativePath shellPath = nativeShell) +{ + return pipeProcess( + shellCommand(command, nativeShell), + redirect, + env, + config, + workDir); +} + +/** + Equivalent to `std.process.execute`. + + Returns: + Tuple containing the exit status and process output. + + See_Also: `spawnProcess`, `pipeProcess` +*/ +auto execute( + scope string[] args, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = NativePath.init) +@blocking { + return executeImpl!pipeProcess(args, env, config, maxOutput, workDir); +} + +/// ditto +auto execute( + scope string program, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = NativePath.init) +@blocking @trusted { + return executeImpl!pipeProcess(program, env, config, maxOutput, workDir); +} + +/// ditto +auto executeShell( + scope string command, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = null, + NativePath shellPath = nativeShell) +@blocking { + return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath); +} + +private auto executeImpl(alias spawn, Cmd, Args...)( + Cmd command, + const string[string] env, + Config config, + size_t maxOutput, + scope NativePath workDir, + Args args) +@blocking { + Redirect redirect = Redirect.stdout; + + auto processPipes = spawn(command, redirect, env, config, workDir, args); + + auto stringOutput = processPipes.stdout.collectOutput(maxOutput); + + return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); +} + +/** + Collect the string output of a stream in a blocking fashion. + + Params: + stream = The input stream to collect from. + nbytes = The maximum number of bytes to collect. + + Returns: + The collected data from the stream as a string. +*/ +string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max) +@blocking @trusted if (isInputStream!InputStream) { + auto output = appender!string(); + if (nbytes != size_t.max) { + output.reserve(nbytes); + } + + import vibe.internal.allocator : theAllocator, dispose; + + scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); + scope (exit) theAllocator.dispose(buffer); + + while (!stream.empty && nbytes > 0) { + size_t chunk = min(nbytes, stream.leastSize, buffer.length); + assert(chunk > 0, "leastSize returned zero for non-empty stream."); + + stream.read(buffer[0..chunk]); + output.put(buffer[0..chunk]); + } + + return output.data; +} diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d new file mode 100644 index 0000000..c4a1188 --- /dev/null +++ b/tests/vibe.core.process.d @@ -0,0 +1,200 @@ +/+ dub.sdl: +name "test" +description "Subprocesses" +dependency "vibe-core" path="../" ++/ +module test; + +import core.thread; +import vibe.core.log; +import vibe.core.core; +import vibe.core.process; +import std.array; +import std.range; +import std.algorithm; + +void testEcho() +{ + foreach (i; 0..100) { + auto procPipes = pipeProcess(["echo", "foo bar"], Redirect.stdout); + + assert(!procPipes.process.exited); + + auto output = procPipes.stdout.collectOutput(); + + assert(procPipes.process.wait() == 0); + assert(procPipes.process.exited); + + assert(output == "foo bar\n"); + } +} + +void testCat() +{ + auto procPipes = pipeProcess(["cat"]); + + string output; + auto outputTask = runTask({ + output = procPipes.stdout.collectOutput(); + }); + + auto inputs = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"] + .map!(s => s ~ "\n") + .repeat(4000).join.array; + foreach (input; inputs) { + procPipes.stdin.write(input); + } + + procPipes.stdin.close(); + assert(procPipes.process.wait() == 0); + + outputTask.join(); + + assert(output == inputs.join()); +} + +void testStderr() +{ + auto program = q{ + foreach (line; stdin.byLine()) + stderr.writeln(line); + }; + auto procPipes = pipeProcess(["rdmd", "--eval", program], Redirect.stdin | Redirect.stderr); + + string output; + auto outputTask = runTask({ + output = procPipes.stderr.collectOutput(); + }); + + auto inputs = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"] + .map!(s => s ~ "\n") + .repeat(4000).join.array; + foreach (input; inputs) { + procPipes.stdin.write(input); + } + + procPipes.stdin.close(); + assert(procPipes.process.wait() == 0); + + outputTask.join(); + + assert(output == inputs.join); +} + +void testRandomDeath() +{ + foreach (i; 0..20) { + auto program = q{ + import core.thread; + import std.random; + Thread.sleep(dur!"msecs"(uniform(0, 1000))); + }; + auto process = spawnProcess(["rdmd", "--eval", program]); + + assert(!process.exited); + + sleep(800.msecs); + try { + process.kill(); + } catch (Exception e) {} + process.wait(); + + assert(process.exited); + } +} + +void testIgnoreSigterm() +{ + auto program = q{ + import core.thread; + import core.sys.posix.signal; + + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + + foreach (line; stdin.byLine()) { + writeln(line); + stdout.flush(); + } + + // Zombie + while (true) Thread.sleep(100.dur!"msecs"); + }; + auto procPipes = pipeProcess( + ["rdmd", "--eval", program], + Redirect.stdin | Redirect.stdout | Redirect.stderrToStdout); + + string output; + auto outputTask = runTask({ + output = procPipes.stdout.collectOutput(); + }); + + assert(!procPipes.process.exited); + + // Give the program some time to install the signal handler + sleep(1.seconds); + + procPipes.process.kill(); + procPipes.stdin.write("foo\n"); + + assert(!procPipes.process.exited); + + assert(procPipes.process.waitOrForceKill(2.seconds) == 9); + + assert(procPipes.process.exited); + + outputTask.join(); + + assert(output == "foo\n"); +} + +void testSimpleShell() +{ + auto res = executeShell("echo foo"); + + assert(res.status == 0); + assert(res.output == "foo\n"); +} + +void testLineEndings() +{ + auto program = q{ + write("linux\n"); + write("os9\r"); + write("win\r\n"); + }; + auto res = execute(["rdmd", "--eval", program]); + + assert(res.status == 0); + assert(res.output == "linux\nos9\rwin\r\n"); +} + +void main() +{ + runTask({ + auto tasks = [ + &testEcho, + &testCat, + &testStderr, + &testRandomDeath, + &testIgnoreSigterm, + &testSimpleShell, + &testLineEndings, + ].map!(fn => runTask({ + try { + fn(); + } catch (Exception e) { + logError("%s", e); + throw e; + } + })); + + foreach (task; tasks) { + task.join(); + } + + exitEventLoop(); + }); + + runEventLoop(); +} From 7f11fcf7a6fb3704cadf9309fffd7c22ca4822f4 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 5 May 2019 15:37:35 +1000 Subject: [PATCH 2/9] Candidate windows compilation fix --- source/vibe/core/process.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index 0fb76e2..c3788b6 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -34,7 +34,7 @@ import vibe.core.internal.release; */ Process registerProcess(Pid pid) @trusted { - return registerProcess(pid.osHandle); + return registerProcess(pid.processID); } /// ditto From 83e2d391e67b81ac0cd5e1be3dcc829f82445d2c Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 5 May 2019 17:52:44 +1000 Subject: [PATCH 3/9] 2nd candidate windows compilation fix --- source/vibe/core/process.d | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index c3788b6..4918dc4 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -258,7 +258,10 @@ struct Process { import core.sys.posix.signal : SIGTERM; eventDriver.processes.kill(m_pid, SIGTERM); } - else static assert(0); + else + { + eventDriver.processes.kill(m_pid, 1); + } } /// ditto @@ -279,7 +282,10 @@ struct Process { import core.sys.posix.signal : SIGKILL; eventDriver.processes.kill(m_pid, SIGKILL); } - else static assert(0); + else + { + eventDriver.processes.kill(m_pid, 1); + } } /** From 65980326a11310718d846b7d90a2bdbe336acba1 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 5 May 2019 18:02:26 +1000 Subject: [PATCH 4/9] Add timeouts to avoid the compile time effects of rdmd on tests passing --- tests/vibe.core.process.d | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d index c4a1188..eb184d9 100644 --- a/tests/vibe.core.process.d +++ b/tests/vibe.core.process.d @@ -61,6 +61,9 @@ void testStderr() }; auto procPipes = pipeProcess(["rdmd", "--eval", program], Redirect.stdin | Redirect.stderr); + // Wait for rdmd to compile + sleep(1.seconds); + string output; auto outputTask = runTask({ output = procPipes.stderr.collectOutput(); @@ -83,12 +86,15 @@ void testStderr() void testRandomDeath() { + auto program = q{ + import core.thread; + import std.random; + Thread.sleep(dur!"msecs"(uniform(0, 1000))); + }; + // Prime rdmd + execute(["rdmd", "--eval", program]); + foreach (i; 0..20) { - auto program = q{ - import core.thread; - import std.random; - Thread.sleep(dur!"msecs"(uniform(0, 1000))); - }; auto process = spawnProcess(["rdmd", "--eval", program]); assert(!process.exited); @@ -131,8 +137,8 @@ void testIgnoreSigterm() assert(!procPipes.process.exited); - // Give the program some time to install the signal handler - sleep(1.seconds); + // Give the program some time to compile and install the signal handler + sleep(2.seconds); procPipes.process.kill(); procPipes.stdin.write("foo\n"); From ea3124778b336fb29e279ad31bd3114fcc209d7d Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Tue, 7 May 2019 10:30:11 +1000 Subject: [PATCH 5/9] Increase timeouts --- tests/vibe.core.process.d | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d index eb184d9..840e8fe 100644 --- a/tests/vibe.core.process.d +++ b/tests/vibe.core.process.d @@ -62,7 +62,7 @@ void testStderr() auto procPipes = pipeProcess(["rdmd", "--eval", program], Redirect.stdin | Redirect.stderr); // Wait for rdmd to compile - sleep(1.seconds); + sleep(3.seconds); string output; auto outputTask = runTask({ @@ -138,7 +138,7 @@ void testIgnoreSigterm() assert(!procPipes.process.exited); // Give the program some time to compile and install the signal handler - sleep(2.seconds); + sleep(4.seconds); procPipes.process.kill(); procPipes.stdin.write("foo\n"); From 0d4840b81d7262d2c9e2d616cab88ea353a3c1bc Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 2 Jun 2019 14:42:41 +1000 Subject: [PATCH 6/9] Address review comments --- source/vibe/core/process.d | 5 +++-- tests/vibe.core.process.d | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index 4918dc4..370f707 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -195,7 +195,7 @@ struct Process { /** An operating system handle to the process. */ - @property int osHandle() const nothrow @nogc { return cast(int)m_pid; } + @property int pid() const nothrow @nogc { return cast(int)m_pid; } /** Whether the process has exited. @@ -724,7 +724,7 @@ private auto executeImpl(alias spawn, Cmd, Args...)( return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); } -/** +/* Collect the string output of a stream in a blocking fashion. Params: @@ -734,6 +734,7 @@ private auto executeImpl(alias spawn, Cmd, Args...)( Returns: The collected data from the stream as a string. */ +/// private string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max) @blocking @trusted if (isInputStream!InputStream) { auto output = appender!string(); diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d index 840e8fe..a6a84bd 100644 --- a/tests/vibe.core.process.d +++ b/tests/vibe.core.process.d @@ -5,6 +5,8 @@ dependency "vibe-core" path="../" +/ module test; +static if (__VERSION__ >= 2080) { + import core.thread; import vibe.core.log; import vibe.core.core; @@ -204,3 +206,5 @@ void main() runEventLoop(); } + +} From ab26527e7a2430ff04d590aee885a03305b06727 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Sun, 2 Jun 2019 22:54:59 +1000 Subject: [PATCH 7/9] Fix compiling tests/vibe.core.process.d for versions < 2080 --- tests/vibe.core.process.d | 52 +++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d index a6a84bd..fa195a6 100644 --- a/tests/vibe.core.process.d +++ b/tests/vibe.core.process.d @@ -5,8 +5,6 @@ dependency "vibe-core" path="../" +/ module test; -static if (__VERSION__ >= 2080) { - import core.thread; import vibe.core.log; import vibe.core.core; @@ -179,32 +177,32 @@ void testLineEndings() void main() { - runTask({ - auto tasks = [ - &testEcho, - &testCat, - &testStderr, - &testRandomDeath, - &testIgnoreSigterm, - &testSimpleShell, - &testLineEndings, - ].map!(fn => runTask({ - try { - fn(); - } catch (Exception e) { - logError("%s", e); - throw e; + static if (__VERSION__ >= 2080) { + runTask({ + auto tasks = [ + &testEcho, + &testCat, + &testStderr, + &testRandomDeath, + &testIgnoreSigterm, + &testSimpleShell, + &testLineEndings, + ].map!(fn => runTask({ + try { + fn(); + } catch (Exception e) { + logError("%s", e); + throw e; + } + })); + + foreach (task; tasks) { + task.join(); } - })); - foreach (task; tasks) { - task.join(); - } - - exitEventLoop(); - }); - - runEventLoop(); -} + exitEventLoop(); + }); + runEventLoop(); + } } From 5f5a8314449c3131e68d6fbcce6c5960b9512e37 Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Mon, 3 Jun 2019 10:04:56 +1000 Subject: [PATCH 8/9] Add comment --- tests/vibe.core.process.d | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/vibe.core.process.d b/tests/vibe.core.process.d index fa195a6..d47c9d7 100644 --- a/tests/vibe.core.process.d +++ b/tests/vibe.core.process.d @@ -177,6 +177,7 @@ void testLineEndings() void main() { + // rdmd --eval is only supported in versions >= 2.080 static if (__VERSION__ >= 2080) { runTask({ auto tasks = [ From 68dd4baa44e7759d4d271e54462fdff42ed0caff Mon Sep 17 00:00:00 2001 From: Benjamin Schaaf Date: Tue, 4 Jun 2019 10:13:00 +1000 Subject: [PATCH 9/9] s/registerProcess/adoptProcessID/g --- source/vibe/core/process.d | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index 370f707..aa3b20b 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -30,15 +30,15 @@ import vibe.core.internal.release; started from anywhere including external libraries or std.process. Params: - pid = A Pid or OS process handle + pid = A Pid or OS process id */ -Process registerProcess(Pid pid) +Process adoptProcessID(Pid pid) @trusted { - return registerProcess(pid.processID); + return adoptProcessID(pid.processID); } /// ditto -Process registerProcess(int pid) +Process adoptProcessID(int pid) { return Process(eventDriver.processes.adopt(pid)); }