diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index 11b3cc2..171b8ed 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1088,7 +1088,7 @@ struct Future(T) { } /// Checks if the values was fully computed. - @property bool ready() const { return !m_task.running; } + @property bool ready() const @safe { return !m_task.running; } /** Returns the computed value. @@ -1098,10 +1098,12 @@ struct Future(T) { instead. */ ref T getResult() - { + @safe { if (!ready) m_task.join(); assert(ready, "Task still running after join()!?"); - return *cast(T*)&m_result.get(); // casting away shared is safe, because this is a unique reference + + // casting away shared is safe, because this is a unique reference + return *() @trusted { return cast(T*)&m_result.get(); } (); } alias getResult this; @@ -1153,7 +1155,7 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar } /// -unittest { +@safe unittest { import vibe.core.core; import vibe.core.log; diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index cab93d1..0010de5 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -614,15 +614,17 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist` */ public void setupWorkerThreads(uint num = logicalProcessorCount()) -{ +@safe { static bool s_workerThreadsStarted = false; if (s_workerThreadsStarted) return; s_workerThreadsStarted = true; - synchronized (st_threadsMutex) { - if (!st_workerPool) - st_workerPool = new shared TaskPool(num); - } + () @trusted { + synchronized (st_threadsMutex) { + if (!st_workerPool) + st_workerPool = new shared TaskPool(num); + } + } (); } diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index d26d93c..e2bb428 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -135,28 +135,14 @@ shared final class TaskPool { Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args) if (isFunctionPointer!FT) { - import std.typecons : Typedef; - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); - Task caller = Task.getThis(); - // workaround for runWorkerTaskH to work when called outside of a task - if (caller == Task.init) { + if (Task.getThis() == Task.init) { Task ret; - .runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join(); + .runTask({ ret = doRunTaskH(func, args); }).join(); return ret; - } - - assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); - static void taskFun(Task caller, FT func, ARGS args) { - PrivateTask callee = Task.getThis(); - caller.tid.prioritySend(callee); - mixin(callWithMove!ARGS("func", "args")); - } - runTask_unsafe(&taskFun, caller, func, args); - return cast(Task)receiveOnly!PrivateTask(); + } else return doRunTaskH(func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -168,6 +154,28 @@ shared final class TaskPool { return runTaskH(&wrapper!(), object, args); } + // NOTE: needs to be a separate function to avoid recursion for the + // workaround above, which breaks @safe inference + private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args) + if (isFunctionPointer!FT) + { + import std.typecons : Typedef; + + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); + Task caller = Task.getThis(); + + assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); + static void taskFun(Task caller, FT func, ARGS args) { + PrivateTask callee = Task.getThis(); + caller.tid.prioritySend(callee); + mixin(callWithMove!ARGS("func", "args")); + } + runTask_unsafe(&taskFun, caller, func, args); + return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } (); + } + /** Runs a new asynchronous task in all worker threads concurrently. @@ -223,11 +231,6 @@ shared final class TaskPool { on_handle(receiveOnly!Task); } - private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args) - { - *ret = runTaskH!(FT, ARGS)(func, args); - } - private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) { import std.traits : ParameterTypeTuple;