Merge pull request #129 from vibe-d/run_worker_task_h
Implement runWorkerTaskDistH.
This commit is contained in:
commit
5437b9ecb6
|
@ -586,6 +586,21 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
|
|||
}
|
||||
|
||||
|
||||
/** Runs a new asynchronous task in all worker threads and returns the handles.
|
||||
|
||||
`on_handle` is a callble that takes a `Task` as its only argument and is
|
||||
called for every task instance that gets created.
|
||||
|
||||
See_also: `runWorkerTaskDist`
|
||||
*/
|
||||
void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
|
||||
if (is(typeof(*func) == function))
|
||||
{
|
||||
setupWorkerThreads();
|
||||
st_workerPool.runTaskDistH(on_handle, func, args);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Sets up num worker threads.
|
||||
|
||||
|
|
|
@ -176,7 +176,7 @@ shared final class TaskPool {
|
|||
arguments are allowed to be able to guarantee thread-safety.
|
||||
|
||||
The number of tasks started is guaranteed to be equal to
|
||||
`workerThreadCount`.
|
||||
`threadCount`.
|
||||
*/
|
||||
void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
|
||||
if (is(typeof(*func) == function))
|
||||
|
@ -193,6 +193,36 @@ shared final class TaskPool {
|
|||
runTaskDist_unsafe(func, args);
|
||||
}
|
||||
|
||||
/** Runs a new asynchronous task in all worker threads and returns the handles.
|
||||
|
||||
`on_handle` is an alias to a callble that takes a `Task` as its only
|
||||
argument and is called for every task instance that gets created.
|
||||
|
||||
See_also: `runTaskDist`
|
||||
*/
|
||||
void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
|
||||
{
|
||||
// TODO: support non-copyable argument types using .move
|
||||
import std.concurrency : send, receiveOnly;
|
||||
|
||||
auto caller = Task.getThis();
|
||||
|
||||
// workaround to work when called outside of a task
|
||||
if (caller == Task.init) {
|
||||
.runTask({ runTaskDistH(on_handle, func, args); }).join();
|
||||
return;
|
||||
}
|
||||
|
||||
static void call(Task t, FT func, ARGS args) {
|
||||
t.tid.send(Task.getThis());
|
||||
func(args);
|
||||
}
|
||||
runTaskDist(&call, caller, func, args);
|
||||
|
||||
foreach (i; 0 .. this.threadCount)
|
||||
on_handle(receiveOnly!Task);
|
||||
}
|
||||
|
||||
private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args)
|
||||
{
|
||||
*ret = runTaskH!(FT, ARGS)(func, args);
|
||||
|
|
Loading…
Reference in a new issue