Mark async/Future and runWorkerTaskH as safe.

This commit is contained in:
Sönke Ludwig 2019-08-21 10:13:09 +02:00
parent 7c609dd07f
commit 4f5636dadf
3 changed files with 38 additions and 31 deletions

View file

@ -1088,7 +1088,7 @@ struct Future(T) {
} }
/// Checks if the values was fully computed. /// Checks if the values was fully computed.
@property bool ready() const { return !m_task.running; } @property bool ready() const @safe { return !m_task.running; }
/** Returns the computed value. /** Returns the computed value.
@ -1098,10 +1098,12 @@ struct Future(T) {
instead. instead.
*/ */
ref T getResult() ref T getResult()
{ @safe {
if (!ready) m_task.join(); if (!ready) m_task.join();
assert(ready, "Task still running after join()!?"); assert(ready, "Task still running after join()!?");
return *cast(T*)&m_result.get(); // casting away shared is safe, because this is a unique reference
// casting away shared is safe, because this is a unique reference
return *() @trusted { return cast(T*)&m_result.get(); } ();
} }
alias getResult this; alias getResult this;
@ -1153,7 +1155,7 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar
} }
/// ///
unittest { @safe unittest {
import vibe.core.core; import vibe.core.core;
import vibe.core.log; import vibe.core.log;

View file

@ -614,15 +614,17 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref
See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist` See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`
*/ */
public void setupWorkerThreads(uint num = logicalProcessorCount()) public void setupWorkerThreads(uint num = logicalProcessorCount())
{ @safe {
static bool s_workerThreadsStarted = false; static bool s_workerThreadsStarted = false;
if (s_workerThreadsStarted) return; if (s_workerThreadsStarted) return;
s_workerThreadsStarted = true; s_workerThreadsStarted = true;
synchronized (st_threadsMutex) { () @trusted {
if (!st_workerPool) synchronized (st_threadsMutex) {
st_workerPool = new shared TaskPool(num); if (!st_workerPool)
} st_workerPool = new shared TaskPool(num);
}
} ();
} }

View file

@ -135,28 +135,14 @@ shared final class TaskPool {
Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args) Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
if (isFunctionPointer!FT) if (isFunctionPointer!FT)
{ {
import std.typecons : Typedef;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
// workaround for runWorkerTaskH to work when called outside of a task // workaround for runWorkerTaskH to work when called outside of a task
if (caller == Task.init) { if (Task.getThis() == Task.init) {
Task ret; Task ret;
.runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join(); .runTask({ ret = doRunTaskH(func, args); }).join();
return ret; return ret;
} } else return doRunTaskH(func, args);
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.tid.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
return cast(Task)receiveOnly!PrivateTask();
} }
/// ditto /// ditto
Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
@ -168,6 +154,28 @@ shared final class TaskPool {
return runTaskH(&wrapper!(), object, args); return runTaskH(&wrapper!(), object, args);
} }
// NOTE: needs to be a separate function to avoid recursion for the
// workaround above, which breaks @safe inference
private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args)
if (isFunctionPointer!FT)
{
import std.typecons : Typedef;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.tid.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } ();
}
/** Runs a new asynchronous task in all worker threads concurrently. /** Runs a new asynchronous task in all worker threads concurrently.
@ -223,11 +231,6 @@ shared final class TaskPool {
on_handle(receiveOnly!Task); on_handle(receiveOnly!Task);
} }
private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args)
{
*ret = runTaskH!(FT, ARGS)(func, args);
}
private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
{ {
import std.traits : ParameterTypeTuple; import std.traits : ParameterTypeTuple;