Add TaskSwitchPriority to control the priority to use when transferring execution.

This commit is contained in:
Sönke Ludwig 2020-03-15 09:02:18 +01:00
parent d07c2f02e6
commit 1eebe7a9ce
2 changed files with 67 additions and 16 deletions

View file

@ -429,7 +429,7 @@ package Task runTask_internal(alias TFI_SETUP)()
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
} }
s_scheduler.switchTo(handle, TaskFiber.getThis().m_yieldLockCount > 0 ? Flag!"defer".yes : Flag!"defer".no); switchToTask(handle);
debug if (TaskFiber.ms_taskEventCallback) { debug if (TaskFiber.ms_taskEventCallback) {
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
@ -808,13 +808,27 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
This function can be used in conjunction with `hibernate` to wake up a This function can be used in conjunction with `hibernate` to wake up a
task. The task must live in the same thread as the caller. task. The task must live in the same thread as the caller.
See_Also: `hibernate` If no priority is specified, `TaskSwitchPriority.prioritized` or
`TaskSwitchPriority.immediate` will be used, depending on whether a
yield lock is currently active.
Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield
lock is active.
This function must only be called on tasks that belong to the calling
thread and have previously been hibernated!
See_Also: `hibernate`, `yieldLock`
*/ */
void switchToTask(Task t) void switchToTask(Task t)
@safe nothrow { @safe nothrow {
import std.typecons : Yes, No; auto defer = TaskFiber.getThis().m_yieldLockCount > 0;
auto defer = TaskFiber.getThis().m_yieldLockCount > 0 ? Yes.defer : No.defer; s_scheduler.switchTo(t, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
s_scheduler.switchTo(t, defer); }
/// ditto
void switchToTask(Task t, TaskSwitchPriority priority)
@safe nothrow {
s_scheduler.switchTo(t, priority);
} }

View file

@ -558,8 +558,8 @@ final package class TaskFiber : Fiber {
if (caller.m_thread is m_thread) { if (caller.m_thread is m_thread) {
auto thisus = () @trusted { return cast()this; } (); auto thisus = () @trusted { return cast()this; } ();
debug (VibeTaskLog) logTrace("Resuming task with interrupt flag."); debug (VibeTaskLog) logTrace("Resuming task with interrupt flag.");
auto defer = caller.m_yieldLockCount > 0 ? Yes.defer : No.defer; auto defer = caller.m_yieldLockCount > 0;
taskScheduler.switchTo(thisus.task, defer); taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
} else { } else {
debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming."); debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming.");
} }
@ -642,6 +642,25 @@ final package class TaskFiber : Fiber {
} }
} }
/** Controls the priority to use for switching execution to a task.
*/
enum TaskSwitchPriority {
/** Rescheduled according to the tasks priority
*/
normal,
/** Rescheduled with maximum priority.
The task will resume as soon as the current task yields.
*/
prioritized,
/** Switch to the task immediately.
*/
immediate
}
package struct TaskFuncInfo { package struct TaskFuncInfo {
void function(ref TaskFuncInfo) func; void function(ref TaskFuncInfo) func;
void[2*size_t.sizeof] callable; void[2*size_t.sizeof] callable;
@ -894,7 +913,7 @@ package struct TaskScheduler {
This forces immediate execution of the specified task. After the tasks finishes or yields, This forces immediate execution of the specified task. After the tasks finishes or yields,
the calling task will continue execution. the calling task will continue execution.
*/ */
void switchTo(Task t, Flag!"defer" defer = No.defer) void switchTo(Task t, TaskSwitchPriority priority)
{ {
auto thist = Task.getThis(); auto thist = Task.getThis();
@ -905,13 +924,16 @@ package struct TaskScheduler {
auto tf = () @trusted { return t.taskFiber; } (); auto tf = () @trusted { return t.taskFiber; } ();
if (tf.m_queue) { if (tf.m_queue) {
// don't reset the position of already scheduled tasks
if (priority == TaskSwitchPriority.normal) return;
debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue."); debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
m_taskQueue.remove(tf); m_taskQueue.remove(tf);
assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
} }
if (thist == Task.init && defer == No.defer) { if (thist == Task.init && priority == TaskSwitchPriority.immediate) {
assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
debug (VibeTaskLog) logTrace("switch to task from global context"); debug (VibeTaskLog) logTrace("switch to task from global context");
resumeTask(t); resumeTask(t);
@ -921,12 +943,20 @@ package struct TaskScheduler {
assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
if (defer) { final switch (priority) {
m_taskQueue.insertFront(tf); case TaskSwitchPriority.normal:
} else { reschedule(tf);
m_taskQueue.insertFront(thistf); break;
m_taskQueue.insertFront(tf); case TaskSwitchPriority.prioritized:
doYield(thist); tf.m_dynamicPriority = uint.max;
reschedule(tf);
break;
case TaskSwitchPriority.immediate:
tf.m_dynamicPriority = uint.max;
m_taskQueue.insertFront(thistf);
m_taskQueue.insertFront(tf);
doYield(thist);
break;
} }
} }
} }
@ -994,7 +1024,7 @@ package struct TaskScheduler {
} }
} }
private void doYieldAndReschedule(Task task) private void reschedule(TaskFiber tf)
{ {
import std.algorithm.comparison : min; import std.algorithm.comparison : min;
@ -1009,6 +1039,13 @@ package struct TaskScheduler {
t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority); t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority);
return false; return false;
}); });
}
private void doYieldAndReschedule(Task task)
{
auto tf = () @trusted { return task.taskFiber; } ();
reschedule(tf);
doYield(task); doYield(task);
} }