Merge pull request #196 from vibe-d/task_priorities

Implement priority based task scheduling.
merged-on-behalf-of: Leonid Kramer <l-kramer@users.noreply.github.com>
This commit is contained in:
The Dlang Bot 2020-03-15 08:00:42 +01:00 committed by GitHub
commit 1f86470e4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 274 additions and 41 deletions

View file

@ -1,7 +1,7 @@
/**
This module contains the core functionality of the vibe.d framework.
Copyright: © 2012-2019 Sönke Ludwig
Copyright: © 2012-2020 Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig
*/
@ -319,7 +319,7 @@ Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args)
{
return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
///
/// ditto
Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args)
@system {
return runTask_internal!((ref tfi) { tfi.set(task, args); });
@ -330,6 +330,50 @@ Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
{
return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args)
{
return runTask_internal!((ref tfi) {
tfi.settings = settings;
tfi.set(task, args);
});
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args)
@system {
return runTask_internal!((ref tfi) {
tfi.settings = settings;
tfi.set(task, args);
});
}
/// ditto
Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args)
if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init))))
{
return runTask_internal!((ref tfi) {
tfi.settings = settings;
tfi.set(task, args);
});
}
unittest { // test proportional priority scheduling
auto tm = setTimer(1000.msecs, { assert(false, "Test timeout"); });
scope (exit) tm.stop();
size_t a, b;
auto t1 = runTask(TaskSettings(1), { while (a + b < 100) { a++; yield(); } });
auto t2 = runTask(TaskSettings(10), { while (a + b < 100) { b++; yield(); } });
runTask({
t1.join();
t2.join();
exitEventLoop();
});
runEventLoop();
assert(a + b == 100);
assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1
}
/**
Runs an asyncronous task that is guaranteed to finish before the caller's
@ -429,6 +473,21 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS arg
setupWorkerThreads();
st_workerPool.runTask!method(object, args);
}
/// ditto
void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
setupWorkerThreads();
st_workerPool.runTask(settings, func, args);
}
/// ditto
void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
setupWorkerThreads();
st_workerPool.runTask!method(settings, object, args);
}
/**
Runs a new asynchronous task in a worker thread, returning the task handle.
@ -452,6 +511,20 @@ Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS ar
setupWorkerThreads();
return st_workerPool.runTaskH!method(object, args);
}
/// ditto
Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
setupWorkerThreads();
return st_workerPool.runTaskH(settings, func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
setupWorkerThreads();
return st_workerPool.runTaskH!method(settings, object, args);
}
/// Running a worker task using a function
unittest {
@ -583,6 +656,19 @@ void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
setupWorkerThreads();
return st_workerPool.runTaskDist!method(object, args);
}
/// ditto
void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
setupWorkerThreads();
return st_workerPool.runTaskDist(settings, func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args)
{
setupWorkerThreads();
return st_workerPool.runTaskDist!method(settings, object, args);
}
/** Runs a new asynchronous task in all worker threads and returns the handles.
@ -598,6 +684,13 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref
setupWorkerThreads();
st_workerPool.runTaskDistH(on_handle, func, args);
}
/// ditto
void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
setupWorkerThreads();
st_workerPool.runTaskDistH(settings, on_handle, func, args);
}
/**

View file

@ -1,7 +1,7 @@
/**
Contains interfaces and enums for evented I/O drivers.
Copyright: © 2012-2016 Sönke Ludwig
Copyright: © 2012-2020 Sönke Ludwig
Authors: Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
@ -31,6 +31,8 @@ struct Task {
static ThreadInfo s_tidInfo;
}
enum basePriority = 0x00010000;
private this(TaskFiber fiber, size_t task_counter)
@safe nothrow {
() @trusted { m_fiber = cast(shared)fiber; } ();
@ -129,6 +131,27 @@ struct Task {
bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
}
/** Settings to control the behavior of newly started tasks.
*/
struct TaskSettings {
/** Scheduling priority of the task
The priority of a task is roughly proportional to the amount of
times it gets scheduled in comparison to competing tasks. For
example, a task with priority 100 will be scheduled every 10 rounds
when competing against a task with priority 1000.
The default priority is defined by `basePriority` and has a value
of 65536. Priorities should be computed relative to `basePriority`.
A task with a priority of zero will only be executed if no other
non-zero task is competing.
*/
uint priority = Task.basePriority;
}
/**
Implements a task local storage variable.
@ -316,7 +339,9 @@ final package class TaskFiber : Fiber {
Thread m_thread;
ThreadInfo m_tidInfo;
uint m_staticPriority, m_dynamicPriority;
shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags
bool m_shutdown = false;
shared(ManualEvent) m_onExit;
@ -384,6 +409,7 @@ final package class TaskFiber : Fiber {
TaskFuncInfo task;
swap(task, m_taskFunc);
m_dynamicPriority = m_staticPriority = task.settings.priority;
Task handle = this.task;
try {
atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
@ -621,6 +647,7 @@ package struct TaskFuncInfo {
void[2*size_t.sizeof] callable;
void[maxTaskParameterSize] args;
debug ulong functionPointer;
TaskSettings settings;
void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
@ -715,7 +742,6 @@ package struct TaskScheduler {
private {
TaskFiberQueue m_taskQueue;
TaskFiber m_markerTask;
}
@safe:
@ -738,8 +764,7 @@ package struct TaskScheduler {
debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ());
tf.handleInterrupt();
if (tf.m_queue !is null) return; // already scheduled to be resumed
m_taskQueue.insertBack(tf);
doYield(t);
doYieldAndReschedule(t);
tf.handleInterrupt();
}
@ -846,13 +871,10 @@ package struct TaskScheduler {
if (t == Task.init) return; // not really a task -> no-op
auto tf = () @trusted { return t.taskFiber; } ();
if (tf.m_queue !is null) return; // already scheduled to be resumed
m_taskQueue.insertBack(tf);
doYield(t);
doYieldAndReschedule(t);
}
/** Holds execution until the task gets explicitly resumed.
*/
void hibernate()
{
@ -923,33 +945,26 @@ package struct TaskScheduler {
return ScheduleStatus.idle;
if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here!
scope (exit) assert(!m_markerTask.m_queue, "Marker task still in queue!?");
assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");
assert(!m_markerTask.m_queue, "TaskScheduler.schedule() was called recursively!");
// keep track of the end of the queue, so that we don't process tasks
// infinitely
m_taskQueue.insertBack(m_markerTask);
if (m_taskQueue.empty) return ScheduleStatus.idle;
while (m_taskQueue.front !is m_markerTask) {
foreach (i; 0 .. m_taskQueue.length) {
auto t = m_taskQueue.front;
m_taskQueue.popFront();
// reset priority
t.m_dynamicPriority = t.m_staticPriority;
debug (VibeTaskLog) logTrace("resuming task");
auto task = t.task;
if (task != Task.init)
resumeTask(t.task);
debug (VibeTaskLog) logTrace("task out");
assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?");
if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode
if (m_taskQueue.empty) break;
}
// remove marker task
m_taskQueue.popFront();
debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length);
return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
@ -979,6 +994,25 @@ package struct TaskScheduler {
}
}
private void doYieldAndReschedule(Task task)
{
auto tf = () @trusted { return task.taskFiber; } ();
// insert according to priority, limited to a priority
// factor of 1:10 in case of heavy concurrency
m_taskQueue.insertBackPred(tf, 10, (t) {
if (t.m_dynamicPriority >= tf.m_dynamicPriority)
return true;
// increase dynamic priority each time a task gets overtaken to
// ensure a fair schedule
t.m_dynamicPriority += t.m_staticPriority;
return false;
});
doYield(task);
}
private void doYield(Task task)
{
assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!");
@ -1041,6 +1075,31 @@ private struct TaskFiberQueue {
length++;
}
// inserts a task after the first task for which the predicate yields `true`,
// starting from the back. a maximum of max_skip tasks will be skipped
// before the task is inserted regardless of the predicate.
void insertBackPred(TaskFiber task, size_t max_skip,
scope bool delegate(TaskFiber) @safe nothrow pred)
{
assert(task.m_queue is null, "Task is already scheduled to be resumed!");
assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
assert(task.m_next is null, "Task has m_next set without being in a queue!?");
for (auto t = last; t; t = t.m_prev) {
if (!max_skip-- || pred(t)) {
task.m_queue = &this;
task.m_next = t.m_next;
t.m_next = task;
task.m_prev = t;
if (!task.m_next) last = task;
length++;
return;
}
}
insertFront(task);
}
void popFront()
{
if (first is last) last = null;
@ -1086,6 +1145,26 @@ unittest {
assert(q.empty && q.length == 0);
}
unittest {
auto f1 = new TaskFiber;
auto f2 = new TaskFiber;
auto f3 = new TaskFiber;
auto f4 = new TaskFiber;
auto f5 = new TaskFiber;
TaskFiberQueue q;
q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); });
assert(q.first == f1 && q.last == f1);
q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); });
assert(q.first == f1 && q.last == f2);
q.insertBackPred(f3, 1, (tf) => false);
assert(q.first == f1 && q.last == f2);
q.insertBackPred(f4, 10, (tf) => false);
assert(q.first == f4 && q.last == f2);
q.insertBackPred(f5, 10, (tf) => true);
assert(q.first == f4 && q.last == f5);
}
private struct FLSInfo {
void function(void[], size_t) fct;
size_t offset;

View file

@ -1,7 +1,7 @@
/**
Multi-threaded task pool implementation.
Copyright: © 2012-2017 Sönke Ludwig
Copyright: © 2012-2020 Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig
*/
@ -11,7 +11,7 @@ import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
import vibe.core.log;
import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor;
import vibe.core.task : Task, TaskFuncInfo, callWithMove;
import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.concurrency : prioritySend, receiveOnly;
@ -113,16 +113,30 @@ shared final class TaskPool {
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTask_unsafe(func, args);
runTask_unsafe(TaskSettings.init, func, args);
}
/// ditto
void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
auto func = &__traits(getMember, object, __traits(identifier, method));
runTask_unsafe(func, args);
runTask_unsafe(TaskSettings.init, func, args);
}
/// ditto
void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTask_unsafe(settings, func, args);
}
/// ditto
void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
auto func = &__traits(getMember, object, __traits(identifier, method));
runTask_unsafe(settings, func, args);
}
/** Runs a new asynchronous task in a worker thread, returning the task handle.
@ -141,9 +155,9 @@ shared final class TaskPool {
// workaround for runWorkerTaskH to work when called outside of a task
if (Task.getThis() == Task.init) {
Task ret;
.runTask({ ret = doRunTaskH(func, args); }).join();
.runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join();
return ret;
} else return doRunTaskH(func, args);
} else return doRunTaskH(TaskSettings.init, func, args);
}
/// ditto
Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
@ -154,10 +168,32 @@ shared final class TaskPool {
}
return runTaskH(&wrapper!(), object, args);
}
/// ditto
Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (isFunctionPointer!FT)
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
// workaround for runWorkerTaskH to work when called outside of a task
if (Task.getThis() == Task.init) {
Task ret;
.runTask({ ret = doRunTaskH(settings, func, args); }).join();
return ret;
} else return doRunTaskH(settings, func, args);
}
/// ditto
Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
static void wrapper()(shared(T) object, ref ARGS args) {
__traits(getMember, object, __traits(identifier, method))(args);
}
return runTaskH(settings, &wrapper!(), object, args);
}
// NOTE: needs to be a separate function to avoid recursion for the
// workaround above, which breaks @safe inference
private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args)
private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args)
if (isFunctionPointer!FT)
{
import std.typecons : Typedef;
@ -173,7 +209,7 @@ shared final class TaskPool {
caller.tid.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
runTask_unsafe(settings, &taskFun, caller, func, args);
return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } ();
}
@ -191,7 +227,7 @@ shared final class TaskPool {
if (is(typeof(*func) == function))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(func, args);
runTaskDist_unsafe(TaskSettings.init, func, args);
}
/// ditto
void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
@ -199,7 +235,22 @@ shared final class TaskPool {
auto func = &__traits(getMember, object, __traits(identifier, method));
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(func, args);
runTaskDist_unsafe(TaskSettings.init, func, args);
}
/// ditto
void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
if (is(typeof(*func) == function))
{
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(settings, func, args);
}
/// ditto
void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
{
auto func = &__traits(getMember, object, __traits(identifier, method));
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
runTaskDist_unsafe(settings, func, args);
}
/** Runs a new asynchronous task in all worker threads and returns the handles.
@ -210,6 +261,12 @@ shared final class TaskPool {
See_also: `runTaskDist`
*/
void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
if (!is(HCB == TaskSettings))
{
runTaskDistH(TaskSettings.init, on_handle, func, args);
}
/// ditto
void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
{
// TODO: support non-copyable argument types using .move
import std.concurrency : send, receiveOnly;
@ -226,13 +283,13 @@ shared final class TaskPool {
t.tid.send(Task.getThis());
func(args);
}
runTaskDist(&call, caller, func, args);
runTaskDist(settings, &call, caller, func, args);
foreach (i; 0 .. this.threadCount)
on_handle(receiveOnly!Task);
}
private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args)
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
@ -242,11 +299,11 @@ shared final class TaskPool {
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
m_state.lock.queue.put(callable, args);
m_state.lock.queue.put(settings, callable, args);
m_signal.emitSingle();
}
private void runTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
{
import std.traits : ParameterTypeTuple;
import vibe.internal.traits : areConvertibleTo;
@ -260,7 +317,7 @@ shared final class TaskPool {
auto st = m_state.lock;
foreach (thr; st.threads) {
// create one TFI per thread to properly account for elaborate assignment operators/postblit
thr.m_queue.put(callable, args);
thr.m_queue.put(settings, callable, args);
}
}
m_signal.emit();
@ -355,12 +412,13 @@ nothrow @safe:
@property size_t length() const { return m_queue.length; }
void put(CALLABLE, ARGS...)(ref CALLABLE c, ref ARGS args)
void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args)
{
import std.algorithm.comparison : max;
if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2);
assert(!m_queue.full);
m_queue.peekDst[0].settings = settings;
m_queue.peekDst[0].set(c, args);
m_queue.putN(1);
}

View file

@ -23,6 +23,9 @@ void main()
t.join();
});
// let the outer task run and start the inner task
yield();
// let the outer task get another execution slice to write to t
yield();
assert(t && t.running);