From 6a2dfa468bf7b4016126ffe7487babb80a5a971c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 14 Mar 2020 19:47:20 +0100 Subject: [PATCH 1/3] Implement priority based task scheduling. 1. Removes the marker task used by schedule() and instead limits the number of task resumptions to the initial length of the task queue 2. Assigns a static and a dynamic priority to each task. The dynamic priority starts with the same value as the static priority and gets incremented by the static priority each time the task gets overtaken by a higher priority task, eventually leading to the task becoming the highest priority (unless the static priority is zero). Tasks with a higher dynamic priority generally take precedence, unless the concurrency exceeds 10 scheduled tasks, in which case the front of the queue is scheduled in normal FIFO order. --- source/vibe/core/core.d | 77 +++++++++++++++++++++- source/vibe/core/task.d | 123 +++++++++++++++++++++++++++++------- source/vibe/core/taskpool.d | 84 +++++++++++++++++++----- 3 files changed, 244 insertions(+), 40 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index ee507a2..2d1af35 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1,7 +1,7 @@ /** This module contains the core functionality of the vibe.d framework. - Copyright: © 2012-2019 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Sönke Ludwig */ @@ -319,7 +319,7 @@ Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args) { return runTask_internal!((ref tfi) { tfi.set(task, args); }); } -/// +/// ditto Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args) @system { return runTask_internal!((ref tfi) { tfi.set(task, args); }); @@ -330,6 +330,50 @@ Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) { return runTask_internal!((ref tfi) { tfi.set(task, args); }); } +/// ditto +Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args) +{ + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} +/// ditto +Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args) +@system { + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} +/// ditto +Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args) + if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init)))) +{ + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} + + +unittest { // test proportional priority scheduling + auto tm = setTimer(1000.msecs, { assert(false, "Test timeout"); }); + scope (exit) tm.stop(); + + size_t a, b; + auto t1 = runTask(TaskSettings(1), { while (a + b < 100) { a++; yield(); } }); + auto t2 = runTask(TaskSettings(10), { while (a + b < 100) { b++; yield(); } }); + runTask({ + t1.join(); + t2.join(); + exitEventLoop(); + }); + runEventLoop(); + assert(a + b == 100); + assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1 +} + /** Runs an asyncronous task that is guaranteed to finish before the caller's @@ -429,6 +473,21 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS arg setupWorkerThreads(); st_workerPool.runTask!method(object, args); } +/// ditto +void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) +{ + setupWorkerThreads(); + st_workerPool.runTask(settings, func, args); +} + +/// ditto +void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) +{ + setupWorkerThreads(); + st_workerPool.runTask!method(settings, object, args); +} /** Runs a new asynchronous task in a worker thread, returning the task handle. @@ -452,6 +511,20 @@ Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS ar setupWorkerThreads(); return st_workerPool.runTaskH!method(object, args); } +/// ditto +Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) +{ + setupWorkerThreads(); + return st_workerPool.runTaskH(settings, func, args); +} +/// ditto +Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) +{ + setupWorkerThreads(); + return st_workerPool.runTaskH!method(settings, object, args); +} /// Running a worker task using a function unittest { diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index e71d378..c422d8c 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -1,7 +1,7 @@ /** Contains interfaces and enums for evented I/O drivers. - Copyright: © 2012-2016 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig Authors: Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. */ @@ -31,6 +31,8 @@ struct Task { static ThreadInfo s_tidInfo; } + enum basePriority = 0x00010000; + private this(TaskFiber fiber, size_t task_counter) @safe nothrow { () @trusted { m_fiber = cast(shared)fiber; } (); @@ -129,6 +131,27 @@ struct Task { bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } } + +/** Settings to control the behavior of newly started tasks. +*/ +struct TaskSettings { + /** Scheduling priority of the task + + The priority of a task is roughly proportional to the amount of + times it gets scheduled in comparison to competing tasks. For + example, a task with priority 100 will be scheduled every 10 rounds + when competing against a task with priority 1000. + + The default priority is defined by `basePriority` and has a value + of 65536. Priorities should be computed relative to `basePriority`. + + A task with a priority of zero will only be executed if no other + non-zero task is competing. + */ + uint priority = Task.basePriority; +} + + /** Implements a task local storage variable. @@ -316,7 +339,9 @@ final package class TaskFiber : Fiber { Thread m_thread; ThreadInfo m_tidInfo; + uint m_staticPriority, m_dynamicPriority; shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags + bool m_shutdown = false; shared(ManualEvent) m_onExit; @@ -384,6 +409,7 @@ final package class TaskFiber : Fiber { TaskFuncInfo task; swap(task, m_taskFunc); + m_dynamicPriority = m_staticPriority = task.settings.priority; Task handle = this.task; try { atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running @@ -621,6 +647,7 @@ package struct TaskFuncInfo { void[2*size_t.sizeof] callable; void[maxTaskParameterSize] args; debug ulong functionPointer; + TaskSettings settings; void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) { @@ -715,7 +742,6 @@ package struct TaskScheduler { private { TaskFiberQueue m_taskQueue; - TaskFiber m_markerTask; } @safe: @@ -738,8 +764,7 @@ package struct TaskScheduler { debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ()); tf.handleInterrupt(); if (tf.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(tf); - doYield(t); + doYieldAndReschedule(t); tf.handleInterrupt(); } @@ -846,13 +871,10 @@ package struct TaskScheduler { if (t == Task.init) return; // not really a task -> no-op auto tf = () @trusted { return t.taskFiber; } (); if (tf.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(tf); - doYield(t); + doYieldAndReschedule(t); } /** Holds execution until the task gets explicitly resumed. - - */ void hibernate() { @@ -923,33 +945,26 @@ package struct TaskScheduler { return ScheduleStatus.idle; - if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here! - - scope (exit) assert(!m_markerTask.m_queue, "Marker task still in queue!?"); - assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!"); - assert(!m_markerTask.m_queue, "TaskScheduler.schedule() was called recursively!"); - // keep track of the end of the queue, so that we don't process tasks - // infinitely - m_taskQueue.insertBack(m_markerTask); + if (m_taskQueue.empty) return ScheduleStatus.idle; - while (m_taskQueue.front !is m_markerTask) { + foreach (i; 0 .. m_taskQueue.length) { auto t = m_taskQueue.front; m_taskQueue.popFront(); + + // reset priority + t.m_dynamicPriority = t.m_staticPriority; + debug (VibeTaskLog) logTrace("resuming task"); auto task = t.task; if (task != Task.init) resumeTask(t.task); debug (VibeTaskLog) logTrace("task out"); - assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?"); - if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode + if (m_taskQueue.empty) break; } - // remove marker task - m_taskQueue.popFront(); - debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length); return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy; @@ -979,6 +994,25 @@ package struct TaskScheduler { } } + private void doYieldAndReschedule(Task task) + { + auto tf = () @trusted { return task.taskFiber; } (); + + // insert according to priority, limited to a priority + // factor of 1:10 in case of heavy concurrency + m_taskQueue.insertBackPred(tf, 10, (t) { + if (t.m_dynamicPriority >= tf.m_dynamicPriority) + return true; + + // increase dynamic priority each time a task gets overtaken to + // ensure a fair schedule + t.m_dynamicPriority += t.m_staticPriority; + return false; + }); + + doYield(task); + } + private void doYield(Task task) { assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!"); @@ -1041,6 +1075,31 @@ private struct TaskFiberQueue { length++; } + // inserts a task after the first task for which the predicate yields `true`, + // starting from the back. a maximum of max_skip tasks will be skipped + // before the task is inserted regardless of the predicate. + void insertBackPred(TaskFiber task, size_t max_skip, + scope bool delegate(TaskFiber) @safe nothrow pred) + { + assert(task.m_queue is null, "Task is already scheduled to be resumed!"); + assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); + assert(task.m_next is null, "Task has m_next set without being in a queue!?"); + + for (auto t = last; t; t = t.m_prev) { + if (!max_skip-- || pred(t)) { + task.m_queue = &this; + task.m_next = t.m_next; + t.m_next = task; + task.m_prev = t; + if (!task.m_next) last = task; + length++; + return; + } + } + + insertFront(task); + } + void popFront() { if (first is last) last = null; @@ -1086,6 +1145,26 @@ unittest { assert(q.empty && q.length == 0); } +unittest { + auto f1 = new TaskFiber; + auto f2 = new TaskFiber; + auto f3 = new TaskFiber; + auto f4 = new TaskFiber; + auto f5 = new TaskFiber; + + TaskFiberQueue q; + q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); + assert(q.first == f1 && q.last == f1); + q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); + assert(q.first == f1 && q.last == f2); + q.insertBackPred(f3, 1, (tf) => false); + assert(q.first == f1 && q.last == f2); + q.insertBackPred(f4, 10, (tf) => false); + assert(q.first == f4 && q.last == f2); + q.insertBackPred(f5, 10, (tf) => true); + assert(q.first == f4 && q.last == f5); +} + private struct FLSInfo { void function(void[], size_t) fct; size_t offset; diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 1b71142..cd1aa67 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -1,7 +1,7 @@ /** Multi-threaded task pool implementation. - Copyright: © 2012-2017 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Sönke Ludwig */ @@ -11,7 +11,7 @@ import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.log; import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor; -import vibe.core.task : Task, TaskFuncInfo, callWithMove; +import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove; import core.sync.mutex : Mutex; import core.thread : Thread; import std.concurrency : prioritySend, receiveOnly; @@ -113,16 +113,30 @@ shared final class TaskPool { if (isFunctionPointer!FT) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTask_unsafe(func, args); + runTask_unsafe(TaskSettings.init, func, args); } - /// ditto void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); auto func = &__traits(getMember, object, __traits(identifier, method)); - runTask_unsafe(func, args); + runTask_unsafe(TaskSettings.init, func, args); + } + /// ditto + void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTask_unsafe(settings, func, args); + } + /// ditto + void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + auto func = &__traits(getMember, object, __traits(identifier, method)); + runTask_unsafe(settings, func, args); } /** Runs a new asynchronous task in a worker thread, returning the task handle. @@ -141,9 +155,9 @@ shared final class TaskPool { // workaround for runWorkerTaskH to work when called outside of a task if (Task.getThis() == Task.init) { Task ret; - .runTask({ ret = doRunTaskH(func, args); }).join(); + .runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join(); return ret; - } else return doRunTaskH(func, args); + } else return doRunTaskH(TaskSettings.init, func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -154,10 +168,32 @@ shared final class TaskPool { } return runTaskH(&wrapper!(), object, args); } + /// ditto + Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + // workaround for runWorkerTaskH to work when called outside of a task + if (Task.getThis() == Task.init) { + Task ret; + .runTask({ ret = doRunTaskH(settings, func, args); }).join(); + return ret; + } else return doRunTaskH(settings, func, args); + } + /// ditto + Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + static void wrapper()(shared(T) object, ref ARGS args) { + __traits(getMember, object, __traits(identifier, method))(args); + } + return runTaskH(settings, &wrapper!(), object, args); + } // NOTE: needs to be a separate function to avoid recursion for the // workaround above, which breaks @safe inference - private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args) + private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args) if (isFunctionPointer!FT) { import std.typecons : Typedef; @@ -173,7 +209,7 @@ shared final class TaskPool { caller.tid.prioritySend(callee); mixin(callWithMove!ARGS("func", "args")); } - runTask_unsafe(&taskFun, caller, func, args); + runTask_unsafe(settings, &taskFun, caller, func, args); return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } (); } @@ -191,7 +227,7 @@ shared final class TaskPool { if (is(typeof(*func) == function)) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTaskDist_unsafe(func, args); + runTaskDist_unsafe(TaskSettings.init, func, args); } /// ditto void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -199,7 +235,22 @@ shared final class TaskPool { auto func = &__traits(getMember, object, __traits(identifier, method)); foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTaskDist_unsafe(func, args); + runTaskDist_unsafe(TaskSettings.init, func, args); + } + /// ditto + void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTaskDist_unsafe(settings, func, args); + } + /// ditto + void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + { + auto func = &__traits(getMember, object, __traits(identifier, method)); + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + runTaskDist_unsafe(settings, func, args); } /** Runs a new asynchronous task in all worker threads and returns the handles. @@ -232,7 +283,7 @@ shared final class TaskPool { on_handle(receiveOnly!Task); } - private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) + private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args) { import std.traits : ParameterTypeTuple; import vibe.internal.traits : areConvertibleTo; @@ -242,11 +293,11 @@ shared final class TaskPool { static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); - m_state.lock.queue.put(callable, args); + m_state.lock.queue.put(settings, callable, args); m_signal.emitSingle(); } - private void runTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types! + private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types! { import std.traits : ParameterTypeTuple; import vibe.internal.traits : areConvertibleTo; @@ -260,7 +311,7 @@ shared final class TaskPool { auto st = m_state.lock; foreach (thr; st.threads) { // create one TFI per thread to properly account for elaborate assignment operators/postblit - thr.m_queue.put(callable, args); + thr.m_queue.put(settings, callable, args); } } m_signal.emit(); @@ -355,12 +406,13 @@ nothrow @safe: @property size_t length() const { return m_queue.length; } - void put(CALLABLE, ARGS...)(ref CALLABLE c, ref ARGS args) + void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args) { import std.algorithm.comparison : max; if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2); assert(!m_queue.full); + m_queue.peekDst[0].settings = settings; m_queue.peekDst[0].set(c, args); m_queue.putN(1); } From a6bd4a0b1d6243c5ab074de4f25986be410cdf1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 14 Mar 2020 22:30:41 +0100 Subject: [PATCH 2/3] Fix regression test w.r.t. slightly changed semantics of schedule(). A single call to yield() (which calls schedule() once) is not sufficient anymore to guarantee that both tasks have run far enough. --- tests/issue-161-multiple-joiners.d | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/issue-161-multiple-joiners.d b/tests/issue-161-multiple-joiners.d index c0ebc78..e1bac30 100644 --- a/tests/issue-161-multiple-joiners.d +++ b/tests/issue-161-multiple-joiners.d @@ -23,6 +23,9 @@ void main() t.join(); }); + // let the outer task run and start the inner task + yield(); + // let the outer task get another execution slice to write to t yield(); assert(t && t.running); From 262b417794d38111e830a34c43281e55576f74aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 14 Mar 2020 23:33:41 +0100 Subject: [PATCH 3/3] Add TaskSettings overloads for runWorkerTaskDist(H). --- source/vibe/core/core.d | 20 ++++++++++++++++++++ source/vibe/core/taskpool.d | 8 +++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 2d1af35..953cbc4 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -656,6 +656,19 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args) setupWorkerThreads(); return st_workerPool.runTaskDist!method(object, args); } +/// ditto +void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) +{ + setupWorkerThreads(); + return st_workerPool.runTaskDist(settings, func, args); +} +/// ditto +void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args) +{ + setupWorkerThreads(); + return st_workerPool.runTaskDist!method(settings, object, args); +} /** Runs a new asynchronous task in all worker threads and returns the handles. @@ -671,6 +684,13 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref setupWorkerThreads(); st_workerPool.runTaskDistH(on_handle, func, args); } +/// ditto +void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) +{ + setupWorkerThreads(); + st_workerPool.runTaskDistH(settings, on_handle, func, args); +} /** diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index cd1aa67..f862ab5 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -261,6 +261,12 @@ shared final class TaskPool { See_also: `runTaskDist` */ void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args) + if (!is(HCB == TaskSettings)) + { + runTaskDistH(TaskSettings.init, on_handle, func, args); + } + /// ditto + void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args) { // TODO: support non-copyable argument types using .move import std.concurrency : send, receiveOnly; @@ -277,7 +283,7 @@ shared final class TaskPool { t.tid.send(Task.getThis()); func(args); } - runTaskDist(&call, caller, func, args); + runTaskDist(settings, &call, caller, func, args); foreach (i; 0 .. this.threadCount) on_handle(receiveOnly!Task);