From a54fa6b7de157a7962b29f02086da34d236a063f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 23 Jan 2019 13:50:43 +0100 Subject: [PATCH] Implement runWorkerTaskDistH. Variant of runWorkerTask that allows to get the handles of the created tasks. --- source/vibe/core/core.d | 15 +++++++++++++++ source/vibe/core/taskpool.d | 32 +++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 554a6a6..a11eded 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -586,6 +586,21 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args) } +/** Runs a new asynchronous task in all worker threads and returns the handles. + + `on_handle` is a callble that takes a `Task` as its only argument and is + called for every task instance that gets created. + + See_also: `runWorkerTaskDist` +*/ +void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) +{ + setupWorkerThreads(); + st_workerPool.runTaskDistH(on_handle, func, args); +} + + /** Sets up num worker threads. diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 6d85596..d26d93c 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -176,7 +176,7 @@ shared final class TaskPool { arguments are allowed to be able to guarantee thread-safety. The number of tasks started is guaranteed to be equal to - `workerThreadCount`. + `threadCount`. */ void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args) if (is(typeof(*func) == function)) @@ -193,6 +193,36 @@ shared final class TaskPool { runTaskDist_unsafe(func, args); } + /** Runs a new asynchronous task in all worker threads and returns the handles. + + `on_handle` is an alias to a callble that takes a `Task` as its only + argument and is called for every task instance that gets created. + + See_also: `runTaskDist` + */ + void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args) + { + // TODO: support non-copyable argument types using .move + import std.concurrency : send, receiveOnly; + + auto caller = Task.getThis(); + + // workaround to work when called outside of a task + if (caller == Task.init) { + .runTask({ runTaskDistH(on_handle, func, args); }).join(); + return; + } + + static void call(Task t, FT func, ARGS args) { + t.tid.send(Task.getThis()); + func(args); + } + runTaskDist(&call, caller, func, args); + + foreach (i; 0 .. this.threadCount) + on_handle(receiveOnly!Task); + } + private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args) { *ret = runTaskH!(FT, ARGS)(func, args);