From 4f5636dadff59fde032a43794216ca860b15480e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 10:13:09 +0200 Subject: [PATCH 1/3] Mark async/Future and runWorkerTaskH as safe. --- source/vibe/core/concurrency.d | 10 +++++--- source/vibe/core/core.d | 12 +++++---- source/vibe/core/taskpool.d | 47 ++++++++++++++++++---------------- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index 11b3cc2..171b8ed 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1088,7 +1088,7 @@ struct Future(T) { } /// Checks if the values was fully computed. - @property bool ready() const { return !m_task.running; } + @property bool ready() const @safe { return !m_task.running; } /** Returns the computed value. @@ -1098,10 +1098,12 @@ struct Future(T) { instead. */ ref T getResult() - { + @safe { if (!ready) m_task.join(); assert(ready, "Task still running after join()!?"); - return *cast(T*)&m_result.get(); // casting away shared is safe, because this is a unique reference + + // casting away shared is safe, because this is a unique reference + return *() @trusted { return cast(T*)&m_result.get(); } (); } alias getResult this; @@ -1153,7 +1155,7 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar } /// -unittest { +@safe unittest { import vibe.core.core; import vibe.core.log; diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index cab93d1..0010de5 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -614,15 +614,17 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist` */ public void setupWorkerThreads(uint num = logicalProcessorCount()) -{ +@safe { static bool s_workerThreadsStarted = false; if (s_workerThreadsStarted) return; s_workerThreadsStarted = true; - synchronized (st_threadsMutex) { - if (!st_workerPool) - st_workerPool = new shared TaskPool(num); - } + () @trusted { + synchronized (st_threadsMutex) { + if (!st_workerPool) + st_workerPool = new shared TaskPool(num); + } + } (); } diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index d26d93c..e2bb428 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -135,28 +135,14 @@ shared final class TaskPool { Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args) if (isFunctionPointer!FT) { - import std.typecons : Typedef; - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); - Task caller = Task.getThis(); - // workaround for runWorkerTaskH to work when called outside of a task - if (caller == Task.init) { + if (Task.getThis() == Task.init) { Task ret; - .runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join(); + .runTask({ ret = doRunTaskH(func, args); }).join(); return ret; - } - - assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); - static void taskFun(Task caller, FT func, ARGS args) { - PrivateTask callee = Task.getThis(); - caller.tid.prioritySend(callee); - mixin(callWithMove!ARGS("func", "args")); - } - runTask_unsafe(&taskFun, caller, func, args); - return cast(Task)receiveOnly!PrivateTask(); + } else return doRunTaskH(func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -168,6 +154,28 @@ shared final class TaskPool { return runTaskH(&wrapper!(), object, args); } + // NOTE: needs to be a separate function to avoid recursion for the + // workaround above, which breaks @safe inference + private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args) + if (isFunctionPointer!FT) + { + import std.typecons : Typedef; + + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); + Task caller = Task.getThis(); + + assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); + static void taskFun(Task caller, FT func, ARGS args) { + PrivateTask callee = Task.getThis(); + caller.tid.prioritySend(callee); + mixin(callWithMove!ARGS("func", "args")); + } + runTask_unsafe(&taskFun, caller, func, args); + return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } (); + } + /** Runs a new asynchronous task in all worker threads concurrently. @@ -223,11 +231,6 @@ shared final class TaskPool { on_handle(receiveOnly!Task); } - private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args) - { - *ret = runTaskH!(FT, ARGS)(func, args); - } - private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) { import std.traits : ParameterTypeTuple; From 881e3da5f98cdd181b0f9bdd34192ac71563b4a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 11:39:14 +0200 Subject: [PATCH 2/3] Make async marked safe if possible and add asyncWork. asyncWork is the same as async, except that it guarantees that the computation happens in a worker thread. --- source/vibe/core/concurrency.d | 38 ++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index 171b8ed..c792e58 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1083,7 +1083,9 @@ struct Future(T) { import vibe.internal.freelistref : FreeListRef; private { - FreeListRef!(shared(T)) m_result; + alias ResultRef = FreeListRef!(shared(Tuple!(T, string))); + + ResultRef m_result; Task m_task; } @@ -1102,15 +1104,18 @@ struct Future(T) { if (!ready) m_task.join(); assert(ready, "Task still running after join()!?"); + if (m_result.get[1].length) + throw new Exception(m_result.get[1]); + // casting away shared is safe, because this is a unique reference - return *() @trusted { return cast(T*)&m_result.get(); } (); + return *() @trusted { return cast(T*)&m_result.get()[0]; } (); } alias getResult this; private void init() - { - m_result = FreeListRef!(shared(T))(); + @safe { + m_result = ResultRef(); } } @@ -1143,8 +1148,9 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar alias RET = ReturnType!CALLABLE; Future!RET ret; ret.init(); - static void compute(FreeListRef!(shared(RET)) dst, CALLABLE callable, ARGS args) { - dst.get = cast(shared(RET))callable(args); + static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { + try dst.get[0] = cast(shared(RET))callable(args); + catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; } static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); @@ -1200,6 +1206,26 @@ unittest { } } +Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) @safe + if (is(typeof(callable(args)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import vibe.core.core; + import vibe.internal.freelistref : FreeListRef; + import std.functional : toDelegate; + + alias RET = ReturnType!CALLABLE; + Future!RET ret; + ret.init(); + static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { + try *cast(RET*)&dst.get[0] = callable(args); + catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; + } + ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); + return ret; +} + + /******************************************************************************/ /* std.concurrency compatible interface for message passing */ /******************************************************************************/ From e3a38b374e02b629d9dad7f01bbcdcd3c16d2e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 11:40:30 +0200 Subject: [PATCH 3/3] Let all non-aio file system functions run in worker threads. This avoids stalling the event loop in case of lengthy I/O (network shares, spinning up hard drives etc). --- source/vibe/core/file.d | 90 ++++++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 15 deletions(-) diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index e32efef..cc16b64 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -1,12 +1,13 @@ /** File handling functions and types. - Copyright: © 2012-2018 RejectedSoftware e.K. + Copyright: © 2012-2019 RejectedSoftware e.K. License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Sönke Ludwig */ module vibe.core.file; +import vibe.core.concurrency : asyncWork; import eventcore.core : NativeEventDriver, eventDriver; import eventcore.driver; import vibe.core.internal.release; @@ -26,6 +27,8 @@ import std.exception; import std.file; import std.path; import std.string; +import std.typecons : Flag, No; + version(Posix){ private extern(C) int mkstemps(char* templ, int suffixlen); @@ -278,7 +281,12 @@ bool existsFile(string path) nothrow // This was *annotated* nothrow in 2.067. static if (__VERSION__ < 2067) scope(failure) assert(0, "Error: existsFile should never throw"); - return std.file.exists(path); + + try return asyncWork((string p) => std.file.exists(p), path).getResult(); + catch (Exception e) { + logDebug("Failed to determine file existence for '%s': %s", path, e.msg); + return false; + } } /** Stores information about the specified file/directory into 'info' @@ -287,13 +295,15 @@ bool existsFile(string path) nothrow */ FileInfo getFileInfo(NativePath path) @trusted { - auto ent = DirEntry(path.toNativeString()); - return makeFileInfo(ent); + return getFileInfo(path.toNativeString); } /// ditto FileInfo getFileInfo(string path) { - return getFileInfo(NativePath(path)); + return asyncWork((string p) { + auto ent = DirEntry(p); + return makeFileInfo(ent); + }, path).getResult(); } /** @@ -301,27 +311,77 @@ FileInfo getFileInfo(string path) */ void createDirectory(NativePath path) { - () @trusted { mkdir(path.toNativeString()); } (); + createDirectory(path.toNativeString); } /// ditto -void createDirectory(string path) +void createDirectory(string path, Flag!"recursive" recursive = No.recursive) { - createDirectory(NativePath(path)); + auto fail = asyncWork((string p, bool rec) { + try { + if (rec) mkdirRecurse(p); + else mkdir(p); + } catch (Exception e) { + return e.msg.length ? e.msg : "Failed to create directory."; + } + return null; + }, path, !!recursive); + + if (fail) throw new Exception(fail); } /** Enumerates all files in the specified directory. */ -void listDirectory(NativePath path, scope bool delegate(FileInfo info) del) -@trusted { - foreach( DirEntry ent; dirEntries(path.toNativeString(), SpanMode.shallow) ) - if( !del(makeFileInfo(ent)) ) - break; +void listDirectory(NativePath path, scope bool delegate(FileInfo info) @safe del) +{ + listDirectory(path.toNativeString, del); } /// ditto -void listDirectory(string path, scope bool delegate(FileInfo info) del) +void listDirectory(string path, scope bool delegate(FileInfo info) @safe del) { - listDirectory(NativePath(path), del); + import vibe.core.core : runWorkerTaskH; + import vibe.core.channel : Channel, createChannel; + + struct S { + FileInfo info; + string error; + } + + auto ch = createChannel!S(); + runWorkerTaskH((string path, Channel!S ch) nothrow { + scope (exit) ch.close(); + try { + foreach (DirEntry ent; dirEntries(path, SpanMode.shallow)) { + auto nfo = makeFileInfo(ent); + try ch.put(S(nfo, null)); + catch (Exception e) break; // channel got closed + } + } catch (Exception e) { + try ch.put(S(FileInfo.init, e.msg.length ? e.msg : "Failed to iterate directory")); + catch (Exception e) {} // channel got closed + } + }, path, ch); + + S itm; + while (ch.tryConsumeOne(itm)) { + if (itm.error.length) + throw new Exception(itm.error); + + if (!del(itm.info)) { + ch.close(); + break; + } + } +} +/// ditto +void listDirectory(NativePath path, scope bool delegate(FileInfo info) @system del) +@system { + listDirectory(path, (nfo) @trusted => del(nfo)); +} +/// ditto +void listDirectory(string path, scope bool delegate(FileInfo info) @system del) +@system { + listDirectory(path, (nfo) @trusted => del(nfo)); } /// ditto int delegate(scope int delegate(ref FileInfo)) iterateDirectory(NativePath path)