Implement runWorkerTaskDistH.

Variant of runWorkerTask that allows to get the handles of the created tasks.
This commit is contained in:
Sönke Ludwig 2019-01-23 13:50:43 +01:00
parent 6e04179cdc
commit a54fa6b7de
2 changed files with 46 additions and 1 deletions

View file

@ -586,6 +586,21 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
} }
/** Runs a new asynchronous task in all worker threads and returns the handles.
`on_handle` is a callble that takes a `Task` as its only argument and is
called for every task instance that gets created.
See_also: `runWorkerTaskDist`
*/
void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
setupWorkerThreads();
st_workerPool.runTaskDistH(on_handle, func, args);
}
/** /**
Sets up num worker threads. Sets up num worker threads.

View file

@ -176,7 +176,7 @@ shared final class TaskPool {
arguments are allowed to be able to guarantee thread-safety. arguments are allowed to be able to guarantee thread-safety.
The number of tasks started is guaranteed to be equal to The number of tasks started is guaranteed to be equal to
`workerThreadCount`. `threadCount`.
*/ */
void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args) void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
if (is(typeof(*func) == function)) if (is(typeof(*func) == function))
@ -193,6 +193,36 @@ shared final class TaskPool {
runTaskDist_unsafe(func, args); runTaskDist_unsafe(func, args);
} }
/** Runs a new asynchronous task in all worker threads and returns the handles.
`on_handle` is an alias to a callble that takes a `Task` as its only
argument and is called for every task instance that gets created.
See_also: `runTaskDist`
*/
void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
{
// TODO: support non-copyable argument types using .move
import std.concurrency : send, receiveOnly;
auto caller = Task.getThis();
// workaround to work when called outside of a task
if (caller == Task.init) {
.runTask({ runTaskDistH(on_handle, func, args); }).join();
return;
}
static void call(Task t, FT func, ARGS args) {
t.tid.send(Task.getThis());
func(args);
}
runTaskDist(&call, caller, func, args);
foreach (i; 0 .. this.threadCount)
on_handle(receiveOnly!Task);
}
private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args) private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args)
{ {
*ret = runTaskH!(FT, ARGS)(func, args); *ret = runTaskH!(FT, ARGS)(func, args);