diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f1c44b3..e23f61d 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -429,7 +429,7 @@ package Task runTask_internal(alias TFI_SETUP)() () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); } - s_scheduler.switchTo(handle, TaskFiber.getThis().m_yieldLockCount > 0 ? Flag!"defer".yes : Flag!"defer".no); + switchToTask(handle); debug if (TaskFiber.ms_taskEventCallback) { () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); @@ -808,13 +808,27 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null) This function can be used in conjunction with `hibernate` to wake up a task. The task must live in the same thread as the caller. - See_Also: `hibernate` + If no priority is specified, `TaskSwitchPriority.prioritized` or + `TaskSwitchPriority.immediate` will be used, depending on whether a + yield lock is currently active. + + Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield + lock is active. + + This function must only be called on tasks that belong to the calling + thread and have previously been hibernated! + + See_Also: `hibernate`, `yieldLock` */ void switchToTask(Task t) @safe nothrow { - import std.typecons : Yes, No; - auto defer = TaskFiber.getThis().m_yieldLockCount > 0 ? Yes.defer : No.defer; - s_scheduler.switchTo(t, defer); + auto defer = TaskFiber.getThis().m_yieldLockCount > 0; + s_scheduler.switchTo(t, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); +} +/// ditto +void switchToTask(Task t, TaskSwitchPriority priority) +@safe nothrow { + s_scheduler.switchTo(t, priority); } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index c422d8c..b112651 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -558,8 +558,8 @@ final package class TaskFiber : Fiber { if (caller.m_thread is m_thread) { auto thisus = () @trusted { return cast()this; } (); debug (VibeTaskLog) logTrace("Resuming task with interrupt flag."); - auto defer = caller.m_yieldLockCount > 0 ? Yes.defer : No.defer; - taskScheduler.switchTo(thisus.task, defer); + auto defer = caller.m_yieldLockCount > 0; + taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); } else { debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming."); } @@ -642,6 +642,25 @@ final package class TaskFiber : Fiber { } } + +/** Controls the priority to use for switching execution to a task. +*/ +enum TaskSwitchPriority { + /** Rescheduled according to the tasks priority + */ + normal, + + /** Rescheduled with maximum priority. + + The task will resume as soon as the current task yields. + */ + prioritized, + + /** Switch to the task immediately. + */ + immediate +} + package struct TaskFuncInfo { void function(ref TaskFuncInfo) func; void[2*size_t.sizeof] callable; @@ -894,7 +913,7 @@ package struct TaskScheduler { This forces immediate execution of the specified task. After the tasks finishes or yields, the calling task will continue execution. */ - void switchTo(Task t, Flag!"defer" defer = No.defer) + void switchTo(Task t, TaskSwitchPriority priority) { auto thist = Task.getThis(); @@ -905,13 +924,16 @@ package struct TaskScheduler { auto tf = () @trusted { return t.taskFiber; } (); if (tf.m_queue) { + // don't reset the position of already scheduled tasks + if (priority == TaskSwitchPriority.normal) return; + debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue."); assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); m_taskQueue.remove(tf); assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); } - if (thist == Task.init && defer == No.defer) { + if (thist == Task.init && priority == TaskSwitchPriority.immediate) { assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); debug (VibeTaskLog) logTrace("switch to task from global context"); resumeTask(t); @@ -921,12 +943,20 @@ package struct TaskScheduler { assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); - if (defer) { - m_taskQueue.insertFront(tf); - } else { - m_taskQueue.insertFront(thistf); - m_taskQueue.insertFront(tf); - doYield(thist); + final switch (priority) { + case TaskSwitchPriority.normal: + reschedule(tf); + break; + case TaskSwitchPriority.prioritized: + tf.m_dynamicPriority = uint.max; + reschedule(tf); + break; + case TaskSwitchPriority.immediate: + tf.m_dynamicPriority = uint.max; + m_taskQueue.insertFront(thistf); + m_taskQueue.insertFront(tf); + doYield(thist); + break; } } } @@ -994,9 +1024,9 @@ package struct TaskScheduler { } } - private void doYieldAndReschedule(Task task) + private void reschedule(TaskFiber tf) { - auto tf = () @trusted { return task.taskFiber; } (); + import std.algorithm.comparison : min; // insert according to priority, limited to a priority // factor of 1:10 in case of heavy concurrency @@ -1006,9 +1036,16 @@ package struct TaskScheduler { // increase dynamic priority each time a task gets overtaken to // ensure a fair schedule - t.m_dynamicPriority += t.m_staticPriority; + t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority); return false; }); + } + + private void doYieldAndReschedule(Task task) + { + auto tf = () @trusted { return task.taskFiber; } (); + + reschedule(tf); doYield(task); } @@ -1089,6 +1126,7 @@ private struct TaskFiberQueue { if (!max_skip-- || pred(t)) { task.m_queue = &this; task.m_next = t.m_next; + if (task.m_next) task.m_next.m_prev = task; t.m_next = task; task.m_prev = t; if (!task.m_next) last = task; @@ -1151,18 +1189,52 @@ unittest { auto f3 = new TaskFiber; auto f4 = new TaskFiber; auto f5 = new TaskFiber; - + auto f6 = new TaskFiber; TaskFiberQueue q; + + void checkQueue() + { + TaskFiber p; + for (auto t = q.front; t; t = t.m_next) { + assert(t.m_prev is p); + assert(t.m_next || t is q.last); + p = t; + } + + TaskFiber n; + for (auto t = q.last; t; t = t.m_prev) { + assert(t.m_next is n); + assert(t.m_prev || t is q.first); + n = t; + } + } + q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); - assert(q.first == f1 && q.last == f1); + assert(q.first is f1 && q.last is f1); + checkQueue(); + q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); - assert(q.first == f1 && q.last == f2); + assert(q.first is f1 && q.last is f2); + checkQueue(); + q.insertBackPred(f3, 1, (tf) => false); - assert(q.first == f1 && q.last == f2); + assert(q.first is f1 && q.last is f2); + assert(f1.m_next is f3); + assert(f3.m_prev is f1); + checkQueue(); + q.insertBackPred(f4, 10, (tf) => false); - assert(q.first == f4 && q.last == f2); + assert(q.first is f4 && q.last is f2); + checkQueue(); + q.insertBackPred(f5, 10, (tf) => true); - assert(q.first == f4 && q.last == f5); + assert(q.first is f4 && q.last is f5); + checkQueue(); + + q.insertBackPred(f6, 10, (tf) => tf is f4); + assert(q.first is f4 && q.last is f5); + assert(f4.m_next is f6); + checkQueue(); } private struct FLSInfo { diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index 0712dce..48ec7be 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -3,7 +3,7 @@ module vibe.internal.async; import std.traits : ParameterTypeTuple, ReturnType; import std.typecons : tuple; import vibe.core.core : hibernate, switchToTask; -import vibe.core.task : InterruptException, Task; +import vibe.core.task : InterruptException, Task, TaskSwitchPriority; import vibe.core.log; import core.time : Duration, seconds; @@ -131,7 +131,10 @@ void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__) fired[%1$s] = true; any_fired = true; Waitables[%1$s].done(%3$s); - if (t != Task.init) switchToTask(t); + if (t != Task.init) { + version (VibeHighEventPriority) switchToTask(t); + else switchToTask(t, TaskSwitchPriority.normal); + } }; debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s); diff --git a/tests/vibe.core.net.1376.d b/tests/vibe.core.net.1376.d index 4e01fd9..2d6529d 100644 --- a/tests/vibe.core.net.1376.d +++ b/tests/vibe.core.net.1376.d @@ -12,7 +12,7 @@ import core.time : msecs; shared static this() { - listenTCP(11375,(conn){ + auto l = listenTCP(0, (conn) { auto td = runTask!TCPConnection((conn) { ubyte [3] buf; try { @@ -22,17 +22,17 @@ shared static this() }, conn); sleep(10.msecs); conn.close(); - }); + }, "127.0.0.1"); runTask({ try { - auto conn = connectTCP("127.0.0.1", 11375); + auto conn = connectTCP("127.0.0.1", l.bindAddress.port); conn.write("a"); conn.close(); } catch (Exception e) assert(false, e.msg); try { - auto conn = connectTCP("127.0.0.1", 11375); + auto conn = connectTCP("127.0.0.1", l.bindAddress.port); conn.close(); } catch (Exception e) assert(false, e.msg);