Merge pull request #197 from vibe-d/task_priority_improvements

Task priority improvements
This commit is contained in:
Sönke Ludwig 2020-03-16 14:13:59 +01:00 committed by GitHub
commit 500385a303
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 119 additions and 30 deletions

View file

@ -429,7 +429,7 @@ package Task runTask_internal(alias TFI_SETUP)()
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
} }
s_scheduler.switchTo(handle, TaskFiber.getThis().m_yieldLockCount > 0 ? Flag!"defer".yes : Flag!"defer".no); switchToTask(handle);
debug if (TaskFiber.ms_taskEventCallback) { debug if (TaskFiber.ms_taskEventCallback) {
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
@ -808,13 +808,27 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
This function can be used in conjunction with `hibernate` to wake up a This function can be used in conjunction with `hibernate` to wake up a
task. The task must live in the same thread as the caller. task. The task must live in the same thread as the caller.
See_Also: `hibernate` If no priority is specified, `TaskSwitchPriority.prioritized` or
`TaskSwitchPriority.immediate` will be used, depending on whether a
yield lock is currently active.
Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield
lock is active.
This function must only be called on tasks that belong to the calling
thread and have previously been hibernated!
See_Also: `hibernate`, `yieldLock`
*/ */
void switchToTask(Task t) void switchToTask(Task t)
@safe nothrow { @safe nothrow {
import std.typecons : Yes, No; auto defer = TaskFiber.getThis().m_yieldLockCount > 0;
auto defer = TaskFiber.getThis().m_yieldLockCount > 0 ? Yes.defer : No.defer; s_scheduler.switchTo(t, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
s_scheduler.switchTo(t, defer); }
/// ditto
void switchToTask(Task t, TaskSwitchPriority priority)
@safe nothrow {
s_scheduler.switchTo(t, priority);
} }

View file

@ -558,8 +558,8 @@ final package class TaskFiber : Fiber {
if (caller.m_thread is m_thread) { if (caller.m_thread is m_thread) {
auto thisus = () @trusted { return cast()this; } (); auto thisus = () @trusted { return cast()this; } ();
debug (VibeTaskLog) logTrace("Resuming task with interrupt flag."); debug (VibeTaskLog) logTrace("Resuming task with interrupt flag.");
auto defer = caller.m_yieldLockCount > 0 ? Yes.defer : No.defer; auto defer = caller.m_yieldLockCount > 0;
taskScheduler.switchTo(thisus.task, defer); taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
} else { } else {
debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming."); debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming.");
} }
@ -642,6 +642,25 @@ final package class TaskFiber : Fiber {
} }
} }
/** Controls the priority to use for switching execution to a task.
*/
enum TaskSwitchPriority {
/** Rescheduled according to the tasks priority
*/
normal,
/** Rescheduled with maximum priority.
The task will resume as soon as the current task yields.
*/
prioritized,
/** Switch to the task immediately.
*/
immediate
}
package struct TaskFuncInfo { package struct TaskFuncInfo {
void function(ref TaskFuncInfo) func; void function(ref TaskFuncInfo) func;
void[2*size_t.sizeof] callable; void[2*size_t.sizeof] callable;
@ -894,7 +913,7 @@ package struct TaskScheduler {
This forces immediate execution of the specified task. After the tasks finishes or yields, This forces immediate execution of the specified task. After the tasks finishes or yields,
the calling task will continue execution. the calling task will continue execution.
*/ */
void switchTo(Task t, Flag!"defer" defer = No.defer) void switchTo(Task t, TaskSwitchPriority priority)
{ {
auto thist = Task.getThis(); auto thist = Task.getThis();
@ -905,13 +924,16 @@ package struct TaskScheduler {
auto tf = () @trusted { return t.taskFiber; } (); auto tf = () @trusted { return t.taskFiber; } ();
if (tf.m_queue) { if (tf.m_queue) {
// don't reset the position of already scheduled tasks
if (priority == TaskSwitchPriority.normal) return;
debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue."); debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
m_taskQueue.remove(tf); m_taskQueue.remove(tf);
assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
} }
if (thist == Task.init && defer == No.defer) { if (thist == Task.init && priority == TaskSwitchPriority.immediate) {
assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
debug (VibeTaskLog) logTrace("switch to task from global context"); debug (VibeTaskLog) logTrace("switch to task from global context");
resumeTask(t); resumeTask(t);
@ -921,12 +943,20 @@ package struct TaskScheduler {
assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
if (defer) { final switch (priority) {
m_taskQueue.insertFront(tf); case TaskSwitchPriority.normal:
} else { reschedule(tf);
break;
case TaskSwitchPriority.prioritized:
tf.m_dynamicPriority = uint.max;
reschedule(tf);
break;
case TaskSwitchPriority.immediate:
tf.m_dynamicPriority = uint.max;
m_taskQueue.insertFront(thistf); m_taskQueue.insertFront(thistf);
m_taskQueue.insertFront(tf); m_taskQueue.insertFront(tf);
doYield(thist); doYield(thist);
break;
} }
} }
} }
@ -994,9 +1024,9 @@ package struct TaskScheduler {
} }
} }
private void doYieldAndReschedule(Task task) private void reschedule(TaskFiber tf)
{ {
auto tf = () @trusted { return task.taskFiber; } (); import std.algorithm.comparison : min;
// insert according to priority, limited to a priority // insert according to priority, limited to a priority
// factor of 1:10 in case of heavy concurrency // factor of 1:10 in case of heavy concurrency
@ -1006,9 +1036,16 @@ package struct TaskScheduler {
// increase dynamic priority each time a task gets overtaken to // increase dynamic priority each time a task gets overtaken to
// ensure a fair schedule // ensure a fair schedule
t.m_dynamicPriority += t.m_staticPriority; t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority);
return false; return false;
}); });
}
private void doYieldAndReschedule(Task task)
{
auto tf = () @trusted { return task.taskFiber; } ();
reschedule(tf);
doYield(task); doYield(task);
} }
@ -1089,6 +1126,7 @@ private struct TaskFiberQueue {
if (!max_skip-- || pred(t)) { if (!max_skip-- || pred(t)) {
task.m_queue = &this; task.m_queue = &this;
task.m_next = t.m_next; task.m_next = t.m_next;
if (task.m_next) task.m_next.m_prev = task;
t.m_next = task; t.m_next = task;
task.m_prev = t; task.m_prev = t;
if (!task.m_next) last = task; if (!task.m_next) last = task;
@ -1151,18 +1189,52 @@ unittest {
auto f3 = new TaskFiber; auto f3 = new TaskFiber;
auto f4 = new TaskFiber; auto f4 = new TaskFiber;
auto f5 = new TaskFiber; auto f5 = new TaskFiber;
auto f6 = new TaskFiber;
TaskFiberQueue q; TaskFiberQueue q;
void checkQueue()
{
TaskFiber p;
for (auto t = q.front; t; t = t.m_next) {
assert(t.m_prev is p);
assert(t.m_next || t is q.last);
p = t;
}
TaskFiber n;
for (auto t = q.last; t; t = t.m_prev) {
assert(t.m_next is n);
assert(t.m_prev || t is q.first);
n = t;
}
}
q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); });
assert(q.first == f1 && q.last == f1); assert(q.first is f1 && q.last is f1);
checkQueue();
q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); });
assert(q.first == f1 && q.last == f2); assert(q.first is f1 && q.last is f2);
checkQueue();
q.insertBackPred(f3, 1, (tf) => false); q.insertBackPred(f3, 1, (tf) => false);
assert(q.first == f1 && q.last == f2); assert(q.first is f1 && q.last is f2);
assert(f1.m_next is f3);
assert(f3.m_prev is f1);
checkQueue();
q.insertBackPred(f4, 10, (tf) => false); q.insertBackPred(f4, 10, (tf) => false);
assert(q.first == f4 && q.last == f2); assert(q.first is f4 && q.last is f2);
checkQueue();
q.insertBackPred(f5, 10, (tf) => true); q.insertBackPred(f5, 10, (tf) => true);
assert(q.first == f4 && q.last == f5); assert(q.first is f4 && q.last is f5);
checkQueue();
q.insertBackPred(f6, 10, (tf) => tf is f4);
assert(q.first is f4 && q.last is f5);
assert(f4.m_next is f6);
checkQueue();
} }
private struct FLSInfo { private struct FLSInfo {

View file

@ -3,7 +3,7 @@ module vibe.internal.async;
import std.traits : ParameterTypeTuple, ReturnType; import std.traits : ParameterTypeTuple, ReturnType;
import std.typecons : tuple; import std.typecons : tuple;
import vibe.core.core : hibernate, switchToTask; import vibe.core.core : hibernate, switchToTask;
import vibe.core.task : InterruptException, Task; import vibe.core.task : InterruptException, Task, TaskSwitchPriority;
import vibe.core.log; import vibe.core.log;
import core.time : Duration, seconds; import core.time : Duration, seconds;
@ -131,7 +131,10 @@ void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__)
fired[%1$s] = true; fired[%1$s] = true;
any_fired = true; any_fired = true;
Waitables[%1$s].done(%3$s); Waitables[%1$s].done(%3$s);
if (t != Task.init) switchToTask(t); if (t != Task.init) {
version (VibeHighEventPriority) switchToTask(t);
else switchToTask(t, TaskSwitchPriority.normal);
}
}; };
debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s); debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s);

View file

@ -12,7 +12,7 @@ import core.time : msecs;
shared static this() shared static this()
{ {
listenTCP(11375,(conn){ auto l = listenTCP(0, (conn) {
auto td = runTask!TCPConnection((conn) { auto td = runTask!TCPConnection((conn) {
ubyte [3] buf; ubyte [3] buf;
try { try {
@ -22,17 +22,17 @@ shared static this()
}, conn); }, conn);
sleep(10.msecs); sleep(10.msecs);
conn.close(); conn.close();
}); }, "127.0.0.1");
runTask({ runTask({
try { try {
auto conn = connectTCP("127.0.0.1", 11375); auto conn = connectTCP("127.0.0.1", l.bindAddress.port);
conn.write("a"); conn.write("a");
conn.close(); conn.close();
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
try { try {
auto conn = connectTCP("127.0.0.1", 11375); auto conn = connectTCP("127.0.0.1", l.bindAddress.port);
conn.close(); conn.close();
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);