Move worker task logic into a new TaskPool class.

This commit is contained in:
Sönke Ludwig 2017-02-22 18:35:51 +01:00
parent f9372446b1
commit 40713db075
3 changed files with 494 additions and 314 deletions

View file

@ -14,6 +14,7 @@ import vibe.core.args;
import vibe.core.concurrency;
import vibe.core.log;
import vibe.core.sync : ManualEvent, createSharedManualEvent;
import vibe.core.taskpool : TaskPool;
import vibe.internal.async;
import vibe.internal.array : FixedRingBuffer;
//import vibe.utils.array;
@ -25,8 +26,9 @@ import std.exception;
import std.functional;
import std.range : empty, front, popFront;
import std.string;
import std.variant;
import std.traits : isFunctionPointer;
import std.typecons : Typedef, Tuple, tuple;
import std.variant;
import core.atomic;
import core.sync.condition;
import core.sync.mutex;
@ -312,13 +314,34 @@ void setIdleHandler(bool delegate() @safe nothrow del)
Note that the maximum size of all args must not exceed `maxTaskParameterSize`.
*/
Task runTask(ARGS...)(void delegate(ARGS) task, ARGS args)
Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
if (is(typeof(CALLABLE.init(ARGS.init))))
{
auto tfi = makeTaskFuncInfo(task, args);
auto tfi = TaskFuncInfo.make(task, args);
return runTask_internal(tfi);
}
private Task runTask_internal(ref TaskFuncInfo tfi)
/**
Runs an asyncronous task that is guaranteed to finish before the caller's
scope is left.
*/
auto runTaskScoped(FT, ARGS)(scope FT callable, ARGS args)
{
static struct S {
Task handle;
@disable this(this);
~this()
{
handle.joinUninterruptible();
}
}
return S(runTask(callable, args));
}
package Task runTask_internal(ref TaskFuncInfo tfi)
@safe nothrow {
import std.typecons : Tuple, tuple;
@ -362,19 +385,18 @@ private Task runTask_internal(ref TaskFuncInfo tfi)
able to guarantee thread-safety.
*/
void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runWorkerTask_unsafe(func, args);
setupWorkerThreads();
st_workerPool.runTask(func, args);
}
/// ditto
void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
auto func = &__traits(getMember, object, __traits(identifier, method));
runWorkerTask_unsafe(func, args);
setupWorkerThreads();
st_workerPool.runTask!method(object, args);
}
/**
@ -387,56 +409,17 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS arg
able to guarantee thread-safety.
*/
Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
// workaround for runWorkerTaskH to work when called outside of a task
if (caller == Task.init) {
Task ret;
runTask({ ret = runWorkerTaskH(func, args); }).join();
return ret;
}
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runWorkerTask_unsafe(&taskFun, caller, func, args);
return cast(Task)receiveOnly!PrivateTask();
setupWorkerThreads();
return st_workerPool.runTaskH(func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
auto func = &__traits(getMember, object, __traits(identifier, method));
alias FT = typeof(func);
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
// workaround for runWorkerTaskH to work when called outside of a task
if (caller == Task.init) {
Task ret;
runTask({ ret = runWorkerTaskH!method(object, args); }).join();
return ret;
}
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runWorkerTask_unsafe(&taskFun, caller, func, args);
return cast(Task)receiveOnly!PrivateTask();
setupWorkerThreads();
return st_workerPool.runTaskH!method(object, args);
}
/// Running a worker task using a function
@ -546,24 +529,6 @@ unittest { // run and join worker task from outside of a task
assert(i == 1);
}
private void runWorkerTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
import vibe.internal.typetuple;
alias FARGS = ParameterTypeTuple!CALLABLE;
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
setupWorkerThreads();
auto tfi = makeTaskFuncInfo(callable, args);
synchronized (st_threadsMutex) st_workerTasks ~= tfi;
st_threadsSignal.emit();
}
/**
Runs a new asynchronous task in all worker threads concurrently.
@ -578,81 +543,14 @@ private void runWorkerTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS
void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runWorkerTaskDist_unsafe(func, args);
setupWorkerThreads();
return st_workerPool.runTaskDist(func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
{
auto func = &__traits(getMember, object, __traits(identifier, method));
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runWorkerTaskDist_unsafe(func, args);
}
private void runWorkerTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
import vibe.internal.typetuple;
alias FARGS = ParameterTypeTuple!CALLABLE;
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
setupWorkerThreads();
auto tfi = makeTaskFuncInfo(callable, args);
synchronized (st_threadsMutex) {
foreach (ref ctx; st_threads)
if (ctx.isWorker)
ctx.taskQueue ~= tfi;
}
st_threadsSignal.emit();
}
private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
import std.algorithm : move;
import std.traits : hasElaborateAssign;
static struct TARGS { ARGS expand; }
static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length);
static assert(TARGS.sizeof <= maxTaskParameterSize,
"The arguments passed to run(Worker)Task must not exceed "~
maxTaskParameterSize.to!string~" bytes in total size.");
static void callDelegate(TaskFuncInfo* tfi) {
assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");
// copy original call data to stack
CALLABLE c;
TARGS args;
move(*(cast(CALLABLE*)tfi.callable.ptr), c);
move(*(cast(TARGS*)tfi.args.ptr), args);
// reset the info
tfi.func = null;
// make the call
mixin(callWithMove!ARGS("c", "args.expand"));
}
return () @trusted {
TaskFuncInfo tfi;
tfi.func = &callDelegate;
static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE();
static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS();
tfi.typedCallable!CALLABLE = callable;
foreach (i, A; ARGS) {
static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]);
else tfi.typedArgs!TARGS.expand[i] = args[i];
}
return tfi;
} ();
return st_workerPool.runTaskDist!method(object, args);
}
@ -676,15 +574,8 @@ public void setupWorkerThreads(uint num = logicalProcessorCount())
s_workerThreadsStarted = true;
synchronized (st_threadsMutex) {
if (st_threads.any!(t => t.isWorker))
return;
foreach (i; 0 .. num) {
auto thr = new Thread(&workerThreadFunc);
thr.name = format("vibe-%s", i);
st_threads ~= ThreadContext(thr, true);
thr.start();
}
if (!st_workerPool)
st_workerPool = new shared TaskPool;
}
}
@ -1181,7 +1072,6 @@ package(vibe) void performIdleProcessing()
private struct ThreadContext {
Thread thread;
bool isWorker;
TaskFuncInfo[] taskQueue;
this(Thread thr, bool worker) { this.thread = thr; this.isWorker = worker; }
}
@ -1196,9 +1086,9 @@ private {
bool s_ignoreIdleForGC = false;
__gshared core.sync.mutex.Mutex st_threadsMutex;
shared TaskPool st_workerPool;
shared ManualEvent st_threadsSignal;
__gshared ThreadContext[] st_threads;
__gshared TaskFuncInfo[] st_workerTasks;
__gshared Condition st_threadShutdownCondition;
shared bool st_term = false;
@ -1329,17 +1219,10 @@ shared static ~this()
{
shutdownDriver();
size_t tasks_left;
size_t tasks_left = s_scheduler.scheduledTaskCount;
synchronized (st_threadsMutex) {
if( !st_workerTasks.empty ) tasks_left = st_workerTasks.length;
}
tasks_left += s_scheduler.scheduledTaskCount;
if (tasks_left > 0) {
if (tasks_left > 0)
logWarn("There were still %d tasks running at exit.", tasks_left);
}
}
// per thread setup
@ -1368,20 +1251,19 @@ static ~this()
synchronized (st_threadsMutex) {
auto idx = st_threads.countUntil!(c => c.thread is thisthr);
logDebug("Thread exit %s (index %s) (main=%s)", thisthr.name, idx, is_main_thread);
if (is_main_thread) { // we are the main thread, wait for others
atomicStore(st_term, true);
st_threadsSignal.emit();
// wait for all non-daemon threads to shut down
while (st_threads[1 .. $].any!(th => !th.thread.isDaemon)) {
logDiagnostic("Main thread still waiting for other threads: %s",
st_threads[1 .. $].map!(t => t.thread.name ~ (t.isWorker ? " (worker thread)" : "")).join(", "));
st_threadShutdownCondition.wait();
}
if (is_main_thread) {
shared(TaskPool) tpool;
synchronized (st_threadsMutex) swap(tpool, st_workerPool);
logDiagnostic("Main thread still waiting for worker threads.");
tpool.terminate();
logDiagnostic("Main thread exiting");
}
synchronized (st_threadsMutex) {
auto idx = st_threads.countUntil!(c => c.thread is thisthr);
assert(idx >= 0, "No more threads registered");
if (idx >= 0) {
st_threads[idx] = st_threads[$-1];
@ -1405,82 +1287,6 @@ private void shutdownDriver()
eventDriver.dispose();
}
private void workerThreadFunc()
nothrow {
try {
if (getExitFlag()) return;
logDebug("entering worker thread");
runTask(toDelegate(&handleWorkerTasks));
logDebug("running event loop");
if (!getExitFlag()) runEventLoop();
logDebug("Worker thread exit.");
} catch (Exception e) {
scope (failure) exit(-1);
logFatal("Worker thread terminated due to uncaught exception: %s", e.msg);
logDebug("Full error: %s", e.toString().sanitize());
} catch (InvalidMemoryOperationError e) {
import std.stdio;
scope(failure) assert(false);
writeln("Error message: ", e.msg);
writeln("Full error: ", e.toString().sanitize());
exit(-1);
} catch (Throwable th) {
logFatal("Worker thread terminated due to uncaught error: %s", th.msg);
logDebug("Full error: %s", th.toString().sanitize());
exit(-1);
}
}
private void handleWorkerTasks()
nothrow {
logDebug("worker thread enter");
auto thisthr = Thread.getThis();
logDebug("worker thread loop enter");
while (true) {
auto emit_count = st_threadsSignal.emitCount;
TaskFuncInfo task;
bool processTask() nothrow {
auto idx = st_threads.countUntil!(c => c.thread is thisthr);
assert(idx >= 0, "Worker thread not in st_threads array!?");
logDebug("worker thread check");
if (getExitFlag()) {
if (st_threads[idx].taskQueue.length > 0)
logWarn("Worker thread shuts down with specific worker tasks left in its queue.");
if (st_threads.count!(c => c.isWorker) == 1 && st_workerTasks.length > 0)
logWarn("Worker threads shut down with worker tasks still left in the queue.");
return false;
}
if (!st_workerTasks.empty) {
logDebug("worker thread got task");
task = st_workerTasks.front;
st_workerTasks.popFront();
} else if (!st_threads[idx].taskQueue.empty) {
logDebug("worker thread got specific task");
task = st_threads[idx].taskQueue.front;
st_threads[idx].taskQueue.popFront();
}
return true;
}
{
scope (failure) assert(false);
synchronized (st_threadsMutex)
if (!processTask())
break;
}
if (task.func !is null) runTask_internal(task);
else emit_count = st_threadsSignal.waitUninterruptible(emit_count);
}
logDebug("worker thread exit");
eventDriver.core.exit();
}
private void watchExitFlag()
{
@ -1567,61 +1373,3 @@ version(Posix)
assert(false);
}
}
// mixin string helper to call a function with arguments that potentially have
// to be moved
private string callWithMove(ARGS...)(string func, string args)
{
import std.string;
string ret = func ~ "(";
foreach (i, T; ARGS) {
if (i > 0) ret ~= ", ";
ret ~= format("%s[%s]", args, i);
static if (needsMove!T) ret ~= ".move";
}
return ret ~ ");";
}
private template needsMove(T)
{
template isCopyable(T)
{
enum isCopyable = __traits(compiles, (T a) { return a; });
}
template isMoveable(T)
{
enum isMoveable = __traits(compiles, (T a) { return a.move; });
}
enum needsMove = !isCopyable!T;
static assert(isCopyable!T || isMoveable!T,
"Non-copyable type "~T.stringof~" must be movable with a .move property.");
}
unittest {
enum E { a, move }
static struct S {
@disable this(this);
@property S move() { return S.init; }
}
static struct T { @property T move() { return T.init; } }
static struct U { }
static struct V {
@disable this();
@disable this(this);
@property V move() { return V.init; }
}
static struct W { @disable this(); }
static assert(needsMove!S);
static assert(!needsMove!int);
static assert(!needsMove!string);
static assert(!needsMove!E);
static assert(!needsMove!T);
static assert(!needsMove!U);
static assert(needsMove!V);
static assert(!needsMove!W);
}

View file

@ -107,6 +107,13 @@ struct Task {
Base64.encode(md.finish()[0 .. 3], dst);
if (!this.running) dst.put("-fin");
}
string getDebugID()
@trusted {
import std.array : appender;
auto app = appender!string;
getDebugID(app);
return app.data;
}
bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
@ -338,6 +345,7 @@ final package class TaskFiber : Fiber {
while (true) {
while (!m_taskFunc.func) {
try {
logTrace("putting fiber to sleep waiting for new task...");
Fiber.yield();
} catch (Exception e) {
logWarn("CoreTaskFiber was resumed with exception but without active task!");
@ -427,11 +435,12 @@ final package class TaskFiber : Fiber {
void join(bool interruptiple)(size_t task_counter)
@trusted {
auto cnt = m_onExit.emitCount;
while (m_running && m_taskCounter == task_counter)
while (m_running && m_taskCounter == task_counter) {
static if (interruptiple)
cnt = m_onExit.wait(1.seconds, cnt);
cnt = m_onExit.wait(cnt);
else
cnt = m_onExit.waitUninterruptible(1.seconds, cnt);
cnt = m_onExit.waitUninterruptible(cnt);
}
}
/** Throws an InterruptExeption within the task as soon as it calls an interruptible function.
@ -480,8 +489,52 @@ final package class TaskFiber : Fiber {
package struct TaskFuncInfo {
void function(TaskFuncInfo*) func;
void[2*size_t.sizeof] callable = void;
void[maxTaskParameterSize] args = void;
void[2*size_t.sizeof] callable;
void[maxTaskParameterSize] args;
static TaskFuncInfo make(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
import std.algorithm : move;
import std.traits : hasElaborateAssign;
static struct TARGS { ARGS expand; }
static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length);
static assert(TARGS.sizeof <= maxTaskParameterSize,
"The arguments passed to run(Worker)Task must not exceed "~
maxTaskParameterSize.to!string~" bytes in total size.");
static void callDelegate(TaskFuncInfo* tfi) {
assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");
// copy original call data to stack
CALLABLE c;
TARGS args;
move(*(cast(CALLABLE*)tfi.callable.ptr), c);
move(*(cast(TARGS*)tfi.args.ptr), args);
// reset the info
tfi.func = null;
// make the call
mixin(callWithMove!ARGS("c", "args.expand"));
}
return () @trusted {
TaskFuncInfo tfi;
tfi.func = &callDelegate;
static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE();
static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS();
tfi.typedCallable!CALLABLE = callable;
foreach (i, A; ARGS) {
static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]);
else tfi.typedArgs!TARGS.expand[i] = args[i];
}
return tfi;
} ();
}
@property ref C typedCallable(C)()
{
@ -678,7 +731,9 @@ package struct TaskScheduler {
auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
if (thist == Task.init) {
logTrace("switch to task from global context");
resumeTask(t);
logTrace("task yielded control back to global context");
} else {
auto tf = () @trusted { return t.taskFiber; } ();
auto thistf = () @trusted { return thist.taskFiber; } ();
@ -690,7 +745,7 @@ package struct TaskScheduler {
assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
}
logTrace("Switching tasks");
logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
m_taskQueue.insertFront(thistf);
m_taskQueue.insertFront(tf);
doYield(thist);
@ -725,7 +780,9 @@ package struct TaskScheduler {
while (m_taskQueue.front !is m_markerTask) {
auto t = m_taskQueue.front;
m_taskQueue.popFront();
logTrace("resuming task");
resumeTask(t.task);
logTrace("task out");
assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?");
if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode
@ -734,6 +791,8 @@ package struct TaskScheduler {
// remove marker task
m_taskQueue.popFront();
logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length);
return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
}
@ -742,7 +801,9 @@ package struct TaskScheduler {
{
import std.encoding : sanitize;
logTrace("task fiber resume");
auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } ();
logTrace("task fiber yielded");
if (uncaught_exception) {
auto th = cast(Throwable)uncaught_exception;
@ -850,3 +911,59 @@ private struct FLSInfo {
}
}
// mixin string helper to call a function with arguments that potentially have
// to be moved
package string callWithMove(ARGS...)(string func, string args)
{
import std.string;
string ret = func ~ "(";
foreach (i, T; ARGS) {
if (i > 0) ret ~= ", ";
ret ~= format("%s[%s]", args, i);
static if (needsMove!T) ret ~= ".move";
}
return ret ~ ");";
}
private template needsMove(T)
{
template isCopyable(T)
{
enum isCopyable = __traits(compiles, (T a) { return a; });
}
template isMoveable(T)
{
enum isMoveable = __traits(compiles, (T a) { return a.move; });
}
enum needsMove = !isCopyable!T;
static assert(isCopyable!T || isMoveable!T,
"Non-copyable type "~T.stringof~" must be movable with a .move property.");
}
unittest {
enum E { a, move }
static struct S {
@disable this(this);
@property S move() { return S.init; }
}
static struct T { @property T move() { return T.init; } }
static struct U { }
static struct V {
@disable this();
@disable this(this);
@property V move() { return V.init; }
}
static struct W { @disable this(); }
static assert(needsMove!S);
static assert(!needsMove!int);
static assert(!needsMove!string);
static assert(!needsMove!E);
static assert(!needsMove!T);
static assert(!needsMove!U);
static assert(needsMove!V);
static assert(!needsMove!W);
}

315
source/vibe/core/taskpool.d Normal file
View file

@ -0,0 +1,315 @@
/**
Multi-threaded task pool implementation.
Copyright: © 2012-2017 RejectedSoftware e.K.
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig
*/
module vibe.core.taskpool;
import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
import vibe.core.log;
import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor;
import vibe.core.task : Task, TaskFuncInfo, callWithMove;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.concurrency : prioritySend, receiveOnly;
import std.traits : isFunctionPointer;
/** Implements a shared, multi-threaded task pool.
*/
shared class TaskPool {
private {
struct State {
WorkerThread[] threads;
TaskQueue queue;
bool term;
}
vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state;
shared(ManualEvent) m_signal;
}
/** Creates a new task pool with the specified number of threads.
Params:
thread_count: The number of worker threads to create
*/
this(size_t thread_count = logicalProcessorCount())
@safe {
import std.format : format;
m_signal = createSharedManualEvent();
with (m_state.lock) {
threads.length = thread_count;
foreach (i; 0 .. thread_count) {
WorkerThread thr;
() @trusted {
thr = new WorkerThread(this);
thr.name = format("vibe-%s", i);
thr.start();
} ();
threads[i] = thr;
}
}
}
/** Instructs all worker threads to terminate and waits until all have
finished.
*/
void terminate()
@safe nothrow {
m_state.lock.term = true;
m_signal.emit();
auto ec = m_signal.emitCount;
while (m_state.lock.threads.length > 0)
ec = m_signal.waitUninterruptible(ec);
size_t cnt = m_state.lock.queue.tasks.length;
if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);
}
/** Instructs all worker threads to terminate as soon as all tasks have
been processed and waits for them to finish.
*/
void join()
@safe nothrow {
assert(false, "TODO!");
}
/** Runs a new asynchronous task in a worker thread.
Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread-safety.
*/
void runTask(FT, ARGS...)(FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTask_unsafe(func, args);
}
/// ditto
void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
auto func = &__traits(getMember, object, __traits(identifier, method));
runTask_unsafe(func, args);
}
/** Runs a new asynchronous task in a worker thread, returning the task handle.
This function will yield and wait for the new task to be created and started
in the worker thread, then resume and return it.
Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread-safety.
*/
Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
import std.typecons : Typedef;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
// workaround for runWorkerTaskH to work when called outside of a task
if (caller == Task.init) {
Task ret;
.runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join();
return ret;
}
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
logInfo("SEND H");
caller.tid.prioritySend(callee);
logInfo("SENT H");
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
return cast(Task)receiveOnly!PrivateTask();
}
/// ditto
Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
static void wrapper()(shared(T) object, ref ARGS args) {
__traits(getMember, object, __traits(identifier, method))(args);
}
return runTaskH(&wrapper!(), object, args);
}
/** Runs a new asynchronous task in all worker threads concurrently.
This function is mainly useful for long-living tasks that distribute their
work across all CPU cores. Only function pointers with weakly isolated
arguments are allowed to be able to guarantee thread-safety.
The number of tasks started is guaranteed to be equal to
`workerThreadCount`.
*/
void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(func, args);
}
/// ditto
void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
{
auto func = &__traits(getMember, object, __traits(identifier, method));
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(func, args);
}
private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args)
{
*ret = runTaskH!(FT, ARGS)(func, args);
}
private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
import vibe.internal.typetuple;
alias FARGS = ParameterTypeTuple!CALLABLE;
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
auto tfi = TaskFuncInfo.make(callable, args);
m_state.lock.queue.put(tfi);
m_signal.emitSingle();
}
private void runTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
import vibe.internal.typetuple;
alias FARGS = ParameterTypeTuple!CALLABLE;
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
foreach (thr; m_state.lock.threads) {
// create one TFI per thread to properly acocunt for elaborate assignment operators/postblit
auto tfi = TaskFuncInfo.make(callable, args);
thr.m_queue.put(tfi);
}
m_signal.emit();
}
}
private class WorkerThread : Thread {
private {
shared(TaskPool) m_pool;
TaskQueue m_queue;
}
this(shared(TaskPool) pool)
{
m_pool = pool;
super(&main);
}
private void main()
nothrow {
import core.stdc.stdlib : exit;
import core.exception : InvalidMemoryOperationError;
import std.encoding : sanitize;
try {
if (m_pool.m_state.lock.term) return;
logDebug("entering worker thread");
runTask(&handleWorkerTasks);
logDebug("running event loop");
if (!m_pool.m_state.lock.term) runEventLoop();
logDebug("Worker thread exit.");
} catch (Exception e) {
scope (failure) exit(-1);
logFatal("Worker thread terminated due to uncaught exception: %s", e.msg);
logDebug("Full error: %s", e.toString().sanitize());
} catch (InvalidMemoryOperationError e) {
import std.stdio;
scope(failure) assert(false);
writeln("Error message: ", e.msg);
writeln("Full error: ", e.toString().sanitize());
exit(-1);
} catch (Throwable th) {
logFatal("Worker thread terminated due to uncaught error: %s", th.msg);
logDebug("Full error: %s", th.toString().sanitize());
exit(-1);
}
}
private void handleWorkerTasks()
nothrow @safe {
import std.algorithm.iteration : filter;
import std.algorithm.searching : count;
import std.array : array;
logDebug("worker thread enter");
TaskFuncInfo taskfunc;
while(true){
auto emit_count = m_pool.m_signal.emitCount;
with (m_pool.m_state.lock) {
logDebug("worker thread check");
if (term) break;
if (m_queue.consume(taskfunc)) {
logDebug("worker thread got specific task");
} else if (queue.consume(taskfunc)) {
logDebug("worker thread got specific task");
}
}
if (taskfunc.func !is null) {
.runTask_internal(taskfunc);
taskfunc.func = null;
}
else emit_count = m_pool.m_signal.waitUninterruptible(emit_count);
}
logDebug("worker thread exit");
if (!m_queue.empty)
logWarn("Worker thread shuts down with specific worker tasks left in its queue.");
with (m_pool.m_state.lock) {
threads = threads.filter!(t => t !is this).array;
if (threads.length > 0 && !queue.empty)
logWarn("Worker threads shut down with worker tasks still left in the queue.");
}
m_pool.m_signal.emit();
exitEventLoop();
}
}
private struct TaskQueue {
nothrow @safe:
// FIXME: use a more efficient storage!
TaskFuncInfo[] tasks;
@property bool empty() const { return tasks.length == 0; }
void put(ref TaskFuncInfo tfi) { tasks ~= tfi; }
bool consume(ref TaskFuncInfo tfi)
{
if (tasks.length == 0) return false;
tfi = tasks[0];
tasks = tasks[1 .. $];
return true;
}
}