From 280023dfc21a9da39a92db1e648b68d7824197b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 15 Mar 2020 08:59:17 +0100 Subject: [PATCH 1/5] Avoid potential overflow for dynamic task priority. --- source/vibe/core/task.d | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index c422d8c..1fd1b22 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -996,7 +996,7 @@ package struct TaskScheduler { private void doYieldAndReschedule(Task task) { - auto tf = () @trusted { return task.taskFiber; } (); + import std.algorithm.comparison : min; // insert according to priority, limited to a priority // factor of 1:10 in case of heavy concurrency @@ -1006,7 +1006,7 @@ package struct TaskScheduler { // increase dynamic priority each time a task gets overtaken to // ensure a fair schedule - t.m_dynamicPriority += t.m_staticPriority; + t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority); return false; }); From d07c2f02e642b1647d8333ea587b745341d8f2ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 15 Mar 2020 10:34:37 +0100 Subject: [PATCH 2/5] Fix TaskQueue.insertBackPred. --- source/vibe/core/task.d | 47 +++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 1fd1b22..407871c 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -1089,6 +1089,7 @@ private struct TaskFiberQueue { if (!max_skip-- || pred(t)) { task.m_queue = &this; task.m_next = t.m_next; + if (task.m_next) task.m_next.m_prev = task; t.m_next = task; task.m_prev = t; if (!task.m_next) last = task; @@ -1151,18 +1152,52 @@ unittest { auto f3 = new TaskFiber; auto f4 = new TaskFiber; auto f5 = new TaskFiber; - + auto f6 = new TaskFiber; TaskFiberQueue q; + + void checkQueue() + { + TaskFiber p; + for (auto t = q.front; t; t = t.m_next) { + assert(t.m_prev is p); + assert(t.m_next || t is q.last); + p = t; + } + + TaskFiber n; + for (auto t = q.last; t; t = t.m_prev) { + assert(t.m_next is n); + assert(t.m_prev || t is q.first); + n = t; + } + } + q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); - assert(q.first == f1 && q.last == f1); + assert(q.first is f1 && q.last is f1); + checkQueue(); + q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); - assert(q.first == f1 && q.last == f2); + assert(q.first is f1 && q.last is f2); + checkQueue(); + q.insertBackPred(f3, 1, (tf) => false); - assert(q.first == f1 && q.last == f2); + assert(q.first is f1 && q.last is f2); + assert(f1.m_next is f3); + assert(f3.m_prev is f1); + checkQueue(); + q.insertBackPred(f4, 10, (tf) => false); - assert(q.first == f4 && q.last == f2); + assert(q.first is f4 && q.last is f2); + checkQueue(); + q.insertBackPred(f5, 10, (tf) => true); - assert(q.first == f4 && q.last == f5); + assert(q.first is f4 && q.last is f5); + checkQueue(); + + q.insertBackPred(f6, 10, (tf) => tf is f4); + assert(q.first is f4 && q.last is f5); + assert(f4.m_next is f6); + checkQueue(); } private struct FLSInfo { From 1eebe7a9ceefb4032129562fb1eda7a899a1fcfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 15 Mar 2020 09:02:18 +0100 Subject: [PATCH 3/5] Add TaskSwitchPriority to control the priority to use when transferring execution. --- source/vibe/core/core.d | 24 +++++++++++++---- source/vibe/core/task.d | 59 +++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f1c44b3..e23f61d 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -429,7 +429,7 @@ package Task runTask_internal(alias TFI_SETUP)() () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); } - s_scheduler.switchTo(handle, TaskFiber.getThis().m_yieldLockCount > 0 ? Flag!"defer".yes : Flag!"defer".no); + switchToTask(handle); debug if (TaskFiber.ms_taskEventCallback) { () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); @@ -808,13 +808,27 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null) This function can be used in conjunction with `hibernate` to wake up a task. The task must live in the same thread as the caller. - See_Also: `hibernate` + If no priority is specified, `TaskSwitchPriority.prioritized` or + `TaskSwitchPriority.immediate` will be used, depending on whether a + yield lock is currently active. + + Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield + lock is active. + + This function must only be called on tasks that belong to the calling + thread and have previously been hibernated! + + See_Also: `hibernate`, `yieldLock` */ void switchToTask(Task t) @safe nothrow { - import std.typecons : Yes, No; - auto defer = TaskFiber.getThis().m_yieldLockCount > 0 ? Yes.defer : No.defer; - s_scheduler.switchTo(t, defer); + auto defer = TaskFiber.getThis().m_yieldLockCount > 0; + s_scheduler.switchTo(t, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); +} +/// ditto +void switchToTask(Task t, TaskSwitchPriority priority) +@safe nothrow { + s_scheduler.switchTo(t, priority); } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 407871c..b112651 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -558,8 +558,8 @@ final package class TaskFiber : Fiber { if (caller.m_thread is m_thread) { auto thisus = () @trusted { return cast()this; } (); debug (VibeTaskLog) logTrace("Resuming task with interrupt flag."); - auto defer = caller.m_yieldLockCount > 0 ? Yes.defer : No.defer; - taskScheduler.switchTo(thisus.task, defer); + auto defer = caller.m_yieldLockCount > 0; + taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); } else { debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming."); } @@ -642,6 +642,25 @@ final package class TaskFiber : Fiber { } } + +/** Controls the priority to use for switching execution to a task. +*/ +enum TaskSwitchPriority { + /** Rescheduled according to the tasks priority + */ + normal, + + /** Rescheduled with maximum priority. + + The task will resume as soon as the current task yields. + */ + prioritized, + + /** Switch to the task immediately. + */ + immediate +} + package struct TaskFuncInfo { void function(ref TaskFuncInfo) func; void[2*size_t.sizeof] callable; @@ -894,7 +913,7 @@ package struct TaskScheduler { This forces immediate execution of the specified task. After the tasks finishes or yields, the calling task will continue execution. */ - void switchTo(Task t, Flag!"defer" defer = No.defer) + void switchTo(Task t, TaskSwitchPriority priority) { auto thist = Task.getThis(); @@ -905,13 +924,16 @@ package struct TaskScheduler { auto tf = () @trusted { return t.taskFiber; } (); if (tf.m_queue) { + // don't reset the position of already scheduled tasks + if (priority == TaskSwitchPriority.normal) return; + debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue."); assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); m_taskQueue.remove(tf); assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); } - if (thist == Task.init && defer == No.defer) { + if (thist == Task.init && priority == TaskSwitchPriority.immediate) { assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); debug (VibeTaskLog) logTrace("switch to task from global context"); resumeTask(t); @@ -921,12 +943,20 @@ package struct TaskScheduler { assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); - if (defer) { - m_taskQueue.insertFront(tf); - } else { - m_taskQueue.insertFront(thistf); - m_taskQueue.insertFront(tf); - doYield(thist); + final switch (priority) { + case TaskSwitchPriority.normal: + reschedule(tf); + break; + case TaskSwitchPriority.prioritized: + tf.m_dynamicPriority = uint.max; + reschedule(tf); + break; + case TaskSwitchPriority.immediate: + tf.m_dynamicPriority = uint.max; + m_taskQueue.insertFront(thistf); + m_taskQueue.insertFront(tf); + doYield(thist); + break; } } } @@ -994,7 +1024,7 @@ package struct TaskScheduler { } } - private void doYieldAndReschedule(Task task) + private void reschedule(TaskFiber tf) { import std.algorithm.comparison : min; @@ -1009,6 +1039,13 @@ package struct TaskScheduler { t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority); return false; }); + } + + private void doYieldAndReschedule(Task task) + { + auto tf = () @trusted { return task.taskFiber; } (); + + reschedule(tf); doYield(task); } From d2b777607ae05542a3294e8c019f5c7b046f910c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 15 Mar 2020 09:02:49 +0100 Subject: [PATCH 4/5] Use normal task priority when handling events. Defining VibeHighEventPriority to revers to the old event scheduling priority. --- source/vibe/internal/async.d | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index 0712dce..48ec7be 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -3,7 +3,7 @@ module vibe.internal.async; import std.traits : ParameterTypeTuple, ReturnType; import std.typecons : tuple; import vibe.core.core : hibernate, switchToTask; -import vibe.core.task : InterruptException, Task; +import vibe.core.task : InterruptException, Task, TaskSwitchPriority; import vibe.core.log; import core.time : Duration, seconds; @@ -131,7 +131,10 @@ void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__) fired[%1$s] = true; any_fired = true; Waitables[%1$s].done(%3$s); - if (t != Task.init) switchToTask(t); + if (t != Task.init) { + version (VibeHighEventPriority) switchToTask(t); + else switchToTask(t, TaskSwitchPriority.normal); + } }; debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s); From 0b44d150459985da14fd8974766f53bc310da7fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 15 Mar 2020 14:01:18 +0100 Subject: [PATCH 5/5] Don't rely on a hard-coded port in test. --- tests/vibe.core.net.1376.d | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/vibe.core.net.1376.d b/tests/vibe.core.net.1376.d index 4e01fd9..2d6529d 100644 --- a/tests/vibe.core.net.1376.d +++ b/tests/vibe.core.net.1376.d @@ -12,7 +12,7 @@ import core.time : msecs; shared static this() { - listenTCP(11375,(conn){ + auto l = listenTCP(0, (conn) { auto td = runTask!TCPConnection((conn) { ubyte [3] buf; try { @@ -22,17 +22,17 @@ shared static this() }, conn); sleep(10.msecs); conn.close(); - }); + }, "127.0.0.1"); runTask({ try { - auto conn = connectTCP("127.0.0.1", 11375); + auto conn = connectTCP("127.0.0.1", l.bindAddress.port); conn.write("a"); conn.close(); } catch (Exception e) assert(false, e.msg); try { - auto conn = connectTCP("127.0.0.1", 11375); + auto conn = connectTCP("127.0.0.1", l.bindAddress.port); conn.close(); } catch (Exception e) assert(false, e.msg);