Merge pull request #154 from BenjaminSchaaf/process

Implement vibe.core.process for subprocess handling similar to std.process
This commit is contained in:
Sönke Ludwig 2019-06-04 11:18:34 +02:00 committed by GitHub
commit 33014fde5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 968 additions and 0 deletions

759
source/vibe/core/process.d Normal file
View file

@ -0,0 +1,759 @@
/**
Functions and structures for dealing with subprocesses and pipes.
This module is modeled after std.process, but provides a fiber-aware
alternative to it. All blocking operations will yield the calling fiber
instead of blocking it.
*/
module vibe.core.process;
public import std.process : Pid, Redirect;
static import std.process;
import core.time;
import std.array;
import std.typecons;
import std.exception : enforce;
import std.algorithm;
import eventcore.core;
import vibe.core.path;
import vibe.core.log;
import vibe.core.stream;
import vibe.internal.async;
import vibe.internal.array : BatchBuffer;
import vibe.core.internal.release;
@safe:
/**
Register a process with vibe for fibre-aware handling. This process can be
started from anywhere including external libraries or std.process.
Params:
pid = A Pid or OS process id
*/
Process adoptProcessID(Pid pid)
@trusted {
return adoptProcessID(pid.processID);
}
/// ditto
Process adoptProcessID(int pid)
{
return Process(eventDriver.processes.adopt(pid));
}
/**
Path to the user's preferred command interpreter.
See_Also: `nativeShell`
*/
@property NativePath userShell() { return NativePath(std.process.userShell); }
/**
The platform specific native shell path.
See_Also: `userShell`
*/
const NativePath nativeShell = NativePath(std.process.nativeShell);
/**
Equivalent to `std.process.Config` except with less flag support
*/
enum Config {
none = ProcessConfig.none,
newEnv = ProcessConfig.newEnv,
suppressConsole = ProcessConfig.suppressConsole,
detached = ProcessConfig.detached,
}
/**
Equivalent to `std.process.spawnProcess`.
Returns:
A reference to the running process.
See_Also: `pipeProcess`, `execute`
*/
Process spawnProcess(
scope string[] args,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init)
@trusted {
return Process(eventDriver.processes.spawn(
args,
ProcessStdinFile(ProcessRedirect.inherit),
ProcessStdoutFile(ProcessRedirect.inherit),
ProcessStderrFile(ProcessRedirect.inherit),
env,
config,
workDir.toNativeString()).pid
);
}
/// ditto
Process spawnProcess(
scope string program,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init)
{
return spawnProcess(
[program],
env,
config,
workDir
);
}
/// ditto
Process spawnShell(
scope string command,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init,
scope NativePath shellPath = nativeShell)
{
return spawnProcess(
shellCommand(command, shellPath),
env,
config,
workDir);
}
private string[] shellCommand(string command, NativePath shellPath)
{
version (Windows)
{
// CMD does not parse its arguments like other programs.
// It does not use CommandLineToArgvW.
// Instead, it treats the first and last quote specially.
// See CMD.EXE /? for details.
return [
std.process.escapeShellFileName(shellPath.toNativeString())
~ ` /C "` ~ command ~ `"`
];
}
else version (Posix)
{
return [
shellPath.toNativeString(),
"-c",
command,
];
}
}
/**
Represents a running process.
*/
struct Process {
private static struct Context {
//Duration waitTimeout;
shared(NativeEventDriver) driver;
}
private {
ProcessID m_pid;
Context* m_context;
}
private this(ProcessID p)
nothrow {
assert(p != ProcessID.invalid);
m_pid = p;
m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this)
nothrow {
if (m_pid != ProcessID.invalid)
eventDriver.processes.addRef(m_pid);
}
~this()
nothrow {
if (m_pid != ProcessID.invalid)
releaseHandle!"processes"(m_pid, m_context.driver);
}
/**
Check whether this is a valid process handle. The process may have
exited already.
*/
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; }
///
unittest {
Process p;
assert(!p);
}
/**
An operating system handle to the process.
*/
@property int pid() const nothrow @nogc { return cast(int)m_pid; }
/**
Whether the process has exited.
*/
@property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); }
/**
Wait for the process to exit, allowing other fibers to continue in the
meantime.
Params:
timeout = Optionally wait until a timeout is reached.
Returns:
The exit code of the process. If a timeout is given and reached, a
null value is returned.
*/
int wait()
@blocking {
return asyncAwaitUninterruptible!(ProcessWaitCallback,
cb => eventDriver.processes.wait(m_pid, cb)
)[1];
}
/// Ditto
Nullable!int wait(Duration timeout)
@blocking {
size_t waitId;
bool cancelled = false;
int code = asyncAwaitUninterruptible!(ProcessWaitCallback,
(cb) nothrow @safe {
waitId = eventDriver.processes.wait(m_pid, cb);
},
(cb) nothrow @safe {
eventDriver.processes.cancelWait(m_pid, waitId);
cancelled = true;
},
)(timeout)[1];
if (cancelled) {
return Nullable!int.init;
} else {
return code.nullable;
}
}
/**
Kill the process.
By default on Linux this sends SIGTERM to the process.
Params:
signal = Optional parameter for the signal to send to the process.
*/
void kill()
{
version (Posix)
{
import core.sys.posix.signal : SIGTERM;
eventDriver.processes.kill(m_pid, SIGTERM);
}
else
{
eventDriver.processes.kill(m_pid, 1);
}
}
/// ditto
void kill(int signal)
{
eventDriver.processes.kill(m_pid, signal);
}
/**
Terminate the process immediately.
On Linux this sends SIGKILL to the process.
*/
void forceKill()
{
version (Posix)
{
import core.sys.posix.signal : SIGKILL;
eventDriver.processes.kill(m_pid, SIGKILL);
}
else
{
eventDriver.processes.kill(m_pid, 1);
}
}
/**
Wait for the process to exit until a timeout is reached. If the process
doesn't exit before the timeout, force kill it.
Returns:
The process exit code.
*/
int waitOrForceKill(Duration timeout)
@blocking {
auto code = wait(timeout);
if (code.isNull) {
forceKill();
return wait();
} else {
return code.get;
}
}
}
/**
A stream for tBatchBufferhe write end of a pipe.
*/
struct PipeInputStream {
private static struct Context {
BatchBuffer!ubyte readBuffer;
shared(NativeEventDriver) driver;
}
private {
PipeFD m_pipe;
Context* m_context;
}
private this(PipeFD pipe)
nothrow {
m_pipe = pipe;
if (pipe != PipeFD.invalid) {
m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
m_context.readBuffer.capacity = 4096;
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
}
this(this)
nothrow {
if (m_pipe != PipeFD.invalid)
eventDriver.pipes.addRef(m_pipe);
}
~this()
nothrow {
if (m_pipe != PipeFD.invalid)
releaseHandle!"pipes"(m_pipe, m_context.driver);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; }
@property bool empty() @blocking { return leastSize == 0; }
@property ulong leastSize()
@blocking {
waitForData();
return m_context ? m_context.readBuffer.length : 0;
}
@property bool dataAvailableForRead() { return waitForData(0.seconds); }
bool waitForData(Duration timeout = Duration.max)
@blocking {
if (!m_context) return false;
if (m_context.readBuffer.length > 0) return true;
auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;
bool cancelled;
IOStatus status;
size_t nbytes;
alias waiter = Waitable!(PipeIOCallback,
cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb),
(cb) {
cancelled = true;
eventDriver.pipes.cancelRead(m_pipe);
},
(pipe, st, nb) {
// Handle closed pipes
if (m_pipe == PipeFD.invalid) {
cancelled = true;
return;
}
assert(pipe == m_pipe);
status = st;
nbytes = nb;
}
);
asyncAwaitAny!(true, waiter)(timeout);
if (cancelled || !m_context) return false;
logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status);
assert(m_context.readBuffer.length == 0);
m_context.readBuffer.putN(nbytes);
switch (status) {
case IOStatus.ok: break;
case IOStatus.disconnected: break;
case IOStatus.wouldBlock:
assert(mode == IOMode.immediate);
break;
default:
logDebug("Error status when waiting for data: %s", status);
break;
}
return m_context.readBuffer.length > 0;
}
const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }
size_t read(scope ubyte[] dst, IOMode mode)
@blocking {
if (dst.empty) return 0;
if (m_context.readBuffer.length >= dst.length) {
m_context.readBuffer.read(dst);
return dst.length;
}
size_t nbytes = 0;
while (true) {
if (m_context.readBuffer.length == 0) {
if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
break;
enforce(waitForData(), "Reached end of stream while reading data.");
}
assert(m_context.readBuffer.length > 0);
auto l = min(dst.length, m_context.readBuffer.length);
m_context.readBuffer.read(dst[0 .. l]);
dst = dst[l .. $];
nbytes += l;
if (dst.length == 0)
break;
}
return nbytes;
}
void read(scope ubyte[] dst)
@blocking {
auto r = read(dst, IOMode.all);
assert(r == dst.length);
}
/**
Close the read end of the pipe immediately.
Make sure that the pipe is not used after this is called and is released
as soon as possible. Due to implementation detail in eventcore this
reference could conflict with future pipes.
*/
void close()
nothrow {
eventDriver.pipes.close(m_pipe);
}
}
mixin validateInputStream!PipeInputStream;
/**
Stream for the read end of a pipe.
*/
struct PipeOutputStream {
private static struct Context {
shared(NativeEventDriver) driver;
}
private {
PipeFD m_pipe;
Context* m_context;
}
private this(PipeFD pipe)
nothrow {
m_pipe = pipe;
if (pipe != PipeFD.invalid) {
m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
}
this(this)
nothrow {
if (m_pipe != PipeFD.invalid)
eventDriver.pipes.addRef(m_pipe);
}
~this()
nothrow {
if (m_pipe != PipeFD.invalid)
releaseHandle!"pipes"(m_pipe, m_context.driver);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; }
size_t write(in ubyte[] bytes, IOMode mode)
@blocking {
if (bytes.empty) return 0;
auto res = asyncAwait!(PipeIOCallback,
cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb),
cb => eventDriver.pipes.cancelWrite(m_pipe));
switch (res[1]) {
case IOStatus.ok: break;
case IOStatus.disconnected: break;
default:
throw new Exception("Error writing data to pipe.");
}
return res[2];
}
void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
void flush() {}
void finalize() {}
/**
Close the write end of the pipe immediately.
Make sure that the pipe is not used after this is called and is released
as soon as possible. Due to implementation detail in eventcore this
reference could conflict with future pipes.
*/
void close()
nothrow {
eventDriver.pipes.close(m_pipe);
}
}
mixin validateOutputStream!PipeOutputStream;
/**
A pipe created by `pipe`.
*/
struct Pipe {
/// Read end of the pipe
PipeInputStream readEnd;
/// Write end of the pipe
PipeOutputStream writeEnd;
/**
Close both ends of the pipe
*/
void close()
nothrow {
writeEnd.close();
readEnd.close();
}
}
/**
Create a pipe, async equivalent of `std.process.pipe`.
Returns:
A stream for each end of the pipe.
*/
Pipe pipe()
{
auto p = std.process.pipe();
auto read = eventDriver.pipes.adopt(p.readEnd.fileno);
auto write = eventDriver.pipes.adopt(p.writeEnd.fileno);
return Pipe(PipeInputStream(read), PipeOutputStream(write));
}
/**
Returned from `pipeProcess`.
See_Also: `pipeProcess`, `pipeShell`
*/
struct ProcessPipes {
Process process;
PipeOutputStream stdin;
PipeInputStream stdout;
PipeInputStream stderr;
}
/**
Equivalent to `std.process.pipeProcess`.
Returns:
A struct containing the process and created pipes.
See_Also: `spawnProcess`, `execute`
*/
ProcessPipes pipeProcess(
scope string[] args,
Redirect redirect = Redirect.all,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init)
@trusted {
auto stdin = ProcessStdinFile(ProcessRedirect.inherit);
if (Redirect.stdin & redirect) {
stdin = ProcessStdinFile(ProcessRedirect.pipe);
}
auto stdout = ProcessStdoutFile(ProcessRedirect.inherit);
if (Redirect.stdoutToStderr & redirect) {
stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr);
} else if (Redirect.stdout & redirect) {
stdout = ProcessStdoutFile(ProcessRedirect.pipe);
}
auto stderr = ProcessStderrFile(ProcessRedirect.inherit);
if (Redirect.stderrToStdout & redirect) {
stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout);
} else if (Redirect.stderr & redirect) {
stderr = ProcessStderrFile(ProcessRedirect.pipe);
}
auto process = eventDriver.processes.spawn(
args,
stdin,
stdout,
stderr,
env,
config,
workDir.toNativeString());
return ProcessPipes(
Process(process.pid),
PipeOutputStream(cast(PipeFD)process.stdin),
PipeInputStream(cast(PipeFD)process.stdout),
PipeInputStream(cast(PipeFD)process.stderr)
);
}
/// ditto
ProcessPipes pipeProcess(
scope string program,
Redirect redirect = Redirect.all,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init)
{
return pipeProcess(
[program],
redirect,
env,
config,
workDir
);
}
/// ditto
ProcessPipes pipeShell(
scope string command,
Redirect redirect = Redirect.all,
const string[string] env = null,
Config config = Config.none,
scope NativePath workDir = NativePath.init,
scope NativePath shellPath = nativeShell)
{
return pipeProcess(
shellCommand(command, nativeShell),
redirect,
env,
config,
workDir);
}
/**
Equivalent to `std.process.execute`.
Returns:
Tuple containing the exit status and process output.
See_Also: `spawnProcess`, `pipeProcess`
*/
auto execute(
scope string[] args,
const string[string] env = null,
Config config = Config.none,
size_t maxOutput = size_t.max,
scope NativePath workDir = NativePath.init)
@blocking {
return executeImpl!pipeProcess(args, env, config, maxOutput, workDir);
}
/// ditto
auto execute(
scope string program,
const string[string] env = null,
Config config = Config.none,
size_t maxOutput = size_t.max,
scope NativePath workDir = NativePath.init)
@blocking @trusted {
return executeImpl!pipeProcess(program, env, config, maxOutput, workDir);
}
/// ditto
auto executeShell(
scope string command,
const string[string] env = null,
Config config = Config.none,
size_t maxOutput = size_t.max,
scope NativePath workDir = null,
NativePath shellPath = nativeShell)
@blocking {
return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath);
}
private auto executeImpl(alias spawn, Cmd, Args...)(
Cmd command,
const string[string] env,
Config config,
size_t maxOutput,
scope NativePath workDir,
Args args)
@blocking {
Redirect redirect = Redirect.stdout;
auto processPipes = spawn(command, redirect, env, config, workDir, args);
auto stringOutput = processPipes.stdout.collectOutput(maxOutput);
return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput);
}
/*
Collect the string output of a stream in a blocking fashion.
Params:
stream = The input stream to collect from.
nbytes = The maximum number of bytes to collect.
Returns:
The collected data from the stream as a string.
*/
/// private
string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max)
@blocking @trusted if (isInputStream!InputStream) {
auto output = appender!string();
if (nbytes != size_t.max) {
output.reserve(nbytes);
}
import vibe.internal.allocator : theAllocator, dispose;
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope (exit) theAllocator.dispose(buffer);
while (!stream.empty && nbytes > 0) {
size_t chunk = min(nbytes, stream.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
stream.read(buffer[0..chunk]);
output.put(buffer[0..chunk]);
}
return output.data;
}

209
tests/vibe.core.process.d Normal file
View file

@ -0,0 +1,209 @@
/+ dub.sdl:
name "test"
description "Subprocesses"
dependency "vibe-core" path="../"
+/
module test;
import core.thread;
import vibe.core.log;
import vibe.core.core;
import vibe.core.process;
import std.array;
import std.range;
import std.algorithm;
void testEcho()
{
foreach (i; 0..100) {
auto procPipes = pipeProcess(["echo", "foo bar"], Redirect.stdout);
assert(!procPipes.process.exited);
auto output = procPipes.stdout.collectOutput();
assert(procPipes.process.wait() == 0);
assert(procPipes.process.exited);
assert(output == "foo bar\n");
}
}
void testCat()
{
auto procPipes = pipeProcess(["cat"]);
string output;
auto outputTask = runTask({
output = procPipes.stdout.collectOutput();
});
auto inputs = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"]
.map!(s => s ~ "\n")
.repeat(4000).join.array;
foreach (input; inputs) {
procPipes.stdin.write(input);
}
procPipes.stdin.close();
assert(procPipes.process.wait() == 0);
outputTask.join();
assert(output == inputs.join());
}
void testStderr()
{
auto program = q{
foreach (line; stdin.byLine())
stderr.writeln(line);
};
auto procPipes = pipeProcess(["rdmd", "--eval", program], Redirect.stdin | Redirect.stderr);
// Wait for rdmd to compile
sleep(3.seconds);
string output;
auto outputTask = runTask({
output = procPipes.stderr.collectOutput();
});
auto inputs = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"]
.map!(s => s ~ "\n")
.repeat(4000).join.array;
foreach (input; inputs) {
procPipes.stdin.write(input);
}
procPipes.stdin.close();
assert(procPipes.process.wait() == 0);
outputTask.join();
assert(output == inputs.join);
}
void testRandomDeath()
{
auto program = q{
import core.thread;
import std.random;
Thread.sleep(dur!"msecs"(uniform(0, 1000)));
};
// Prime rdmd
execute(["rdmd", "--eval", program]);
foreach (i; 0..20) {
auto process = spawnProcess(["rdmd", "--eval", program]);
assert(!process.exited);
sleep(800.msecs);
try {
process.kill();
} catch (Exception e) {}
process.wait();
assert(process.exited);
}
}
void testIgnoreSigterm()
{
auto program = q{
import core.thread;
import core.sys.posix.signal;
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
foreach (line; stdin.byLine()) {
writeln(line);
stdout.flush();
}
// Zombie
while (true) Thread.sleep(100.dur!"msecs");
};
auto procPipes = pipeProcess(
["rdmd", "--eval", program],
Redirect.stdin | Redirect.stdout | Redirect.stderrToStdout);
string output;
auto outputTask = runTask({
output = procPipes.stdout.collectOutput();
});
assert(!procPipes.process.exited);
// Give the program some time to compile and install the signal handler
sleep(4.seconds);
procPipes.process.kill();
procPipes.stdin.write("foo\n");
assert(!procPipes.process.exited);
assert(procPipes.process.waitOrForceKill(2.seconds) == 9);
assert(procPipes.process.exited);
outputTask.join();
assert(output == "foo\n");
}
void testSimpleShell()
{
auto res = executeShell("echo foo");
assert(res.status == 0);
assert(res.output == "foo\n");
}
void testLineEndings()
{
auto program = q{
write("linux\n");
write("os9\r");
write("win\r\n");
};
auto res = execute(["rdmd", "--eval", program]);
assert(res.status == 0);
assert(res.output == "linux\nos9\rwin\r\n");
}
void main()
{
// rdmd --eval is only supported in versions >= 2.080
static if (__VERSION__ >= 2080) {
runTask({
auto tasks = [
&testEcho,
&testCat,
&testStderr,
&testRandomDeath,
&testIgnoreSigterm,
&testSimpleShell,
&testLineEndings,
].map!(fn => runTask({
try {
fn();
} catch (Exception e) {
logError("%s", e);
throw e;
}
}));
foreach (task; tasks) {
task.join();
}
exitEventLoop();
});
runEventLoop();
}
}