diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 543ed2b..f1c44b3 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1,7 +1,7 @@ /** This module contains the core functionality of the vibe.d framework. - Copyright: © 2012-2019 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Sönke Ludwig */ @@ -319,7 +319,7 @@ Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args) { return runTask_internal!((ref tfi) { tfi.set(task, args); }); } -/// +/// ditto Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args) @system { return runTask_internal!((ref tfi) { tfi.set(task, args); }); @@ -330,6 +330,50 @@ Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) { return runTask_internal!((ref tfi) { tfi.set(task, args); }); } +/// ditto +Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args) +{ + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} +/// ditto +Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args) +@system { + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} +/// ditto +Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args) + if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init)))) +{ + return runTask_internal!((ref tfi) { + tfi.settings = settings; + tfi.set(task, args); + }); +} + + +unittest { // test proportional priority scheduling + auto tm = setTimer(1000.msecs, { assert(false, "Test timeout"); }); + scope (exit) tm.stop(); + + size_t a, b; + auto t1 = runTask(TaskSettings(1), { while (a + b < 100) { a++; yield(); } }); + auto t2 = runTask(TaskSettings(10), { while (a + b < 100) { b++; yield(); } }); + runTask({ + t1.join(); + t2.join(); + exitEventLoop(); + }); + runEventLoop(); + assert(a + b == 100); + assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1 +} + /** Runs an asyncronous task that is guaranteed to finish before the caller's @@ -429,6 +473,21 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS arg setupWorkerThreads(); st_workerPool.runTask!method(object, args); } +/// ditto +void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) +{ + setupWorkerThreads(); + st_workerPool.runTask(settings, func, args); +} + +/// ditto +void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) +{ + setupWorkerThreads(); + st_workerPool.runTask!method(settings, object, args); +} /** Runs a new asynchronous task in a worker thread, returning the task handle. @@ -452,6 +511,20 @@ Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS ar setupWorkerThreads(); return st_workerPool.runTaskH!method(object, args); } +/// ditto +Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) +{ + setupWorkerThreads(); + return st_workerPool.runTaskH(settings, func, args); +} +/// ditto +Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) +{ + setupWorkerThreads(); + return st_workerPool.runTaskH!method(settings, object, args); +} /// Running a worker task using a function unittest { @@ -583,6 +656,19 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args) setupWorkerThreads(); return st_workerPool.runTaskDist!method(object, args); } +/// ditto +void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) +{ + setupWorkerThreads(); + return st_workerPool.runTaskDist(settings, func, args); +} +/// ditto +void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args) +{ + setupWorkerThreads(); + return st_workerPool.runTaskDist!method(settings, object, args); +} /** Runs a new asynchronous task in all worker threads and returns the handles. @@ -598,6 +684,13 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref setupWorkerThreads(); st_workerPool.runTaskDistH(on_handle, func, args); } +/// ditto +void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) +{ + setupWorkerThreads(); + st_workerPool.runTaskDistH(settings, on_handle, func, args); +} /** diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index e71d378..c422d8c 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -1,7 +1,7 @@ /** Contains interfaces and enums for evented I/O drivers. - Copyright: © 2012-2016 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig Authors: Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. */ @@ -31,6 +31,8 @@ struct Task { static ThreadInfo s_tidInfo; } + enum basePriority = 0x00010000; + private this(TaskFiber fiber, size_t task_counter) @safe nothrow { () @trusted { m_fiber = cast(shared)fiber; } (); @@ -129,6 +131,27 @@ struct Task { bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } } + +/** Settings to control the behavior of newly started tasks. +*/ +struct TaskSettings { + /** Scheduling priority of the task + + The priority of a task is roughly proportional to the amount of + times it gets scheduled in comparison to competing tasks. For + example, a task with priority 100 will be scheduled every 10 rounds + when competing against a task with priority 1000. + + The default priority is defined by `basePriority` and has a value + of 65536. Priorities should be computed relative to `basePriority`. + + A task with a priority of zero will only be executed if no other + non-zero task is competing. + */ + uint priority = Task.basePriority; +} + + /** Implements a task local storage variable. @@ -316,7 +339,9 @@ final package class TaskFiber : Fiber { Thread m_thread; ThreadInfo m_tidInfo; + uint m_staticPriority, m_dynamicPriority; shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags + bool m_shutdown = false; shared(ManualEvent) m_onExit; @@ -384,6 +409,7 @@ final package class TaskFiber : Fiber { TaskFuncInfo task; swap(task, m_taskFunc); + m_dynamicPriority = m_staticPriority = task.settings.priority; Task handle = this.task; try { atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running @@ -621,6 +647,7 @@ package struct TaskFuncInfo { void[2*size_t.sizeof] callable; void[maxTaskParameterSize] args; debug ulong functionPointer; + TaskSettings settings; void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) { @@ -715,7 +742,6 @@ package struct TaskScheduler { private { TaskFiberQueue m_taskQueue; - TaskFiber m_markerTask; } @safe: @@ -738,8 +764,7 @@ package struct TaskScheduler { debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ()); tf.handleInterrupt(); if (tf.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(tf); - doYield(t); + doYieldAndReschedule(t); tf.handleInterrupt(); } @@ -846,13 +871,10 @@ package struct TaskScheduler { if (t == Task.init) return; // not really a task -> no-op auto tf = () @trusted { return t.taskFiber; } (); if (tf.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(tf); - doYield(t); + doYieldAndReschedule(t); } /** Holds execution until the task gets explicitly resumed. - - */ void hibernate() { @@ -923,33 +945,26 @@ package struct TaskScheduler { return ScheduleStatus.idle; - if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here! - - scope (exit) assert(!m_markerTask.m_queue, "Marker task still in queue!?"); - assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!"); - assert(!m_markerTask.m_queue, "TaskScheduler.schedule() was called recursively!"); - // keep track of the end of the queue, so that we don't process tasks - // infinitely - m_taskQueue.insertBack(m_markerTask); + if (m_taskQueue.empty) return ScheduleStatus.idle; - while (m_taskQueue.front !is m_markerTask) { + foreach (i; 0 .. m_taskQueue.length) { auto t = m_taskQueue.front; m_taskQueue.popFront(); + + // reset priority + t.m_dynamicPriority = t.m_staticPriority; + debug (VibeTaskLog) logTrace("resuming task"); auto task = t.task; if (task != Task.init) resumeTask(t.task); debug (VibeTaskLog) logTrace("task out"); - assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?"); - if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode + if (m_taskQueue.empty) break; } - // remove marker task - m_taskQueue.popFront(); - debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length); return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy; @@ -979,6 +994,25 @@ package struct TaskScheduler { } } + private void doYieldAndReschedule(Task task) + { + auto tf = () @trusted { return task.taskFiber; } (); + + // insert according to priority, limited to a priority + // factor of 1:10 in case of heavy concurrency + m_taskQueue.insertBackPred(tf, 10, (t) { + if (t.m_dynamicPriority >= tf.m_dynamicPriority) + return true; + + // increase dynamic priority each time a task gets overtaken to + // ensure a fair schedule + t.m_dynamicPriority += t.m_staticPriority; + return false; + }); + + doYield(task); + } + private void doYield(Task task) { assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!"); @@ -1041,6 +1075,31 @@ private struct TaskFiberQueue { length++; } + // inserts a task after the first task for which the predicate yields `true`, + // starting from the back. a maximum of max_skip tasks will be skipped + // before the task is inserted regardless of the predicate. + void insertBackPred(TaskFiber task, size_t max_skip, + scope bool delegate(TaskFiber) @safe nothrow pred) + { + assert(task.m_queue is null, "Task is already scheduled to be resumed!"); + assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); + assert(task.m_next is null, "Task has m_next set without being in a queue!?"); + + for (auto t = last; t; t = t.m_prev) { + if (!max_skip-- || pred(t)) { + task.m_queue = &this; + task.m_next = t.m_next; + t.m_next = task; + task.m_prev = t; + if (!task.m_next) last = task; + length++; + return; + } + } + + insertFront(task); + } + void popFront() { if (first is last) last = null; @@ -1086,6 +1145,26 @@ unittest { assert(q.empty && q.length == 0); } +unittest { + auto f1 = new TaskFiber; + auto f2 = new TaskFiber; + auto f3 = new TaskFiber; + auto f4 = new TaskFiber; + auto f5 = new TaskFiber; + + TaskFiberQueue q; + q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); + assert(q.first == f1 && q.last == f1); + q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); + assert(q.first == f1 && q.last == f2); + q.insertBackPred(f3, 1, (tf) => false); + assert(q.first == f1 && q.last == f2); + q.insertBackPred(f4, 10, (tf) => false); + assert(q.first == f4 && q.last == f2); + q.insertBackPred(f5, 10, (tf) => true); + assert(q.first == f4 && q.last == f5); +} + private struct FLSInfo { void function(void[], size_t) fct; size_t offset; diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 1b71142..f862ab5 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -1,7 +1,7 @@ /** Multi-threaded task pool implementation. - Copyright: © 2012-2017 Sönke Ludwig + Copyright: © 2012-2020 Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Sönke Ludwig */ @@ -11,7 +11,7 @@ import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.log; import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor; -import vibe.core.task : Task, TaskFuncInfo, callWithMove; +import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove; import core.sync.mutex : Mutex; import core.thread : Thread; import std.concurrency : prioritySend, receiveOnly; @@ -113,16 +113,30 @@ shared final class TaskPool { if (isFunctionPointer!FT) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTask_unsafe(func, args); + runTask_unsafe(TaskSettings.init, func, args); } - /// ditto void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); auto func = &__traits(getMember, object, __traits(identifier, method)); - runTask_unsafe(func, args); + runTask_unsafe(TaskSettings.init, func, args); + } + /// ditto + void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTask_unsafe(settings, func, args); + } + /// ditto + void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + auto func = &__traits(getMember, object, __traits(identifier, method)); + runTask_unsafe(settings, func, args); } /** Runs a new asynchronous task in a worker thread, returning the task handle. @@ -141,9 +155,9 @@ shared final class TaskPool { // workaround for runWorkerTaskH to work when called outside of a task if (Task.getThis() == Task.init) { Task ret; - .runTask({ ret = doRunTaskH(func, args); }).join(); + .runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join(); return ret; - } else return doRunTaskH(func, args); + } else return doRunTaskH(TaskSettings.init, func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -154,10 +168,32 @@ shared final class TaskPool { } return runTaskH(&wrapper!(), object, args); } + /// ditto + Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (isFunctionPointer!FT) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + // workaround for runWorkerTaskH to work when called outside of a task + if (Task.getThis() == Task.init) { + Task ret; + .runTask({ ret = doRunTaskH(settings, func, args); }).join(); + return ret; + } else return doRunTaskH(settings, func, args); + } + /// ditto + Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) + { + static void wrapper()(shared(T) object, ref ARGS args) { + __traits(getMember, object, __traits(identifier, method))(args); + } + return runTaskH(settings, &wrapper!(), object, args); + } // NOTE: needs to be a separate function to avoid recursion for the // workaround above, which breaks @safe inference - private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args) + private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args) if (isFunctionPointer!FT) { import std.typecons : Typedef; @@ -173,7 +209,7 @@ shared final class TaskPool { caller.tid.prioritySend(callee); mixin(callWithMove!ARGS("func", "args")); } - runTask_unsafe(&taskFun, caller, func, args); + runTask_unsafe(settings, &taskFun, caller, func, args); return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } (); } @@ -191,7 +227,7 @@ shared final class TaskPool { if (is(typeof(*func) == function)) { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTaskDist_unsafe(func, args); + runTaskDist_unsafe(TaskSettings.init, func, args); } /// ditto void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -199,7 +235,22 @@ shared final class TaskPool { auto func = &__traits(getMember, object, __traits(identifier, method)); foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - runTaskDist_unsafe(func, args); + runTaskDist_unsafe(TaskSettings.init, func, args); + } + /// ditto + void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) + if (is(typeof(*func) == function)) + { + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + runTaskDist_unsafe(settings, func, args); + } + /// ditto + void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) + { + auto func = &__traits(getMember, object, __traits(identifier, method)); + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + + runTaskDist_unsafe(settings, func, args); } /** Runs a new asynchronous task in all worker threads and returns the handles. @@ -210,6 +261,12 @@ shared final class TaskPool { See_also: `runTaskDist` */ void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args) + if (!is(HCB == TaskSettings)) + { + runTaskDistH(TaskSettings.init, on_handle, func, args); + } + /// ditto + void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args) { // TODO: support non-copyable argument types using .move import std.concurrency : send, receiveOnly; @@ -226,13 +283,13 @@ shared final class TaskPool { t.tid.send(Task.getThis()); func(args); } - runTaskDist(&call, caller, func, args); + runTaskDist(settings, &call, caller, func, args); foreach (i; 0 .. this.threadCount) on_handle(receiveOnly!Task); } - private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) + private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args) { import std.traits : ParameterTypeTuple; import vibe.internal.traits : areConvertibleTo; @@ -242,11 +299,11 @@ shared final class TaskPool { static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); - m_state.lock.queue.put(callable, args); + m_state.lock.queue.put(settings, callable, args); m_signal.emitSingle(); } - private void runTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types! + private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types! { import std.traits : ParameterTypeTuple; import vibe.internal.traits : areConvertibleTo; @@ -260,7 +317,7 @@ shared final class TaskPool { auto st = m_state.lock; foreach (thr; st.threads) { // create one TFI per thread to properly account for elaborate assignment operators/postblit - thr.m_queue.put(callable, args); + thr.m_queue.put(settings, callable, args); } } m_signal.emit(); @@ -355,12 +412,13 @@ nothrow @safe: @property size_t length() const { return m_queue.length; } - void put(CALLABLE, ARGS...)(ref CALLABLE c, ref ARGS args) + void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args) { import std.algorithm.comparison : max; if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2); assert(!m_queue.full); + m_queue.peekDst[0].settings = settings; m_queue.peekDst[0].set(c, args); m_queue.putN(1); } diff --git a/tests/issue-161-multiple-joiners.d b/tests/issue-161-multiple-joiners.d index c0ebc78..e1bac30 100644 --- a/tests/issue-161-multiple-joiners.d +++ b/tests/issue-161-multiple-joiners.d @@ -23,6 +23,9 @@ void main() t.join(); }); + // let the outer task run and start the inner task + yield(); + // let the outer task get another execution slice to write to t yield(); assert(t && t.running);