From a4e87237e45a30f20c1e556be27845e426310b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 17 May 2020 15:10:01 +0200 Subject: [PATCH 1/4] Remove potentially blocking file I/O code (upgrade to eventcore 0.9.0). - file and pipe closing are now asynchronous operations - moveFile and removeFile are now executed in a worker thread - requires eventcore ~>0.9.0 --- dub.sdl | 2 +- source/vibe/core/file.d | 49 ++++++++++++++++++++--------- source/vibe/core/internal/release.d | 7 +++-- source/vibe/core/process.d | 18 +++++++++-- 4 files changed, 56 insertions(+), 20 deletions(-) diff --git a/dub.sdl b/dub.sdl index 1dcfb87..462843c 100644 --- a/dub.sdl +++ b/dub.sdl @@ -4,7 +4,7 @@ authors "Sönke Ludwig" copyright "Copyright © 2016-2020, Sönke Ludwig" license "MIT" -dependency "eventcore" version="~>0.8.43" +dependency "eventcore" version="~>0.9.0" dependency "stdx-allocator" version="~>2.77.0" targetName "vibe_core" diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index d679b22..697f439 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -190,16 +190,21 @@ void moveFile(NativePath from, NativePath to, bool copy_fallback = false) /// ditto void moveFile(string from, string to, bool copy_fallback = false) { - if (!copy_fallback) { - std.file.rename(from, to); - } else { + auto fail = performInWorker((string from, string to) { try { std.file.rename(from, to); - } catch (FileException e) { - copyFile(from, to); - std.file.remove(from); + } catch (Exception e) { + return e.msg.length ? e.msg : "Failed to move file."; } - } + return null; + }, from, to); + + if (!fail.length) return; + + if (!copy_fallback) throw new Exception(fail); + + copyFile(from, to); + removeFile(from); } /** @@ -236,6 +241,7 @@ void copyFile(NativePath from, NativePath to, bool overwrite = false) enforce(overwrite || !existsFile(to), "Destination file already exists."); auto dst = openFile(to, FileMode.createTrunc); scope(exit) dst.close(); + dst.truncate(src.size); dst.write(src); } @@ -267,7 +273,16 @@ void removeFile(NativePath path) /// ditto void removeFile(string path) { - std.file.remove(path); + auto fail = performInWorker((string path) { + try { + std.file.remove(path); + } catch (Exception e) { + return e.msg.length ? e.msg : "Failed to delete file."; + } + return null; + }, path); + + if (fail.length) throw new Exception(fail); } /** @@ -567,12 +582,18 @@ struct FileStream { /// Closes the file handle. void close() { - if (m_fd != FileFD.init) { - eventDriver.files.close(m_fd); // FIXME: may leave dangling references! - releaseHandle!"files"(m_fd, m_ctx.driver); - m_fd = FileFD.init; - m_ctx = null; - } + if (m_fd == FileFD.invalid) return; + if (!eventDriver.files.isValid(m_fd)) return; + + auto res = asyncAwaitUninterruptible!(FileCloseCallback, + cb => eventDriver.files.close(m_fd, cb) + ); + releaseHandle!"files"(m_fd, m_ctx.driver); + m_fd = FileFD.invalid; + m_ctx = null; + + if (res[1] != CloseStatus.ok) + throw new Exception("Failed to close file"); } @property bool empty() const { assert(this.readable); return ctx.ptr >= ctx.size; } diff --git a/source/vibe/core/internal/release.d b/source/vibe/core/internal/release.d index 33dac9c..a99a428 100644 --- a/source/vibe/core/internal/release.d +++ b/source/vibe/core/internal/release.d @@ -1,6 +1,7 @@ module vibe.core.internal.release; import eventcore.core; +import std.stdint : intptr_t; /// Release a handle in a thread-safe way void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv) @@ -19,8 +20,8 @@ void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv) // in case the destructor was called from a foreign thread, // perform the release in the owner thread - drv.core.runInOwnerThread((h) { - __traits(getMember, eventDriver, subsys).releaseRef(cast(H)h); - }, cast(size_t)handle); + drv.core.runInOwnerThread((H handle) { + __traits(getMember, eventDriver, subsys).releaseRef(handle); + }, handle); } } diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index aa3b20b..0d0d4f8 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -453,7 +453,14 @@ struct PipeInputStream { */ void close() nothrow { - eventDriver.pipes.close(m_pipe); + if (m_pipe == PipeFD.invalid) return; + + asyncAwaitUninterruptible!(PipeCloseCallback, + cb => eventDriver.pipes.close(m_pipe, cb) + ); + + eventDriver.pipes.releaseRef(m_pipe); + m_pipe = PipeFD.invalid; } } @@ -528,7 +535,14 @@ struct PipeOutputStream { */ void close() nothrow { - eventDriver.pipes.close(m_pipe); + if (m_pipe == PipeFD.invalid) return; + + asyncAwaitUninterruptible!(PipeCloseCallback, + cb => eventDriver.pipes.close(m_pipe, cb) + ); + + eventDriver.pipes.releaseRef(m_pipe); + m_pipe = PipeFD.invalid; } } From fb27359214cf02860c5eea27ae2a07073dca8cd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 18 May 2020 16:42:28 +0200 Subject: [PATCH 2/4] Fix indentation. --- source/vibe/core/process.d | 1044 ++++++++++++++++++------------------ 1 file changed, 522 insertions(+), 522 deletions(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index 0d0d4f8..1d8db70 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -1,9 +1,9 @@ /** - Functions and structures for dealing with subprocesses and pipes. + Functions and structures for dealing with subprocesses and pipes. - This module is modeled after std.process, but provides a fiber-aware - alternative to it. All blocking operations will yield the calling fiber - instead of blocking it. + This module is modeled after std.process, but provides a fiber-aware + alternative to it. All blocking operations will yield the calling fiber + instead of blocking it. */ module vibe.core.process; @@ -26,433 +26,433 @@ import vibe.core.internal.release; @safe: /** - Register a process with vibe for fibre-aware handling. This process can be - started from anywhere including external libraries or std.process. + Register a process with vibe for fibre-aware handling. This process can be + started from anywhere including external libraries or std.process. - Params: - pid = A Pid or OS process id + Params: + pid = A Pid or OS process id */ Process adoptProcessID(Pid pid) @trusted { - return adoptProcessID(pid.processID); + return adoptProcessID(pid.processID); } /// ditto Process adoptProcessID(int pid) { - return Process(eventDriver.processes.adopt(pid)); + return Process(eventDriver.processes.adopt(pid)); } /** - Path to the user's preferred command interpreter. + Path to the user's preferred command interpreter. - See_Also: `nativeShell` + See_Also: `nativeShell` */ @property NativePath userShell() { return NativePath(std.process.userShell); } /** - The platform specific native shell path. + The platform specific native shell path. - See_Also: `userShell` + See_Also: `userShell` */ const NativePath nativeShell = NativePath(std.process.nativeShell); /** - Equivalent to `std.process.Config` except with less flag support + Equivalent to `std.process.Config` except with less flag support */ enum Config { - none = ProcessConfig.none, - newEnv = ProcessConfig.newEnv, - suppressConsole = ProcessConfig.suppressConsole, - detached = ProcessConfig.detached, + none = ProcessConfig.none, + newEnv = ProcessConfig.newEnv, + suppressConsole = ProcessConfig.suppressConsole, + detached = ProcessConfig.detached, } /** - Equivalent to `std.process.spawnProcess`. + Equivalent to `std.process.spawnProcess`. - Returns: - A reference to the running process. + Returns: + A reference to the running process. - See_Also: `pipeProcess`, `execute` + See_Also: `pipeProcess`, `execute` */ Process spawnProcess( - scope string[] args, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init) + scope string[] args, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) @trusted { - return Process(eventDriver.processes.spawn( - args, - ProcessStdinFile(ProcessRedirect.inherit), - ProcessStdoutFile(ProcessRedirect.inherit), - ProcessStderrFile(ProcessRedirect.inherit), - env, - config, - workDir.toNativeString()).pid - ); + return Process(eventDriver.processes.spawn( + args, + ProcessStdinFile(ProcessRedirect.inherit), + ProcessStdoutFile(ProcessRedirect.inherit), + ProcessStderrFile(ProcessRedirect.inherit), + env, + config, + workDir.toNativeString()).pid + ); } /// ditto Process spawnProcess( - scope string program, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init) + scope string program, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) { - return spawnProcess( - [program], - env, - config, - workDir - ); + return spawnProcess( + [program], + env, + config, + workDir + ); } /// ditto Process spawnShell( - scope string command, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init, - scope NativePath shellPath = nativeShell) + scope string command, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init, + scope NativePath shellPath = nativeShell) { - return spawnProcess( - shellCommand(command, shellPath), - env, - config, - workDir); + return spawnProcess( + shellCommand(command, shellPath), + env, + config, + workDir); } private string[] shellCommand(string command, NativePath shellPath) { - version (Windows) - { - // CMD does not parse its arguments like other programs. - // It does not use CommandLineToArgvW. - // Instead, it treats the first and last quote specially. - // See CMD.EXE /? for details. - return [ - std.process.escapeShellFileName(shellPath.toNativeString()) - ~ ` /C "` ~ command ~ `"` - ]; - } - else version (Posix) - { - return [ - shellPath.toNativeString(), - "-c", - command, - ]; - } + version (Windows) + { + // CMD does not parse its arguments like other programs. + // It does not use CommandLineToArgvW. + // Instead, it treats the first and last quote specially. + // See CMD.EXE /? for details. + return [ + std.process.escapeShellFileName(shellPath.toNativeString()) + ~ ` /C "` ~ command ~ `"` + ]; + } + else version (Posix) + { + return [ + shellPath.toNativeString(), + "-c", + command, + ]; + } } /** - Represents a running process. + Represents a running process. */ struct Process { - private static struct Context { - //Duration waitTimeout; - shared(NativeEventDriver) driver; - } + private static struct Context { + //Duration waitTimeout; + shared(NativeEventDriver) driver; + } - private { - ProcessID m_pid; - Context* m_context; - } + private { + ProcessID m_pid; + Context* m_context; + } - private this(ProcessID p) - nothrow { - assert(p != ProcessID.invalid); - m_pid = p; - m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } (); - m_context.driver = () @trusted { return cast(shared)eventDriver; } (); - } + private this(ProcessID p) + nothrow { + assert(p != ProcessID.invalid); + m_pid = p; + m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } - this(this) - nothrow { - if (m_pid != ProcessID.invalid) - eventDriver.processes.addRef(m_pid); - } + this(this) + nothrow { + if (m_pid != ProcessID.invalid) + eventDriver.processes.addRef(m_pid); + } - ~this() - nothrow { - if (m_pid != ProcessID.invalid) - releaseHandle!"processes"(m_pid, m_context.driver); - } + ~this() + nothrow { + if (m_pid != ProcessID.invalid) + releaseHandle!"processes"(m_pid, m_context.driver); + } - /** - Check whether this is a valid process handle. The process may have - exited already. - */ - bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; } + /** + Check whether this is a valid process handle. The process may have + exited already. + */ + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; } - /// - unittest { - Process p; + /// + unittest { + Process p; - assert(!p); - } + assert(!p); + } - /** - An operating system handle to the process. - */ - @property int pid() const nothrow @nogc { return cast(int)m_pid; } + /** + An operating system handle to the process. + */ + @property int pid() const nothrow @nogc { return cast(int)m_pid; } - /** - Whether the process has exited. - */ - @property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); } + /** + Whether the process has exited. + */ + @property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); } - /** - Wait for the process to exit, allowing other fibers to continue in the - meantime. + /** + Wait for the process to exit, allowing other fibers to continue in the + meantime. - Params: - timeout = Optionally wait until a timeout is reached. + Params: + timeout = Optionally wait until a timeout is reached. - Returns: - The exit code of the process. If a timeout is given and reached, a - null value is returned. - */ - int wait() - @blocking { - return asyncAwaitUninterruptible!(ProcessWaitCallback, - cb => eventDriver.processes.wait(m_pid, cb) - )[1]; - } + Returns: + The exit code of the process. If a timeout is given and reached, a + null value is returned. + */ + int wait() + @blocking { + return asyncAwaitUninterruptible!(ProcessWaitCallback, + cb => eventDriver.processes.wait(m_pid, cb) + )[1]; + } - /// Ditto - Nullable!int wait(Duration timeout) - @blocking { - size_t waitId; - bool cancelled = false; + /// Ditto + Nullable!int wait(Duration timeout) + @blocking { + size_t waitId; + bool cancelled = false; - int code = asyncAwaitUninterruptible!(ProcessWaitCallback, - (cb) nothrow @safe { - waitId = eventDriver.processes.wait(m_pid, cb); - }, - (cb) nothrow @safe { - eventDriver.processes.cancelWait(m_pid, waitId); - cancelled = true; - }, - )(timeout)[1]; + int code = asyncAwaitUninterruptible!(ProcessWaitCallback, + (cb) nothrow @safe { + waitId = eventDriver.processes.wait(m_pid, cb); + }, + (cb) nothrow @safe { + eventDriver.processes.cancelWait(m_pid, waitId); + cancelled = true; + }, + )(timeout)[1]; - if (cancelled) { - return Nullable!int.init; - } else { - return code.nullable; - } - } + if (cancelled) { + return Nullable!int.init; + } else { + return code.nullable; + } + } - /** - Kill the process. + /** + Kill the process. - By default on Linux this sends SIGTERM to the process. + By default on Linux this sends SIGTERM to the process. - Params: - signal = Optional parameter for the signal to send to the process. - */ - void kill() - { - version (Posix) - { - import core.sys.posix.signal : SIGTERM; - eventDriver.processes.kill(m_pid, SIGTERM); - } - else - { - eventDriver.processes.kill(m_pid, 1); - } - } + Params: + signal = Optional parameter for the signal to send to the process. + */ + void kill() + { + version (Posix) + { + import core.sys.posix.signal : SIGTERM; + eventDriver.processes.kill(m_pid, SIGTERM); + } + else + { + eventDriver.processes.kill(m_pid, 1); + } + } - /// ditto - void kill(int signal) - { - eventDriver.processes.kill(m_pid, signal); - } + /// ditto + void kill(int signal) + { + eventDriver.processes.kill(m_pid, signal); + } - /** - Terminate the process immediately. + /** + Terminate the process immediately. - On Linux this sends SIGKILL to the process. - */ - void forceKill() - { - version (Posix) - { - import core.sys.posix.signal : SIGKILL; - eventDriver.processes.kill(m_pid, SIGKILL); - } - else - { - eventDriver.processes.kill(m_pid, 1); - } - } + On Linux this sends SIGKILL to the process. + */ + void forceKill() + { + version (Posix) + { + import core.sys.posix.signal : SIGKILL; + eventDriver.processes.kill(m_pid, SIGKILL); + } + else + { + eventDriver.processes.kill(m_pid, 1); + } + } - /** - Wait for the process to exit until a timeout is reached. If the process - doesn't exit before the timeout, force kill it. + /** + Wait for the process to exit until a timeout is reached. If the process + doesn't exit before the timeout, force kill it. - Returns: - The process exit code. - */ - int waitOrForceKill(Duration timeout) - @blocking { - auto code = wait(timeout); + Returns: + The process exit code. + */ + int waitOrForceKill(Duration timeout) + @blocking { + auto code = wait(timeout); - if (code.isNull) { - forceKill(); - return wait(); - } else { - return code.get; - } - } + if (code.isNull) { + forceKill(); + return wait(); + } else { + return code.get; + } + } } /** - A stream for tBatchBufferhe write end of a pipe. + A stream for tBatchBufferhe write end of a pipe. */ struct PipeInputStream { - private static struct Context { - BatchBuffer!ubyte readBuffer; - shared(NativeEventDriver) driver; - } + private static struct Context { + BatchBuffer!ubyte readBuffer; + shared(NativeEventDriver) driver; + } - private { - PipeFD m_pipe; - Context* m_context; - } + private { + PipeFD m_pipe; + Context* m_context; + } - private this(PipeFD pipe) - nothrow { - m_pipe = pipe; - if (pipe != PipeFD.invalid) { - m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); - m_context.readBuffer.capacity = 4096; - m_context.driver = () @trusted { return cast(shared)eventDriver; } (); - } - } + private this(PipeFD pipe) + nothrow { + m_pipe = pipe; + if (pipe != PipeFD.invalid) { + m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); + m_context.readBuffer.capacity = 4096; + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } + } - this(this) - nothrow { - if (m_pipe != PipeFD.invalid) - eventDriver.pipes.addRef(m_pipe); - } + this(this) + nothrow { + if (m_pipe != PipeFD.invalid) + eventDriver.pipes.addRef(m_pipe); + } - ~this() - nothrow { - if (m_pipe != PipeFD.invalid) - releaseHandle!"pipes"(m_pipe, m_context.driver); - } + ~this() + nothrow { + if (m_pipe != PipeFD.invalid) + releaseHandle!"pipes"(m_pipe, m_context.driver); + } - bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } - @property bool empty() @blocking { return leastSize == 0; } - @property ulong leastSize() - @blocking { - waitForData(); - return m_context ? m_context.readBuffer.length : 0; - } - @property bool dataAvailableForRead() { return waitForData(0.seconds); } + @property bool empty() @blocking { return leastSize == 0; } + @property ulong leastSize() + @blocking { + waitForData(); + return m_context ? m_context.readBuffer.length : 0; + } + @property bool dataAvailableForRead() { return waitForData(0.seconds); } - bool waitForData(Duration timeout = Duration.max) - @blocking { - if (!m_context) return false; - if (m_context.readBuffer.length > 0) return true; - auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; + bool waitForData(Duration timeout = Duration.max) + @blocking { + if (!m_context) return false; + if (m_context.readBuffer.length > 0) return true; + auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; - bool cancelled; - IOStatus status; - size_t nbytes; + bool cancelled; + IOStatus status; + size_t nbytes; - alias waiter = Waitable!(PipeIOCallback, - cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb), - (cb) { - cancelled = true; - eventDriver.pipes.cancelRead(m_pipe); - }, - (pipe, st, nb) { - // Handle closed pipes - if (m_pipe == PipeFD.invalid) { - cancelled = true; - return; - } + alias waiter = Waitable!(PipeIOCallback, + cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb), + (cb) { + cancelled = true; + eventDriver.pipes.cancelRead(m_pipe); + }, + (pipe, st, nb) { + // Handle closed pipes + if (m_pipe == PipeFD.invalid) { + cancelled = true; + return; + } - assert(pipe == m_pipe); - status = st; - nbytes = nb; - } - ); + assert(pipe == m_pipe); + status = st; + nbytes = nb; + } + ); - asyncAwaitAny!(true, waiter)(timeout); + asyncAwaitAny!(true, waiter)(timeout); - if (cancelled || !m_context) return false; + if (cancelled || !m_context) return false; - logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status); + logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status); - assert(m_context.readBuffer.length == 0); - m_context.readBuffer.putN(nbytes); - switch (status) { - case IOStatus.ok: break; - case IOStatus.disconnected: break; - case IOStatus.wouldBlock: - assert(mode == IOMode.immediate); - break; - default: - logDebug("Error status when waiting for data: %s", status); - break; - } + assert(m_context.readBuffer.length == 0); + m_context.readBuffer.putN(nbytes); + switch (status) { + case IOStatus.ok: break; + case IOStatus.disconnected: break; + case IOStatus.wouldBlock: + assert(mode == IOMode.immediate); + break; + default: + logDebug("Error status when waiting for data: %s", status); + break; + } - return m_context.readBuffer.length > 0; - } + return m_context.readBuffer.length > 0; + } - const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } + const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } - size_t read(scope ubyte[] dst, IOMode mode) - @blocking { - if (dst.empty) return 0; + size_t read(scope ubyte[] dst, IOMode mode) + @blocking { + if (dst.empty) return 0; - if (m_context.readBuffer.length >= dst.length) { - m_context.readBuffer.read(dst); - return dst.length; - } + if (m_context.readBuffer.length >= dst.length) { + m_context.readBuffer.read(dst); + return dst.length; + } - size_t nbytes = 0; + size_t nbytes = 0; - while (true) { - if (m_context.readBuffer.length == 0) { - if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) - break; + while (true) { + if (m_context.readBuffer.length == 0) { + if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) + break; - enforce(waitForData(), "Reached end of stream while reading data."); - } + enforce(waitForData(), "Reached end of stream while reading data."); + } - assert(m_context.readBuffer.length > 0); - auto l = min(dst.length, m_context.readBuffer.length); - m_context.readBuffer.read(dst[0 .. l]); - dst = dst[l .. $]; - nbytes += l; - if (dst.length == 0) - break; - } + assert(m_context.readBuffer.length > 0); + auto l = min(dst.length, m_context.readBuffer.length); + m_context.readBuffer.read(dst[0 .. l]); + dst = dst[l .. $]; + nbytes += l; + if (dst.length == 0) + break; + } - return nbytes; - } + return nbytes; + } - void read(scope ubyte[] dst) - @blocking { - auto r = read(dst, IOMode.all); - assert(r == dst.length); - } + void read(scope ubyte[] dst) + @blocking { + auto r = read(dst, IOMode.all); + assert(r == dst.length); + } - /** - Close the read end of the pipe immediately. + /** + Close the read end of the pipe immediately. - Make sure that the pipe is not used after this is called and is released - as soon as possible. Due to implementation detail in eventcore this - reference could conflict with future pipes. - */ - void close() - nothrow { + Make sure that the pipe is not used after this is called and is released + as soon as possible. Due to implementation detail in eventcore this + reference could conflict with future pipes. + */ + void close() + nothrow { if (m_pipe == PipeFD.invalid) return; asyncAwaitUninterruptible!(PipeCloseCallback, @@ -461,80 +461,80 @@ struct PipeInputStream { eventDriver.pipes.releaseRef(m_pipe); m_pipe = PipeFD.invalid; - } + } } mixin validateInputStream!PipeInputStream; /** - Stream for the read end of a pipe. + Stream for the read end of a pipe. */ struct PipeOutputStream { - private static struct Context { - shared(NativeEventDriver) driver; - } + private static struct Context { + shared(NativeEventDriver) driver; + } - private { - PipeFD m_pipe; - Context* m_context; - } + private { + PipeFD m_pipe; + Context* m_context; + } - private this(PipeFD pipe) - nothrow { - m_pipe = pipe; - if (pipe != PipeFD.invalid) { - m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); - m_context.driver = () @trusted { return cast(shared)eventDriver; } (); - } - } + private this(PipeFD pipe) + nothrow { + m_pipe = pipe; + if (pipe != PipeFD.invalid) { + m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); + } + } - this(this) - nothrow { - if (m_pipe != PipeFD.invalid) - eventDriver.pipes.addRef(m_pipe); - } + this(this) + nothrow { + if (m_pipe != PipeFD.invalid) + eventDriver.pipes.addRef(m_pipe); + } - ~this() - nothrow { - if (m_pipe != PipeFD.invalid) - releaseHandle!"pipes"(m_pipe, m_context.driver); - } + ~this() + nothrow { + if (m_pipe != PipeFD.invalid) + releaseHandle!"pipes"(m_pipe, m_context.driver); + } - bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } + bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } - size_t write(in ubyte[] bytes, IOMode mode) - @blocking { - if (bytes.empty) return 0; + size_t write(in ubyte[] bytes, IOMode mode) + @blocking { + if (bytes.empty) return 0; - auto res = asyncAwait!(PipeIOCallback, - cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb), - cb => eventDriver.pipes.cancelWrite(m_pipe)); + auto res = asyncAwait!(PipeIOCallback, + cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb), + cb => eventDriver.pipes.cancelWrite(m_pipe)); - switch (res[1]) { - case IOStatus.ok: break; - case IOStatus.disconnected: break; - default: - throw new Exception("Error writing data to pipe."); - } + switch (res[1]) { + case IOStatus.ok: break; + case IOStatus.disconnected: break; + default: + throw new Exception("Error writing data to pipe."); + } - return res[2]; - } + return res[2]; + } - void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } - void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } + void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } + void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } - void flush() {} - void finalize() {} + void flush() {} + void finalize() {} - /** - Close the write end of the pipe immediately. + /** + Close the write end of the pipe immediately. - Make sure that the pipe is not used after this is called and is released - as soon as possible. Due to implementation detail in eventcore this - reference could conflict with future pipes. - */ - void close() - nothrow { + Make sure that the pipe is not used after this is called and is released + as soon as possible. Due to implementation detail in eventcore this + reference could conflict with future pipes. + */ + void close() + nothrow { if (m_pipe == PipeFD.invalid) return; asyncAwaitUninterruptible!(PipeCloseCallback, @@ -543,231 +543,231 @@ struct PipeOutputStream { eventDriver.pipes.releaseRef(m_pipe); m_pipe = PipeFD.invalid; - } + } } mixin validateOutputStream!PipeOutputStream; /** - A pipe created by `pipe`. + A pipe created by `pipe`. */ struct Pipe { - /// Read end of the pipe - PipeInputStream readEnd; - /// Write end of the pipe - PipeOutputStream writeEnd; + /// Read end of the pipe + PipeInputStream readEnd; + /// Write end of the pipe + PipeOutputStream writeEnd; - /** - Close both ends of the pipe - */ - void close() - nothrow { - writeEnd.close(); - readEnd.close(); - } + /** + Close both ends of the pipe + */ + void close() + nothrow { + writeEnd.close(); + readEnd.close(); + } } /** - Create a pipe, async equivalent of `std.process.pipe`. + Create a pipe, async equivalent of `std.process.pipe`. - Returns: - A stream for each end of the pipe. + Returns: + A stream for each end of the pipe. */ Pipe pipe() { - auto p = std.process.pipe(); + auto p = std.process.pipe(); - auto read = eventDriver.pipes.adopt(p.readEnd.fileno); - auto write = eventDriver.pipes.adopt(p.writeEnd.fileno); + auto read = eventDriver.pipes.adopt(p.readEnd.fileno); + auto write = eventDriver.pipes.adopt(p.writeEnd.fileno); - return Pipe(PipeInputStream(read), PipeOutputStream(write)); + return Pipe(PipeInputStream(read), PipeOutputStream(write)); } /** - Returned from `pipeProcess`. + Returned from `pipeProcess`. - See_Also: `pipeProcess`, `pipeShell` + See_Also: `pipeProcess`, `pipeShell` */ struct ProcessPipes { - Process process; - PipeOutputStream stdin; - PipeInputStream stdout; - PipeInputStream stderr; + Process process; + PipeOutputStream stdin; + PipeInputStream stdout; + PipeInputStream stderr; } /** - Equivalent to `std.process.pipeProcess`. + Equivalent to `std.process.pipeProcess`. - Returns: - A struct containing the process and created pipes. + Returns: + A struct containing the process and created pipes. - See_Also: `spawnProcess`, `execute` + See_Also: `spawnProcess`, `execute` */ ProcessPipes pipeProcess( - scope string[] args, - Redirect redirect = Redirect.all, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init) + scope string[] args, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) @trusted { - auto stdin = ProcessStdinFile(ProcessRedirect.inherit); - if (Redirect.stdin & redirect) { - stdin = ProcessStdinFile(ProcessRedirect.pipe); - } + auto stdin = ProcessStdinFile(ProcessRedirect.inherit); + if (Redirect.stdin & redirect) { + stdin = ProcessStdinFile(ProcessRedirect.pipe); + } - auto stdout = ProcessStdoutFile(ProcessRedirect.inherit); - if (Redirect.stdoutToStderr & redirect) { - stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr); - } else if (Redirect.stdout & redirect) { - stdout = ProcessStdoutFile(ProcessRedirect.pipe); - } + auto stdout = ProcessStdoutFile(ProcessRedirect.inherit); + if (Redirect.stdoutToStderr & redirect) { + stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr); + } else if (Redirect.stdout & redirect) { + stdout = ProcessStdoutFile(ProcessRedirect.pipe); + } - auto stderr = ProcessStderrFile(ProcessRedirect.inherit); - if (Redirect.stderrToStdout & redirect) { - stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout); - } else if (Redirect.stderr & redirect) { - stderr = ProcessStderrFile(ProcessRedirect.pipe); - } + auto stderr = ProcessStderrFile(ProcessRedirect.inherit); + if (Redirect.stderrToStdout & redirect) { + stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout); + } else if (Redirect.stderr & redirect) { + stderr = ProcessStderrFile(ProcessRedirect.pipe); + } - auto process = eventDriver.processes.spawn( - args, - stdin, - stdout, - stderr, - env, - config, - workDir.toNativeString()); + auto process = eventDriver.processes.spawn( + args, + stdin, + stdout, + stderr, + env, + config, + workDir.toNativeString()); - return ProcessPipes( - Process(process.pid), - PipeOutputStream(cast(PipeFD)process.stdin), - PipeInputStream(cast(PipeFD)process.stdout), - PipeInputStream(cast(PipeFD)process.stderr) - ); + return ProcessPipes( + Process(process.pid), + PipeOutputStream(cast(PipeFD)process.stdin), + PipeInputStream(cast(PipeFD)process.stdout), + PipeInputStream(cast(PipeFD)process.stderr) + ); } /// ditto ProcessPipes pipeProcess( - scope string program, - Redirect redirect = Redirect.all, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init) + scope string program, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init) { - return pipeProcess( - [program], - redirect, - env, - config, - workDir - ); + return pipeProcess( + [program], + redirect, + env, + config, + workDir + ); } /// ditto ProcessPipes pipeShell( - scope string command, - Redirect redirect = Redirect.all, - const string[string] env = null, - Config config = Config.none, - scope NativePath workDir = NativePath.init, - scope NativePath shellPath = nativeShell) + scope string command, + Redirect redirect = Redirect.all, + const string[string] env = null, + Config config = Config.none, + scope NativePath workDir = NativePath.init, + scope NativePath shellPath = nativeShell) { - return pipeProcess( - shellCommand(command, nativeShell), - redirect, - env, - config, - workDir); + return pipeProcess( + shellCommand(command, nativeShell), + redirect, + env, + config, + workDir); } /** - Equivalent to `std.process.execute`. + Equivalent to `std.process.execute`. - Returns: - Tuple containing the exit status and process output. + Returns: + Tuple containing the exit status and process output. - See_Also: `spawnProcess`, `pipeProcess` + See_Also: `spawnProcess`, `pipeProcess` */ auto execute( - scope string[] args, - const string[string] env = null, - Config config = Config.none, - size_t maxOutput = size_t.max, - scope NativePath workDir = NativePath.init) + scope string[] args, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = NativePath.init) @blocking { - return executeImpl!pipeProcess(args, env, config, maxOutput, workDir); + return executeImpl!pipeProcess(args, env, config, maxOutput, workDir); } /// ditto auto execute( - scope string program, - const string[string] env = null, - Config config = Config.none, - size_t maxOutput = size_t.max, - scope NativePath workDir = NativePath.init) + scope string program, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = NativePath.init) @blocking @trusted { - return executeImpl!pipeProcess(program, env, config, maxOutput, workDir); + return executeImpl!pipeProcess(program, env, config, maxOutput, workDir); } /// ditto auto executeShell( - scope string command, - const string[string] env = null, - Config config = Config.none, - size_t maxOutput = size_t.max, - scope NativePath workDir = null, - NativePath shellPath = nativeShell) + scope string command, + const string[string] env = null, + Config config = Config.none, + size_t maxOutput = size_t.max, + scope NativePath workDir = null, + NativePath shellPath = nativeShell) @blocking { - return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath); + return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath); } private auto executeImpl(alias spawn, Cmd, Args...)( - Cmd command, - const string[string] env, - Config config, - size_t maxOutput, - scope NativePath workDir, - Args args) + Cmd command, + const string[string] env, + Config config, + size_t maxOutput, + scope NativePath workDir, + Args args) @blocking { - Redirect redirect = Redirect.stdout; + Redirect redirect = Redirect.stdout; - auto processPipes = spawn(command, redirect, env, config, workDir, args); + auto processPipes = spawn(command, redirect, env, config, workDir, args); - auto stringOutput = processPipes.stdout.collectOutput(maxOutput); + auto stringOutput = processPipes.stdout.collectOutput(maxOutput); - return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); + return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); } /* - Collect the string output of a stream in a blocking fashion. + Collect the string output of a stream in a blocking fashion. - Params: - stream = The input stream to collect from. - nbytes = The maximum number of bytes to collect. + Params: + stream = The input stream to collect from. + nbytes = The maximum number of bytes to collect. - Returns: - The collected data from the stream as a string. + Returns: + The collected data from the stream as a string. */ /// private string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max) @blocking @trusted if (isInputStream!InputStream) { - auto output = appender!string(); - if (nbytes != size_t.max) { - output.reserve(nbytes); - } + auto output = appender!string(); + if (nbytes != size_t.max) { + output.reserve(nbytes); + } - import vibe.internal.allocator : theAllocator, dispose; + import vibe.internal.allocator : theAllocator, dispose; - scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); - scope (exit) theAllocator.dispose(buffer); + scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); + scope (exit) theAllocator.dispose(buffer); - while (!stream.empty && nbytes > 0) { - size_t chunk = min(nbytes, stream.leastSize, buffer.length); - assert(chunk > 0, "leastSize returned zero for non-empty stream."); + while (!stream.empty && nbytes > 0) { + size_t chunk = min(nbytes, stream.leastSize, buffer.length); + assert(chunk > 0, "leastSize returned zero for non-empty stream."); - stream.read(buffer[0..chunk]); - output.put(buffer[0..chunk]); - } + stream.read(buffer[0..chunk]); + output.put(buffer[0..chunk]); + } - return output.data; + return output.data; } From 434433394638cfc8d5d77fddedb0534bba5c4551 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 18 May 2020 16:44:11 +0200 Subject: [PATCH 3/4] Add error handling for process creation. --- source/vibe/core/process.d | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/source/vibe/core/process.d b/source/vibe/core/process.d index 1d8db70..b963933 100644 --- a/source/vibe/core/process.d +++ b/source/vibe/core/process.d @@ -40,7 +40,10 @@ Process adoptProcessID(Pid pid) /// ditto Process adoptProcessID(int pid) { - return Process(eventDriver.processes.adopt(pid)); + auto p = eventDriver.processes.adopt(pid); + if (p == ProcessID.invalid) + throw new Exception("Failed to adopt process ID"); + return Process(p); } /** @@ -81,15 +84,19 @@ Process spawnProcess( Config config = Config.none, scope NativePath workDir = NativePath.init) @trusted { - return Process(eventDriver.processes.spawn( + auto process = eventDriver.processes.spawn( args, ProcessStdinFile(ProcessRedirect.inherit), ProcessStdoutFile(ProcessRedirect.inherit), ProcessStderrFile(ProcessRedirect.inherit), env, config, - workDir.toNativeString()).pid - ); + workDir.toNativeString()); + + if (process.pid == ProcessID.invalid) + throw new Exception("Failed to spawn process"); + + return Process(process.pid); } /// ditto @@ -638,6 +645,9 @@ ProcessPipes pipeProcess( config, workDir.toNativeString()); + if (process.pid == ProcessID.invalid) + throw new Exception("Failed to spawn process"); + return ProcessPipes( Process(process.pid), PipeOutputStream(cast(PipeFD)process.stdin), From b846a06a3567e30f73bf40e8cf9baf9e3666774e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 22 May 2020 10:33:17 +0200 Subject: [PATCH 4/4] Make the std.concurrency test more robust against bad sleep timing. --- tests/std.concurrency.d | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/tests/std.concurrency.d b/tests/std.concurrency.d index f310a9a..f57390b 100644 --- a/tests/std.concurrency.d +++ b/tests/std.concurrency.d @@ -7,6 +7,7 @@ module test; import vibe.core.core; import vibe.core.log; +import std.algorithm; import std.concurrency; import core.atomic; import core.time; @@ -18,9 +19,16 @@ shared watchdog_count = 0; void main() { t1 = spawn({ - // ensure that asynchronous operations run in parallel to receive() + // ensure that asynchronous operations can run in parallel to receive() int wc = 0; - runTask({ while (true) { sleep(250.msecs); wc++; logInfo("Watchdog receiver %s", wc); } }); + MonoTime stime = MonoTime.currTime; + runTask({ + while (true) { + sleepUntil((wc + 1) * 250.msecs, stime, 200.msecs); + wc++; + logInfo("Watchdog receiver %s", wc); + } + }); bool finished = false; try while (!finished) { @@ -57,28 +65,30 @@ void main() }); t2 = spawn({ + MonoTime stime = MonoTime.currTime; + scope (failure) assert(false); - sleep(1.seconds()); + sleepUntil(1.seconds, stime, 900.msecs); logInfo("send Hello World"); t1.send("Hello, World!"); - sleep(1.seconds()); + sleepUntil(2.seconds, stime, 900.msecs); logInfo("send int 1"); t1.send(1); - sleep(1.seconds()); + sleepUntil(3.seconds, stime, 900.msecs); logInfo("send double 1.2"); t1.send(1.2); - sleep(1.seconds()); + sleepUntil(4.seconds, stime, 900.msecs); logInfo("send int 2"); t1.send(2); - sleep(1.seconds()); + sleepUntil(5.seconds, stime, 900.msecs); logInfo("send 3xint 1 2 3"); t1.send(1, 2, 3); - sleep(1.seconds()); + sleepUntil(6.seconds, stime, 900.msecs); logInfo("send string Bye bye"); t1.send("Bye bye"); @@ -89,3 +99,12 @@ void main() runApplication(); } + +// corrects for small timing inaccuracies to avoid the counter +// getting systematically out of sync when sleep timing is inaccurate +void sleepUntil(Duration until, MonoTime start_time, Duration min_sleep) +{ + auto tm = MonoTime.currTime; + auto timeout = max(start_time - tm + until, min_sleep); + sleep(timeout); +}