From 881e3da5f98cdd181b0f9bdd34192ac71563b4a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Aug 2019 11:39:14 +0200 Subject: [PATCH] Make async marked safe if possible and add asyncWork. asyncWork is the same as async, except that it guarantees that the computation happens in a worker thread. --- source/vibe/core/concurrency.d | 38 ++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index 171b8ed..c792e58 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1083,7 +1083,9 @@ struct Future(T) { import vibe.internal.freelistref : FreeListRef; private { - FreeListRef!(shared(T)) m_result; + alias ResultRef = FreeListRef!(shared(Tuple!(T, string))); + + ResultRef m_result; Task m_task; } @@ -1102,15 +1104,18 @@ struct Future(T) { if (!ready) m_task.join(); assert(ready, "Task still running after join()!?"); + if (m_result.get[1].length) + throw new Exception(m_result.get[1]); + // casting away shared is safe, because this is a unique reference - return *() @trusted { return cast(T*)&m_result.get(); } (); + return *() @trusted { return cast(T*)&m_result.get()[0]; } (); } alias getResult this; private void init() - { - m_result = FreeListRef!(shared(T))(); + @safe { + m_result = ResultRef(); } } @@ -1143,8 +1148,9 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar alias RET = ReturnType!CALLABLE; Future!RET ret; ret.init(); - static void compute(FreeListRef!(shared(RET)) dst, CALLABLE callable, ARGS args) { - dst.get = cast(shared(RET))callable(args); + static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { + try dst.get[0] = cast(shared(RET))callable(args); + catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; } static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); @@ -1200,6 +1206,26 @@ unittest { } } +Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) @safe + if (is(typeof(callable(args)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import vibe.core.core; + import vibe.internal.freelistref : FreeListRef; + import std.functional : toDelegate; + + alias RET = ReturnType!CALLABLE; + Future!RET ret; + ret.init(); + static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { + try *cast(RET*)&dst.get[0] = callable(args); + catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; + } + ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); + return ret; +} + + /******************************************************************************/ /* std.concurrency compatible interface for message passing */ /******************************************************************************/