Make async marked safe if possible and add asyncWork.

asyncWork is the same as async, except that it guarantees that the computation happens in a worker thread.
This commit is contained in:
Sönke Ludwig 2019-08-21 11:39:14 +02:00
parent 4f5636dadf
commit 881e3da5f9

View file

@ -1083,7 +1083,9 @@ struct Future(T) {
import vibe.internal.freelistref : FreeListRef; import vibe.internal.freelistref : FreeListRef;
private { private {
FreeListRef!(shared(T)) m_result; alias ResultRef = FreeListRef!(shared(Tuple!(T, string)));
ResultRef m_result;
Task m_task; Task m_task;
} }
@ -1102,15 +1104,18 @@ struct Future(T) {
if (!ready) m_task.join(); if (!ready) m_task.join();
assert(ready, "Task still running after join()!?"); assert(ready, "Task still running after join()!?");
if (m_result.get[1].length)
throw new Exception(m_result.get[1]);
// casting away shared is safe, because this is a unique reference // casting away shared is safe, because this is a unique reference
return *() @trusted { return cast(T*)&m_result.get(); } (); return *() @trusted { return cast(T*)&m_result.get()[0]; } ();
} }
alias getResult this; alias getResult this;
private void init() private void init()
{ @safe {
m_result = FreeListRef!(shared(T))(); m_result = ResultRef();
} }
} }
@ -1143,8 +1148,9 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar
alias RET = ReturnType!CALLABLE; alias RET = ReturnType!CALLABLE;
Future!RET ret; Future!RET ret;
ret.init(); ret.init();
static void compute(FreeListRef!(shared(RET)) dst, CALLABLE callable, ARGS args) { static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) {
dst.get = cast(shared(RET))callable(args); try dst.get[0] = cast(shared(RET))callable(args);
catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed";
} }
static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) {
ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args);
@ -1200,6 +1206,26 @@ unittest {
} }
} }
Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) @safe
if (is(typeof(callable(args)) == ReturnType!CALLABLE) &&
isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
import vibe.core.core;
import vibe.internal.freelistref : FreeListRef;
import std.functional : toDelegate;
alias RET = ReturnType!CALLABLE;
Future!RET ret;
ret.init();
static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) {
try *cast(RET*)&dst.get[0] = callable(args);
catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed";
}
ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args);
return ret;
}
/******************************************************************************/ /******************************************************************************/
/* std.concurrency compatible interface for message passing */ /* std.concurrency compatible interface for message passing */
/******************************************************************************/ /******************************************************************************/