From 40713db075d794b620dcd03df854e23ebbb897c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 22 Feb 2017 18:35:51 +0100 Subject: [PATCH] Move worker task logic into a new TaskPool class. --- source/vibe/core/core.d | 364 ++++++------------------------------ source/vibe/core/task.d | 129 ++++++++++++- source/vibe/core/taskpool.d | 315 +++++++++++++++++++++++++++++++ 3 files changed, 494 insertions(+), 314 deletions(-) create mode 100644 source/vibe/core/taskpool.d diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index c720597..632cccd 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -14,6 +14,7 @@ import vibe.core.args; import vibe.core.concurrency; import vibe.core.log; import vibe.core.sync : ManualEvent, createSharedManualEvent; +import vibe.core.taskpool : TaskPool; import vibe.internal.async; import vibe.internal.array : FixedRingBuffer; //import vibe.utils.array; @@ -25,8 +26,9 @@ import std.exception; import std.functional; import std.range : empty, front, popFront; import std.string; -import std.variant; +import std.traits : isFunctionPointer; import std.typecons : Typedef, Tuple, tuple; +import std.variant; import core.atomic; import core.sync.condition; import core.sync.mutex; @@ -312,13 +314,34 @@ void setIdleHandler(bool delegate() @safe nothrow del) Note that the maximum size of all args must not exceed `maxTaskParameterSize`. */ -Task runTask(ARGS...)(void delegate(ARGS) task, ARGS args) +Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) + if (is(typeof(CALLABLE.init(ARGS.init)))) { - auto tfi = makeTaskFuncInfo(task, args); + auto tfi = TaskFuncInfo.make(task, args); return runTask_internal(tfi); } -private Task runTask_internal(ref TaskFuncInfo tfi) +/** + Runs an asyncronous task that is guaranteed to finish before the caller's + scope is left. +*/ +auto runTaskScoped(FT, ARGS)(scope FT callable, ARGS args) +{ + static struct S { + Task handle; + + @disable this(this); + + ~this() + { + handle.joinUninterruptible(); + } + } + + return S(runTask(callable, args)); +} + +package Task runTask_internal(ref TaskFuncInfo tfi) @safe nothrow { import std.typecons : Tuple, tuple; @@ -362,19 +385,18 @@ private Task runTask_internal(ref TaskFuncInfo tfi) able to guarantee thread-safety. */ void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args) - if (is(typeof(*func) == function)) + if (isFunctionPointer!FT) { - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runWorkerTask_unsafe(func, args); + setupWorkerThreads(); + st_workerPool.runTask(func, args); } /// ditto void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) { - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - auto func = &__traits(getMember, object, __traits(identifier, method)); - runWorkerTask_unsafe(func, args); + setupWorkerThreads(); + st_workerPool.runTask!method(object, args); } /** @@ -387,56 +409,17 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS arg able to guarantee thread-safety. */ Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args) - if (is(typeof(*func) == function)) + if (isFunctionPointer!FT) { - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - - alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); - Task caller = Task.getThis(); - - // workaround for runWorkerTaskH to work when called outside of a task - if (caller == Task.init) { - Task ret; - runTask({ ret = runWorkerTaskH(func, args); }).join(); - return ret; - } - - assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); - static void taskFun(Task caller, FT func, ARGS args) { - PrivateTask callee = Task.getThis(); - caller.prioritySend(callee); - mixin(callWithMove!ARGS("func", "args")); - } - runWorkerTask_unsafe(&taskFun, caller, func, args); - return cast(Task)receiveOnly!PrivateTask(); + setupWorkerThreads(); + return st_workerPool.runTaskH(func, args); } /// ditto Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) { - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - - auto func = &__traits(getMember, object, __traits(identifier, method)); - alias FT = typeof(func); - - alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); - Task caller = Task.getThis(); - - // workaround for runWorkerTaskH to work when called outside of a task - if (caller == Task.init) { - Task ret; - runTask({ ret = runWorkerTaskH!method(object, args); }).join(); - return ret; - } - - assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); - static void taskFun(Task caller, FT func, ARGS args) { - PrivateTask callee = Task.getThis(); - caller.prioritySend(callee); - mixin(callWithMove!ARGS("func", "args")); - } - runWorkerTask_unsafe(&taskFun, caller, func, args); - return cast(Task)receiveOnly!PrivateTask(); + setupWorkerThreads(); + return st_workerPool.runTaskH!method(object, args); } /// Running a worker task using a function @@ -546,24 +529,6 @@ unittest { // run and join worker task from outside of a task assert(i == 1); } -private void runWorkerTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) -{ - import std.traits : ParameterTypeTuple; - import vibe.internal.traits : areConvertibleTo; - import vibe.internal.typetuple; - - alias FARGS = ParameterTypeTuple!CALLABLE; - static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), - "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); - - setupWorkerThreads(); - - auto tfi = makeTaskFuncInfo(callable, args); - - synchronized (st_threadsMutex) st_workerTasks ~= tfi; - st_threadsSignal.emit(); -} - /** Runs a new asynchronous task in all worker threads concurrently. @@ -578,81 +543,14 @@ private void runWorkerTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args) if (is(typeof(*func) == function)) { - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runWorkerTaskDist_unsafe(func, args); + setupWorkerThreads(); + return st_workerPool.runTaskDist(func, args); } /// ditto void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args) { - auto func = &__traits(getMember, object, __traits(identifier, method)); - foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - - runWorkerTaskDist_unsafe(func, args); -} - -private void runWorkerTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) -{ - import std.traits : ParameterTypeTuple; - import vibe.internal.traits : areConvertibleTo; - import vibe.internal.typetuple; - - alias FARGS = ParameterTypeTuple!CALLABLE; - static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), - "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); - setupWorkerThreads(); - - auto tfi = makeTaskFuncInfo(callable, args); - - synchronized (st_threadsMutex) { - foreach (ref ctx; st_threads) - if (ctx.isWorker) - ctx.taskQueue ~= tfi; - } - st_threadsSignal.emit(); -} - -private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) -{ - import std.algorithm : move; - import std.traits : hasElaborateAssign; - - static struct TARGS { ARGS expand; } - - static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length); - static assert(TARGS.sizeof <= maxTaskParameterSize, - "The arguments passed to run(Worker)Task must not exceed "~ - maxTaskParameterSize.to!string~" bytes in total size."); - - static void callDelegate(TaskFuncInfo* tfi) { - assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); - - // copy original call data to stack - CALLABLE c; - TARGS args; - move(*(cast(CALLABLE*)tfi.callable.ptr), c); - move(*(cast(TARGS*)tfi.args.ptr), args); - - // reset the info - tfi.func = null; - - // make the call - mixin(callWithMove!ARGS("c", "args.expand")); - } - - return () @trusted { - TaskFuncInfo tfi; - tfi.func = &callDelegate; - - static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE(); - static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS(); - tfi.typedCallable!CALLABLE = callable; - foreach (i, A; ARGS) { - static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]); - else tfi.typedArgs!TARGS.expand[i] = args[i]; - } - return tfi; - } (); + return st_workerPool.runTaskDist!method(object, args); } @@ -676,15 +574,8 @@ public void setupWorkerThreads(uint num = logicalProcessorCount()) s_workerThreadsStarted = true; synchronized (st_threadsMutex) { - if (st_threads.any!(t => t.isWorker)) - return; - - foreach (i; 0 .. num) { - auto thr = new Thread(&workerThreadFunc); - thr.name = format("vibe-%s", i); - st_threads ~= ThreadContext(thr, true); - thr.start(); - } + if (!st_workerPool) + st_workerPool = new shared TaskPool; } } @@ -1181,7 +1072,6 @@ package(vibe) void performIdleProcessing() private struct ThreadContext { Thread thread; bool isWorker; - TaskFuncInfo[] taskQueue; this(Thread thr, bool worker) { this.thread = thr; this.isWorker = worker; } } @@ -1196,9 +1086,9 @@ private { bool s_ignoreIdleForGC = false; __gshared core.sync.mutex.Mutex st_threadsMutex; + shared TaskPool st_workerPool; shared ManualEvent st_threadsSignal; __gshared ThreadContext[] st_threads; - __gshared TaskFuncInfo[] st_workerTasks; __gshared Condition st_threadShutdownCondition; shared bool st_term = false; @@ -1329,17 +1219,10 @@ shared static ~this() { shutdownDriver(); - size_t tasks_left; + size_t tasks_left = s_scheduler.scheduledTaskCount; - synchronized (st_threadsMutex) { - if( !st_workerTasks.empty ) tasks_left = st_workerTasks.length; - } - - tasks_left += s_scheduler.scheduledTaskCount; - - if (tasks_left > 0) { + if (tasks_left > 0) logWarn("There were still %d tasks running at exit.", tasks_left); - } } // per thread setup @@ -1368,20 +1251,19 @@ static ~this() synchronized (st_threadsMutex) { auto idx = st_threads.countUntil!(c => c.thread is thisthr); - logDebug("Thread exit %s (index %s) (main=%s)", thisthr.name, idx, is_main_thread); - if (is_main_thread) { // we are the main thread, wait for others - atomicStore(st_term, true); - st_threadsSignal.emit(); - // wait for all non-daemon threads to shut down - while (st_threads[1 .. $].any!(th => !th.thread.isDaemon)) { - logDiagnostic("Main thread still waiting for other threads: %s", - st_threads[1 .. $].map!(t => t.thread.name ~ (t.isWorker ? " (worker thread)" : "")).join(", ")); - st_threadShutdownCondition.wait(); - } - logDiagnostic("Main thread exiting"); - } + } + if (is_main_thread) { + shared(TaskPool) tpool; + synchronized (st_threadsMutex) swap(tpool, st_workerPool); + logDiagnostic("Main thread still waiting for worker threads."); + tpool.terminate(); + logDiagnostic("Main thread exiting"); + } + + synchronized (st_threadsMutex) { + auto idx = st_threads.countUntil!(c => c.thread is thisthr); assert(idx >= 0, "No more threads registered"); if (idx >= 0) { st_threads[idx] = st_threads[$-1]; @@ -1405,82 +1287,6 @@ private void shutdownDriver() eventDriver.dispose(); } -private void workerThreadFunc() -nothrow { - try { - if (getExitFlag()) return; - logDebug("entering worker thread"); - runTask(toDelegate(&handleWorkerTasks)); - logDebug("running event loop"); - if (!getExitFlag()) runEventLoop(); - logDebug("Worker thread exit."); - } catch (Exception e) { - scope (failure) exit(-1); - logFatal("Worker thread terminated due to uncaught exception: %s", e.msg); - logDebug("Full error: %s", e.toString().sanitize()); - } catch (InvalidMemoryOperationError e) { - import std.stdio; - scope(failure) assert(false); - writeln("Error message: ", e.msg); - writeln("Full error: ", e.toString().sanitize()); - exit(-1); - } catch (Throwable th) { - logFatal("Worker thread terminated due to uncaught error: %s", th.msg); - logDebug("Full error: %s", th.toString().sanitize()); - exit(-1); - } -} - -private void handleWorkerTasks() -nothrow { - logDebug("worker thread enter"); - - auto thisthr = Thread.getThis(); - - logDebug("worker thread loop enter"); - while (true) { - auto emit_count = st_threadsSignal.emitCount; - TaskFuncInfo task; - - bool processTask() nothrow { - auto idx = st_threads.countUntil!(c => c.thread is thisthr); - assert(idx >= 0, "Worker thread not in st_threads array!?"); - logDebug("worker thread check"); - - if (getExitFlag()) { - if (st_threads[idx].taskQueue.length > 0) - logWarn("Worker thread shuts down with specific worker tasks left in its queue."); - if (st_threads.count!(c => c.isWorker) == 1 && st_workerTasks.length > 0) - logWarn("Worker threads shut down with worker tasks still left in the queue."); - return false; - } - - if (!st_workerTasks.empty) { - logDebug("worker thread got task"); - task = st_workerTasks.front; - st_workerTasks.popFront(); - } else if (!st_threads[idx].taskQueue.empty) { - logDebug("worker thread got specific task"); - task = st_threads[idx].taskQueue.front; - st_threads[idx].taskQueue.popFront(); - } - return true; - } - - { - scope (failure) assert(false); - synchronized (st_threadsMutex) - if (!processTask()) - break; - } - - if (task.func !is null) runTask_internal(task); - else emit_count = st_threadsSignal.waitUninterruptible(emit_count); - } - - logDebug("worker thread exit"); - eventDriver.core.exit(); -} private void watchExitFlag() { @@ -1567,61 +1373,3 @@ version(Posix) assert(false); } } - - -// mixin string helper to call a function with arguments that potentially have -// to be moved -private string callWithMove(ARGS...)(string func, string args) -{ - import std.string; - string ret = func ~ "("; - foreach (i, T; ARGS) { - if (i > 0) ret ~= ", "; - ret ~= format("%s[%s]", args, i); - static if (needsMove!T) ret ~= ".move"; - } - return ret ~ ");"; -} - -private template needsMove(T) -{ - template isCopyable(T) - { - enum isCopyable = __traits(compiles, (T a) { return a; }); - } - - template isMoveable(T) - { - enum isMoveable = __traits(compiles, (T a) { return a.move; }); - } - - enum needsMove = !isCopyable!T; - - static assert(isCopyable!T || isMoveable!T, - "Non-copyable type "~T.stringof~" must be movable with a .move property."); -} - -unittest { - enum E { a, move } - static struct S { - @disable this(this); - @property S move() { return S.init; } - } - static struct T { @property T move() { return T.init; } } - static struct U { } - static struct V { - @disable this(); - @disable this(this); - @property V move() { return V.init; } - } - static struct W { @disable this(); } - - static assert(needsMove!S); - static assert(!needsMove!int); - static assert(!needsMove!string); - static assert(!needsMove!E); - static assert(!needsMove!T); - static assert(!needsMove!U); - static assert(needsMove!V); - static assert(!needsMove!W); -} diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 441be5e..94d324c 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -107,6 +107,13 @@ struct Task { Base64.encode(md.finish()[0 .. 3], dst); if (!this.running) dst.put("-fin"); } + string getDebugID() + @trusted { + import std.array : appender; + auto app = appender!string; + getDebugID(app); + return app.data; + } bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } @@ -338,6 +345,7 @@ final package class TaskFiber : Fiber { while (true) { while (!m_taskFunc.func) { try { + logTrace("putting fiber to sleep waiting for new task..."); Fiber.yield(); } catch (Exception e) { logWarn("CoreTaskFiber was resumed with exception but without active task!"); @@ -427,11 +435,12 @@ final package class TaskFiber : Fiber { void join(bool interruptiple)(size_t task_counter) @trusted { auto cnt = m_onExit.emitCount; - while (m_running && m_taskCounter == task_counter) + while (m_running && m_taskCounter == task_counter) { static if (interruptiple) - cnt = m_onExit.wait(1.seconds, cnt); + cnt = m_onExit.wait(cnt); else - cnt = m_onExit.waitUninterruptible(1.seconds, cnt); + cnt = m_onExit.waitUninterruptible(cnt); + } } /** Throws an InterruptExeption within the task as soon as it calls an interruptible function. @@ -480,8 +489,52 @@ final package class TaskFiber : Fiber { package struct TaskFuncInfo { void function(TaskFuncInfo*) func; - void[2*size_t.sizeof] callable = void; - void[maxTaskParameterSize] args = void; + void[2*size_t.sizeof] callable; + void[maxTaskParameterSize] args; + + static TaskFuncInfo make(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) + { + import std.algorithm : move; + import std.traits : hasElaborateAssign; + + static struct TARGS { ARGS expand; } + + static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length); + static assert(TARGS.sizeof <= maxTaskParameterSize, + "The arguments passed to run(Worker)Task must not exceed "~ + maxTaskParameterSize.to!string~" bytes in total size."); + + static void callDelegate(TaskFuncInfo* tfi) { + assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); + + // copy original call data to stack + CALLABLE c; + TARGS args; + move(*(cast(CALLABLE*)tfi.callable.ptr), c); + move(*(cast(TARGS*)tfi.args.ptr), args); + + // reset the info + tfi.func = null; + + // make the call + mixin(callWithMove!ARGS("c", "args.expand")); + } + + return () @trusted { + TaskFuncInfo tfi; + tfi.func = &callDelegate; + + static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE(); + static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS(); + tfi.typedCallable!CALLABLE = callable; + foreach (i, A; ARGS) { + static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]); + else tfi.typedArgs!TARGS.expand[i] = args[i]; + } + return tfi; + } (); + } + @property ref C typedCallable(C)() { @@ -678,7 +731,9 @@ package struct TaskScheduler { auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } (); assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); if (thist == Task.init) { + logTrace("switch to task from global context"); resumeTask(t); + logTrace("task yielded control back to global context"); } else { auto tf = () @trusted { return t.taskFiber; } (); auto thistf = () @trusted { return thist.taskFiber; } (); @@ -690,7 +745,7 @@ package struct TaskScheduler { assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); } - logTrace("Switching tasks"); + logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); m_taskQueue.insertFront(thistf); m_taskQueue.insertFront(tf); doYield(thist); @@ -725,7 +780,9 @@ package struct TaskScheduler { while (m_taskQueue.front !is m_markerTask) { auto t = m_taskQueue.front; m_taskQueue.popFront(); + logTrace("resuming task"); resumeTask(t.task); + logTrace("task out"); assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?"); if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode @@ -734,6 +791,8 @@ package struct TaskScheduler { // remove marker task m_taskQueue.popFront(); + logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length); + return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy; } @@ -742,7 +801,9 @@ package struct TaskScheduler { { import std.encoding : sanitize; + logTrace("task fiber resume"); auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } (); + logTrace("task fiber yielded"); if (uncaught_exception) { auto th = cast(Throwable)uncaught_exception; @@ -850,3 +911,59 @@ private struct FLSInfo { } } +// mixin string helper to call a function with arguments that potentially have +// to be moved +package string callWithMove(ARGS...)(string func, string args) +{ + import std.string; + string ret = func ~ "("; + foreach (i, T; ARGS) { + if (i > 0) ret ~= ", "; + ret ~= format("%s[%s]", args, i); + static if (needsMove!T) ret ~= ".move"; + } + return ret ~ ");"; +} + +private template needsMove(T) +{ + template isCopyable(T) + { + enum isCopyable = __traits(compiles, (T a) { return a; }); + } + + template isMoveable(T) + { + enum isMoveable = __traits(compiles, (T a) { return a.move; }); + } + + enum needsMove = !isCopyable!T; + + static assert(isCopyable!T || isMoveable!T, + "Non-copyable type "~T.stringof~" must be movable with a .move property."); +} + +unittest { + enum E { a, move } + static struct S { + @disable this(this); + @property S move() { return S.init; } + } + static struct T { @property T move() { return T.init; } } + static struct U { } + static struct V { + @disable this(); + @disable this(this); + @property V move() { return V.init; } + } + static struct W { @disable this(); } + + static assert(needsMove!S); + static assert(!needsMove!int); + static assert(!needsMove!string); + static assert(!needsMove!E); + static assert(!needsMove!T); + static assert(!needsMove!U); + static assert(needsMove!V); + static assert(!needsMove!W); +} diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d new file mode 100644 index 0000000..7a40129 --- /dev/null +++ b/source/vibe/core/taskpool.d @@ -0,0 +1,315 @@ +/** + Multi-threaded task pool implementation. + + Copyright: © 2012-2017 RejectedSoftware e.K. + License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. + Authors: Sönke Ludwig +*/ +module vibe.core.taskpool; + +import vibe.core.concurrency : isWeaklyIsolated; +import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; +import vibe.core.log; +import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor; +import vibe.core.task : Task, TaskFuncInfo, callWithMove; +import core.sync.mutex : Mutex; +import core.thread : Thread; +import std.concurrency : prioritySend, receiveOnly; +import std.traits : isFunctionPointer; + + +/** Implements a shared, multi-threaded task pool. +*/ +shared class TaskPool { + private { + struct State { + WorkerThread[] threads; + TaskQueue queue; + bool term; + } + vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state; + shared(ManualEvent) m_signal; + } + + /** Creates a new task pool with the specified number of threads. + + Params: + thread_count: The number of worker threads to create + */ + this(size_t thread_count = logicalProcessorCount()) + @safe { + import std.format : format; + + m_signal = createSharedManualEvent(); + + with (m_state.lock) { + threads.length = thread_count; + foreach (i; 0 .. thread_count) { + WorkerThread thr; + () @trusted { + thr = new WorkerThread(this); + thr.name = format("vibe-%s", i); + thr.start(); + } (); + threads[i] = thr; + } + } + } + + /** Instructs all worker threads to terminate and waits until all have + finished. + */ + void terminate() + @safe nothrow { + m_state.lock.term = true; + m_signal.emit(); + + auto ec = m_signal.emitCount; + while (m_state.lock.threads.length > 0) + ec = m_signal.waitUninterruptible(ec); + + size_t cnt = m_state.lock.queue.tasks.length; + if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt); + } + + /** Instructs all worker threads to terminate as soon as all tasks have + been processed and waits for them to finish. + */ + void join() + @safe nothrow { + assert(false, "TODO!"); + } + + /** Runs a new asynchronous task in a worker thread. + + Only function pointers with weakly isolated arguments are allowed to be + able to guarantee thread-safety. + */ + void runTask(FT, ARGS...)(FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTask_unsafe(func, args); + } + + /// ditto + void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + auto func = &__traits(getMember, object, __traits(identifier, method)); + runTask_unsafe(func, args); + } + + /** Runs a new asynchronous task in a worker thread, returning the task handle. + + This function will yield and wait for the new task to be created and started + in the worker thread, then resume and return it. + + Only function pointers with weakly isolated arguments are allowed to be + able to guarantee thread-safety. + */ + Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + import std.typecons : Typedef; + + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__); + Task caller = Task.getThis(); + + // workaround for runWorkerTaskH to work when called outside of a task + if (caller == Task.init) { + Task ret; + .runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join(); + return ret; + } + + assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task."); + static void taskFun(Task caller, FT func, ARGS args) { + PrivateTask callee = Task.getThis(); + logInfo("SEND H"); + caller.tid.prioritySend(callee); + logInfo("SENT H"); + mixin(callWithMove!ARGS("func", "args")); + } + runTask_unsafe(&taskFun, caller, func, args); + return cast(Task)receiveOnly!PrivateTask(); + } + /// ditto + Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + static void wrapper()(shared(T) object, ref ARGS args) { + __traits(getMember, object, __traits(identifier, method))(args); + } + return runTaskH(&wrapper!(), object, args); + } + + + /** Runs a new asynchronous task in all worker threads concurrently. + + This function is mainly useful for long-living tasks that distribute their + work across all CPU cores. Only function pointers with weakly isolated + arguments are allowed to be able to guarantee thread-safety. + + The number of tasks started is guaranteed to be equal to + `workerThreadCount`. + */ + void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTaskDist_unsafe(func, args); + } + /// ditto + void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) + { + auto func = &__traits(getMember, object, __traits(identifier, method)); + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + runTaskDist_unsafe(func, args); + } + + private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args) + { + *ret = runTaskH!(FT, ARGS)(func, args); + } + + private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) + { + import std.traits : ParameterTypeTuple; + import vibe.internal.traits : areConvertibleTo; + import vibe.internal.typetuple; + + alias FARGS = ParameterTypeTuple!CALLABLE; + static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), + "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); + + auto tfi = TaskFuncInfo.make(callable, args); + m_state.lock.queue.put(tfi); + m_signal.emitSingle(); + } + + private void runTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types! + { + import std.traits : ParameterTypeTuple; + import vibe.internal.traits : areConvertibleTo; + import vibe.internal.typetuple; + + alias FARGS = ParameterTypeTuple!CALLABLE; + static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), + "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); + + foreach (thr; m_state.lock.threads) { + // create one TFI per thread to properly acocunt for elaborate assignment operators/postblit + auto tfi = TaskFuncInfo.make(callable, args); + thr.m_queue.put(tfi); + } + m_signal.emit(); + } +} + +private class WorkerThread : Thread { + private { + shared(TaskPool) m_pool; + TaskQueue m_queue; + } + + this(shared(TaskPool) pool) + { + m_pool = pool; + super(&main); + } + + private void main() + nothrow { + import core.stdc.stdlib : exit; + import core.exception : InvalidMemoryOperationError; + import std.encoding : sanitize; + + try { + if (m_pool.m_state.lock.term) return; + logDebug("entering worker thread"); + runTask(&handleWorkerTasks); + logDebug("running event loop"); + if (!m_pool.m_state.lock.term) runEventLoop(); + logDebug("Worker thread exit."); + } catch (Exception e) { + scope (failure) exit(-1); + logFatal("Worker thread terminated due to uncaught exception: %s", e.msg); + logDebug("Full error: %s", e.toString().sanitize()); + } catch (InvalidMemoryOperationError e) { + import std.stdio; + scope(failure) assert(false); + writeln("Error message: ", e.msg); + writeln("Full error: ", e.toString().sanitize()); + exit(-1); + } catch (Throwable th) { + logFatal("Worker thread terminated due to uncaught error: %s", th.msg); + logDebug("Full error: %s", th.toString().sanitize()); + exit(-1); + } + } + + private void handleWorkerTasks() + nothrow @safe { + import std.algorithm.iteration : filter; + import std.algorithm.searching : count; + import std.array : array; + + logDebug("worker thread enter"); + TaskFuncInfo taskfunc; + while(true){ + auto emit_count = m_pool.m_signal.emitCount; + + with (m_pool.m_state.lock) { + logDebug("worker thread check"); + + if (term) break; + + if (m_queue.consume(taskfunc)) { + logDebug("worker thread got specific task"); + } else if (queue.consume(taskfunc)) { + logDebug("worker thread got specific task"); + } + } + + if (taskfunc.func !is null) { + .runTask_internal(taskfunc); + taskfunc.func = null; + } + else emit_count = m_pool.m_signal.waitUninterruptible(emit_count); + } + + logDebug("worker thread exit"); + + if (!m_queue.empty) + logWarn("Worker thread shuts down with specific worker tasks left in its queue."); + + with (m_pool.m_state.lock) { + threads = threads.filter!(t => t !is this).array; + if (threads.length > 0 && !queue.empty) + logWarn("Worker threads shut down with worker tasks still left in the queue."); + } + m_pool.m_signal.emit(); + + exitEventLoop(); + } +} + +private struct TaskQueue { +nothrow @safe: + // FIXME: use a more efficient storage! + TaskFuncInfo[] tasks; + @property bool empty() const { return tasks.length == 0; } + void put(ref TaskFuncInfo tfi) { tasks ~= tfi; } + bool consume(ref TaskFuncInfo tfi) + { + if (tasks.length == 0) return false; + tfi = tasks[0]; + tasks = tasks[1 .. $]; + return true; + } +}