759 lines
19 KiB
D
759 lines
19 KiB
D
/**
|
|
Functions and structures for dealing with subprocesses and pipes.
|
|
|
|
This module is modeled after std.process, but provides a fiber-aware
|
|
alternative to it. All blocking operations will yield the calling fiber
|
|
instead of blocking it.
|
|
*/
|
|
module vibe.core.process;
|
|
|
|
public import std.process : Pid, Redirect;
|
|
static import std.process;
|
|
|
|
import core.time;
|
|
import std.array;
|
|
import std.typecons;
|
|
import std.exception : enforce;
|
|
import std.algorithm;
|
|
import eventcore.core;
|
|
import vibe.core.path;
|
|
import vibe.core.log;
|
|
import vibe.core.stream;
|
|
import vibe.internal.async;
|
|
import vibe.internal.array : BatchBuffer;
|
|
import vibe.core.internal.release;
|
|
|
|
@safe:
|
|
|
|
/**
|
|
Register a process with vibe for fibre-aware handling. This process can be
|
|
started from anywhere including external libraries or std.process.
|
|
|
|
Params:
|
|
pid = A Pid or OS process handle
|
|
*/
|
|
Process registerProcess(Pid pid)
|
|
@trusted {
|
|
return registerProcess(pid.processID);
|
|
}
|
|
|
|
/// ditto
|
|
Process registerProcess(int pid)
|
|
{
|
|
return Process(eventDriver.processes.adopt(pid));
|
|
}
|
|
|
|
/**
|
|
Path to the user's preferred command interpreter.
|
|
|
|
See_Also: `nativeShell`
|
|
*/
|
|
@property NativePath userShell() { return NativePath(std.process.userShell); }
|
|
|
|
/**
|
|
The platform specific native shell path.
|
|
|
|
See_Also: `userShell`
|
|
*/
|
|
const NativePath nativeShell = NativePath(std.process.nativeShell);
|
|
|
|
/**
|
|
Equivalent to `std.process.Config` except with less flag support
|
|
*/
|
|
enum Config {
|
|
none = ProcessConfig.none,
|
|
newEnv = ProcessConfig.newEnv,
|
|
suppressConsole = ProcessConfig.suppressConsole,
|
|
detached = ProcessConfig.detached,
|
|
}
|
|
|
|
/**
|
|
Equivalent to `std.process.spawnProcess`.
|
|
|
|
Returns:
|
|
A reference to the running process.
|
|
|
|
See_Also: `pipeProcess`, `execute`
|
|
*/
|
|
Process spawnProcess(
|
|
scope string[] args,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init)
|
|
@trusted {
|
|
return Process(eventDriver.processes.spawn(
|
|
args,
|
|
ProcessStdinFile(ProcessRedirect.inherit),
|
|
ProcessStdoutFile(ProcessRedirect.inherit),
|
|
ProcessStderrFile(ProcessRedirect.inherit),
|
|
env,
|
|
config,
|
|
workDir.toNativeString()).pid
|
|
);
|
|
}
|
|
|
|
/// ditto
|
|
Process spawnProcess(
|
|
scope string program,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init)
|
|
{
|
|
return spawnProcess(
|
|
[program],
|
|
env,
|
|
config,
|
|
workDir
|
|
);
|
|
}
|
|
|
|
/// ditto
|
|
Process spawnShell(
|
|
scope string command,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init,
|
|
scope NativePath shellPath = nativeShell)
|
|
{
|
|
return spawnProcess(
|
|
shellCommand(command, shellPath),
|
|
env,
|
|
config,
|
|
workDir);
|
|
}
|
|
|
|
private string[] shellCommand(string command, NativePath shellPath)
|
|
{
|
|
version (Windows)
|
|
{
|
|
// CMD does not parse its arguments like other programs.
|
|
// It does not use CommandLineToArgvW.
|
|
// Instead, it treats the first and last quote specially.
|
|
// See CMD.EXE /? for details.
|
|
return [
|
|
std.process.escapeShellFileName(shellPath.toNativeString())
|
|
~ ` /C "` ~ command ~ `"`
|
|
];
|
|
}
|
|
else version (Posix)
|
|
{
|
|
return [
|
|
shellPath.toNativeString(),
|
|
"-c",
|
|
command,
|
|
];
|
|
}
|
|
}
|
|
|
|
/**
|
|
Represents a running process.
|
|
*/
|
|
struct Process {
|
|
private static struct Context {
|
|
//Duration waitTimeout;
|
|
shared(NativeEventDriver) driver;
|
|
}
|
|
|
|
private {
|
|
ProcessID m_pid;
|
|
Context* m_context;
|
|
}
|
|
|
|
private this(ProcessID p)
|
|
nothrow {
|
|
assert(p != ProcessID.invalid);
|
|
m_pid = p;
|
|
m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } ();
|
|
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
|
|
}
|
|
|
|
this(this)
|
|
nothrow {
|
|
if (m_pid != ProcessID.invalid)
|
|
eventDriver.processes.addRef(m_pid);
|
|
}
|
|
|
|
~this()
|
|
nothrow {
|
|
if (m_pid != ProcessID.invalid)
|
|
releaseHandle!"processes"(m_pid, m_context.driver);
|
|
}
|
|
|
|
/**
|
|
Check whether this is a valid process handle. The process may have
|
|
exited already.
|
|
*/
|
|
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; }
|
|
|
|
///
|
|
unittest {
|
|
Process p;
|
|
|
|
assert(!p);
|
|
}
|
|
|
|
/**
|
|
An operating system handle to the process.
|
|
*/
|
|
@property int osHandle() const nothrow @nogc { return cast(int)m_pid; }
|
|
|
|
/**
|
|
Whether the process has exited.
|
|
*/
|
|
@property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); }
|
|
|
|
/**
|
|
Wait for the process to exit, allowing other fibers to continue in the
|
|
meantime.
|
|
|
|
Params:
|
|
timeout = Optionally wait until a timeout is reached.
|
|
|
|
Returns:
|
|
The exit code of the process. If a timeout is given and reached, a
|
|
null value is returned.
|
|
*/
|
|
int wait()
|
|
@blocking {
|
|
return asyncAwaitUninterruptible!(ProcessWaitCallback,
|
|
cb => eventDriver.processes.wait(m_pid, cb)
|
|
)[1];
|
|
}
|
|
|
|
/// Ditto
|
|
Nullable!int wait(Duration timeout)
|
|
@blocking {
|
|
size_t waitId;
|
|
bool cancelled = false;
|
|
|
|
int code = asyncAwaitUninterruptible!(ProcessWaitCallback,
|
|
(cb) nothrow @safe {
|
|
waitId = eventDriver.processes.wait(m_pid, cb);
|
|
},
|
|
(cb) nothrow @safe {
|
|
eventDriver.processes.cancelWait(m_pid, waitId);
|
|
cancelled = true;
|
|
},
|
|
)(timeout)[1];
|
|
|
|
if (cancelled) {
|
|
return Nullable!int.init;
|
|
} else {
|
|
return code.nullable;
|
|
}
|
|
}
|
|
|
|
/**
|
|
Kill the process.
|
|
|
|
By default on Linux this sends SIGTERM to the process.
|
|
|
|
Params:
|
|
signal = Optional parameter for the signal to send to the process.
|
|
*/
|
|
void kill()
|
|
{
|
|
version (Posix)
|
|
{
|
|
import core.sys.posix.signal : SIGTERM;
|
|
eventDriver.processes.kill(m_pid, SIGTERM);
|
|
}
|
|
else
|
|
{
|
|
eventDriver.processes.kill(m_pid, 1);
|
|
}
|
|
}
|
|
|
|
/// ditto
|
|
void kill(int signal)
|
|
{
|
|
eventDriver.processes.kill(m_pid, signal);
|
|
}
|
|
|
|
/**
|
|
Terminate the process immediately.
|
|
|
|
On Linux this sends SIGKILL to the process.
|
|
*/
|
|
void forceKill()
|
|
{
|
|
version (Posix)
|
|
{
|
|
import core.sys.posix.signal : SIGKILL;
|
|
eventDriver.processes.kill(m_pid, SIGKILL);
|
|
}
|
|
else
|
|
{
|
|
eventDriver.processes.kill(m_pid, 1);
|
|
}
|
|
}
|
|
|
|
/**
|
|
Wait for the process to exit until a timeout is reached. If the process
|
|
doesn't exit before the timeout, force kill it.
|
|
|
|
Returns:
|
|
The process exit code.
|
|
*/
|
|
int waitOrForceKill(Duration timeout)
|
|
@blocking {
|
|
auto code = wait(timeout);
|
|
|
|
if (code.isNull) {
|
|
forceKill();
|
|
return wait();
|
|
} else {
|
|
return code.get;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
A stream for tBatchBufferhe write end of a pipe.
|
|
*/
|
|
struct PipeInputStream {
|
|
private static struct Context {
|
|
BatchBuffer!ubyte readBuffer;
|
|
shared(NativeEventDriver) driver;
|
|
}
|
|
|
|
private {
|
|
PipeFD m_pipe;
|
|
Context* m_context;
|
|
}
|
|
|
|
private this(PipeFD pipe)
|
|
nothrow {
|
|
m_pipe = pipe;
|
|
if (pipe != PipeFD.invalid) {
|
|
m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
|
|
m_context.readBuffer.capacity = 4096;
|
|
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
|
|
}
|
|
}
|
|
|
|
this(this)
|
|
nothrow {
|
|
if (m_pipe != PipeFD.invalid)
|
|
eventDriver.pipes.addRef(m_pipe);
|
|
}
|
|
|
|
~this()
|
|
nothrow {
|
|
if (m_pipe != PipeFD.invalid)
|
|
releaseHandle!"pipes"(m_pipe, m_context.driver);
|
|
}
|
|
|
|
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; }
|
|
|
|
@property bool empty() @blocking { return leastSize == 0; }
|
|
@property ulong leastSize()
|
|
@blocking {
|
|
waitForData();
|
|
return m_context ? m_context.readBuffer.length : 0;
|
|
}
|
|
@property bool dataAvailableForRead() { return waitForData(0.seconds); }
|
|
|
|
bool waitForData(Duration timeout = Duration.max)
|
|
@blocking {
|
|
if (!m_context) return false;
|
|
if (m_context.readBuffer.length > 0) return true;
|
|
auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;
|
|
|
|
bool cancelled;
|
|
IOStatus status;
|
|
size_t nbytes;
|
|
|
|
alias waiter = Waitable!(PipeIOCallback,
|
|
cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb),
|
|
(cb) {
|
|
cancelled = true;
|
|
eventDriver.pipes.cancelRead(m_pipe);
|
|
},
|
|
(pipe, st, nb) {
|
|
// Handle closed pipes
|
|
if (m_pipe == PipeFD.invalid) {
|
|
cancelled = true;
|
|
return;
|
|
}
|
|
|
|
assert(pipe == m_pipe);
|
|
status = st;
|
|
nbytes = nb;
|
|
}
|
|
);
|
|
|
|
asyncAwaitAny!(true, waiter)(timeout);
|
|
|
|
if (cancelled || !m_context) return false;
|
|
|
|
logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status);
|
|
|
|
assert(m_context.readBuffer.length == 0);
|
|
m_context.readBuffer.putN(nbytes);
|
|
switch (status) {
|
|
case IOStatus.ok: break;
|
|
case IOStatus.disconnected: break;
|
|
case IOStatus.wouldBlock:
|
|
assert(mode == IOMode.immediate);
|
|
break;
|
|
default:
|
|
logDebug("Error status when waiting for data: %s", status);
|
|
break;
|
|
}
|
|
|
|
return m_context.readBuffer.length > 0;
|
|
}
|
|
|
|
const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }
|
|
|
|
size_t read(scope ubyte[] dst, IOMode mode)
|
|
@blocking {
|
|
if (dst.empty) return 0;
|
|
|
|
if (m_context.readBuffer.length >= dst.length) {
|
|
m_context.readBuffer.read(dst);
|
|
return dst.length;
|
|
}
|
|
|
|
size_t nbytes = 0;
|
|
|
|
while (true) {
|
|
if (m_context.readBuffer.length == 0) {
|
|
if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
|
|
break;
|
|
|
|
enforce(waitForData(), "Reached end of stream while reading data.");
|
|
}
|
|
|
|
assert(m_context.readBuffer.length > 0);
|
|
auto l = min(dst.length, m_context.readBuffer.length);
|
|
m_context.readBuffer.read(dst[0 .. l]);
|
|
dst = dst[l .. $];
|
|
nbytes += l;
|
|
if (dst.length == 0)
|
|
break;
|
|
}
|
|
|
|
return nbytes;
|
|
}
|
|
|
|
void read(scope ubyte[] dst)
|
|
@blocking {
|
|
auto r = read(dst, IOMode.all);
|
|
assert(r == dst.length);
|
|
}
|
|
|
|
/**
|
|
Close the read end of the pipe immediately.
|
|
|
|
Make sure that the pipe is not used after this is called and is released
|
|
as soon as possible. Due to implementation detail in eventcore this
|
|
reference could conflict with future pipes.
|
|
*/
|
|
void close()
|
|
nothrow {
|
|
eventDriver.pipes.close(m_pipe);
|
|
}
|
|
}
|
|
|
|
mixin validateInputStream!PipeInputStream;
|
|
|
|
/**
|
|
Stream for the read end of a pipe.
|
|
*/
|
|
struct PipeOutputStream {
|
|
private static struct Context {
|
|
shared(NativeEventDriver) driver;
|
|
}
|
|
|
|
private {
|
|
PipeFD m_pipe;
|
|
Context* m_context;
|
|
}
|
|
|
|
private this(PipeFD pipe)
|
|
nothrow {
|
|
m_pipe = pipe;
|
|
if (pipe != PipeFD.invalid) {
|
|
m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
|
|
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
|
|
}
|
|
}
|
|
|
|
this(this)
|
|
nothrow {
|
|
if (m_pipe != PipeFD.invalid)
|
|
eventDriver.pipes.addRef(m_pipe);
|
|
}
|
|
|
|
~this()
|
|
nothrow {
|
|
if (m_pipe != PipeFD.invalid)
|
|
releaseHandle!"pipes"(m_pipe, m_context.driver);
|
|
}
|
|
|
|
bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; }
|
|
|
|
size_t write(in ubyte[] bytes, IOMode mode)
|
|
@blocking {
|
|
if (bytes.empty) return 0;
|
|
|
|
auto res = asyncAwait!(PipeIOCallback,
|
|
cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb),
|
|
cb => eventDriver.pipes.cancelWrite(m_pipe));
|
|
|
|
switch (res[1]) {
|
|
case IOStatus.ok: break;
|
|
case IOStatus.disconnected: break;
|
|
default:
|
|
throw new Exception("Error writing data to pipe.");
|
|
}
|
|
|
|
return res[2];
|
|
}
|
|
|
|
void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
|
|
void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
|
|
|
|
void flush() {}
|
|
void finalize() {}
|
|
|
|
/**
|
|
Close the write end of the pipe immediately.
|
|
|
|
Make sure that the pipe is not used after this is called and is released
|
|
as soon as possible. Due to implementation detail in eventcore this
|
|
reference could conflict with future pipes.
|
|
*/
|
|
void close()
|
|
nothrow {
|
|
eventDriver.pipes.close(m_pipe);
|
|
}
|
|
}
|
|
|
|
mixin validateOutputStream!PipeOutputStream;
|
|
|
|
/**
|
|
A pipe created by `pipe`.
|
|
*/
|
|
struct Pipe {
|
|
/// Read end of the pipe
|
|
PipeInputStream readEnd;
|
|
/// Write end of the pipe
|
|
PipeOutputStream writeEnd;
|
|
|
|
/**
|
|
Close both ends of the pipe
|
|
*/
|
|
void close()
|
|
nothrow {
|
|
writeEnd.close();
|
|
readEnd.close();
|
|
}
|
|
}
|
|
|
|
/**
|
|
Create a pipe, async equivalent of `std.process.pipe`.
|
|
|
|
Returns:
|
|
A stream for each end of the pipe.
|
|
*/
|
|
Pipe pipe()
|
|
{
|
|
auto p = std.process.pipe();
|
|
|
|
auto read = eventDriver.pipes.adopt(p.readEnd.fileno);
|
|
auto write = eventDriver.pipes.adopt(p.writeEnd.fileno);
|
|
|
|
return Pipe(PipeInputStream(read), PipeOutputStream(write));
|
|
}
|
|
|
|
/**
|
|
Returned from `pipeProcess`.
|
|
|
|
See_Also: `pipeProcess`, `pipeShell`
|
|
*/
|
|
struct ProcessPipes {
|
|
Process process;
|
|
PipeOutputStream stdin;
|
|
PipeInputStream stdout;
|
|
PipeInputStream stderr;
|
|
}
|
|
|
|
/**
|
|
Equivalent to `std.process.pipeProcess`.
|
|
|
|
Returns:
|
|
A struct containing the process and created pipes.
|
|
|
|
See_Also: `spawnProcess`, `execute`
|
|
*/
|
|
ProcessPipes pipeProcess(
|
|
scope string[] args,
|
|
Redirect redirect = Redirect.all,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init)
|
|
@trusted {
|
|
auto stdin = ProcessStdinFile(ProcessRedirect.inherit);
|
|
if (Redirect.stdin & redirect) {
|
|
stdin = ProcessStdinFile(ProcessRedirect.pipe);
|
|
}
|
|
|
|
auto stdout = ProcessStdoutFile(ProcessRedirect.inherit);
|
|
if (Redirect.stdoutToStderr & redirect) {
|
|
stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr);
|
|
} else if (Redirect.stdout & redirect) {
|
|
stdout = ProcessStdoutFile(ProcessRedirect.pipe);
|
|
}
|
|
|
|
auto stderr = ProcessStderrFile(ProcessRedirect.inherit);
|
|
if (Redirect.stderrToStdout & redirect) {
|
|
stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout);
|
|
} else if (Redirect.stderr & redirect) {
|
|
stderr = ProcessStderrFile(ProcessRedirect.pipe);
|
|
}
|
|
|
|
auto process = eventDriver.processes.spawn(
|
|
args,
|
|
stdin,
|
|
stdout,
|
|
stderr,
|
|
env,
|
|
config,
|
|
workDir.toNativeString());
|
|
|
|
return ProcessPipes(
|
|
Process(process.pid),
|
|
PipeOutputStream(cast(PipeFD)process.stdin),
|
|
PipeInputStream(cast(PipeFD)process.stdout),
|
|
PipeInputStream(cast(PipeFD)process.stderr)
|
|
);
|
|
}
|
|
|
|
/// ditto
|
|
ProcessPipes pipeProcess(
|
|
scope string program,
|
|
Redirect redirect = Redirect.all,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init)
|
|
{
|
|
return pipeProcess(
|
|
[program],
|
|
redirect,
|
|
env,
|
|
config,
|
|
workDir
|
|
);
|
|
}
|
|
|
|
/// ditto
|
|
ProcessPipes pipeShell(
|
|
scope string command,
|
|
Redirect redirect = Redirect.all,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
scope NativePath workDir = NativePath.init,
|
|
scope NativePath shellPath = nativeShell)
|
|
{
|
|
return pipeProcess(
|
|
shellCommand(command, nativeShell),
|
|
redirect,
|
|
env,
|
|
config,
|
|
workDir);
|
|
}
|
|
|
|
/**
|
|
Equivalent to `std.process.execute`.
|
|
|
|
Returns:
|
|
Tuple containing the exit status and process output.
|
|
|
|
See_Also: `spawnProcess`, `pipeProcess`
|
|
*/
|
|
auto execute(
|
|
scope string[] args,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
size_t maxOutput = size_t.max,
|
|
scope NativePath workDir = NativePath.init)
|
|
@blocking {
|
|
return executeImpl!pipeProcess(args, env, config, maxOutput, workDir);
|
|
}
|
|
|
|
/// ditto
|
|
auto execute(
|
|
scope string program,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
size_t maxOutput = size_t.max,
|
|
scope NativePath workDir = NativePath.init)
|
|
@blocking @trusted {
|
|
return executeImpl!pipeProcess(program, env, config, maxOutput, workDir);
|
|
}
|
|
|
|
/// ditto
|
|
auto executeShell(
|
|
scope string command,
|
|
const string[string] env = null,
|
|
Config config = Config.none,
|
|
size_t maxOutput = size_t.max,
|
|
scope NativePath workDir = null,
|
|
NativePath shellPath = nativeShell)
|
|
@blocking {
|
|
return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath);
|
|
}
|
|
|
|
private auto executeImpl(alias spawn, Cmd, Args...)(
|
|
Cmd command,
|
|
const string[string] env,
|
|
Config config,
|
|
size_t maxOutput,
|
|
scope NativePath workDir,
|
|
Args args)
|
|
@blocking {
|
|
Redirect redirect = Redirect.stdout;
|
|
|
|
auto processPipes = spawn(command, redirect, env, config, workDir, args);
|
|
|
|
auto stringOutput = processPipes.stdout.collectOutput(maxOutput);
|
|
|
|
return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput);
|
|
}
|
|
|
|
/**
|
|
Collect the string output of a stream in a blocking fashion.
|
|
|
|
Params:
|
|
stream = The input stream to collect from.
|
|
nbytes = The maximum number of bytes to collect.
|
|
|
|
Returns:
|
|
The collected data from the stream as a string.
|
|
*/
|
|
string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max)
|
|
@blocking @trusted if (isInputStream!InputStream) {
|
|
auto output = appender!string();
|
|
if (nbytes != size_t.max) {
|
|
output.reserve(nbytes);
|
|
}
|
|
|
|
import vibe.internal.allocator : theAllocator, dispose;
|
|
|
|
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
|
|
scope (exit) theAllocator.dispose(buffer);
|
|
|
|
while (!stream.empty && nbytes > 0) {
|
|
size_t chunk = min(nbytes, stream.leastSize, buffer.length);
|
|
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
|
|
|
|
stream.read(buffer[0..chunk]);
|
|
output.put(buffer[0..chunk]);
|
|
}
|
|
|
|
return output.data;
|
|
}
|