Add yieldLock(), which enables enforcing no task switches within a scope.
runTask() was altered to delay the task switch in case it gets called within an active yieldLock(), so that running tasks is still possible.
This commit is contained in:
parent
698824e811
commit
bc689489a8
|
@ -27,7 +27,7 @@ import std.functional;
|
||||||
import std.range : empty, front, popFront;
|
import std.range : empty, front, popFront;
|
||||||
import std.string;
|
import std.string;
|
||||||
import std.traits : isFunctionPointer;
|
import std.traits : isFunctionPointer;
|
||||||
import std.typecons : Typedef, Tuple, tuple;
|
import std.typecons : Flag, Yes, Typedef, Tuple, tuple;
|
||||||
import std.variant;
|
import std.variant;
|
||||||
import core.atomic;
|
import core.atomic;
|
||||||
import core.sync.condition;
|
import core.sync.condition;
|
||||||
|
@ -373,12 +373,11 @@ package Task runTask_internal(alias TFI_SETUP)()
|
||||||
f.bumpTaskCounter();
|
f.bumpTaskCounter();
|
||||||
auto handle = f.task();
|
auto handle = f.task();
|
||||||
|
|
||||||
debug Task self = Task.getThis();
|
|
||||||
debug if (TaskFiber.ms_taskEventCallback) {
|
debug if (TaskFiber.ms_taskEventCallback) {
|
||||||
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
|
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
|
||||||
}
|
}
|
||||||
|
|
||||||
s_scheduler.switchTo(handle);
|
s_scheduler.switchTo(handle, TaskFiber.getThis().m_yieldLockCount > 0 ? Flag!"defer".yes : Flag!"defer".no);
|
||||||
|
|
||||||
debug if (TaskFiber.ms_taskEventCallback) {
|
debug if (TaskFiber.ms_taskEventCallback) {
|
||||||
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
|
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
|
||||||
|
@ -648,6 +647,7 @@ void yield()
|
||||||
tf.handleInterrupt();
|
tf.handleInterrupt();
|
||||||
} else {
|
} else {
|
||||||
// Let yielded tasks execute
|
// Let yielded tasks execute
|
||||||
|
assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock()!");
|
||||||
() @safe nothrow { performIdleProcessing(); } ();
|
() @safe nothrow { performIdleProcessing(); } ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -669,6 +669,7 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
|
||||||
@safe nothrow {
|
@safe nothrow {
|
||||||
auto t = Task.getThis();
|
auto t = Task.getThis();
|
||||||
if (t == Task.init) {
|
if (t == Task.init) {
|
||||||
|
assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock!");
|
||||||
runEventLoopOnce();
|
runEventLoopOnce();
|
||||||
} else {
|
} else {
|
||||||
auto tf = () @trusted { return t.taskFiber; } ();
|
auto tf = () @trusted { return t.taskFiber; } ();
|
||||||
|
@ -1062,6 +1063,52 @@ struct Timer {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** Returns an object that ensures that no task switches happen during its life time.
|
||||||
|
|
||||||
|
Any attempt to run the event loop or switching to another task will cause
|
||||||
|
an assertion to be thrown within the scope that defines the lifetime of the
|
||||||
|
returned object.
|
||||||
|
|
||||||
|
Multiple yield locks can appear in nested scopes.
|
||||||
|
*/
|
||||||
|
auto yieldLock()
|
||||||
|
{
|
||||||
|
static struct YieldLock {
|
||||||
|
private this(bool) { inc(); }
|
||||||
|
@disable this();
|
||||||
|
@disable this(this);
|
||||||
|
~this() { dec(); }
|
||||||
|
|
||||||
|
private void inc()
|
||||||
|
{
|
||||||
|
TaskFiber.getThis().m_yieldLockCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void dec()
|
||||||
|
{
|
||||||
|
TaskFiber.getThis().m_yieldLockCount--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return YieldLock(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
auto tf = TaskFiber.getThis();
|
||||||
|
assert(tf.m_yieldLockCount == 0);
|
||||||
|
{
|
||||||
|
auto lock = yieldLock();
|
||||||
|
assert(tf.m_yieldLockCount == 1);
|
||||||
|
{
|
||||||
|
auto lock2 = yieldLock();
|
||||||
|
assert(tf.m_yieldLockCount == 2);
|
||||||
|
}
|
||||||
|
assert(tf.m_yieldLockCount == 1);
|
||||||
|
}
|
||||||
|
assert(tf.m_yieldLockCount == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**************************************************************************************************/
|
/**************************************************************************************************/
|
||||||
/* private types */
|
/* private types */
|
||||||
/**************************************************************************************************/
|
/**************************************************************************************************/
|
||||||
|
|
|
@ -302,6 +302,7 @@ final package class TaskFiber : Fiber {
|
||||||
void[] m_fls;
|
void[] m_fls;
|
||||||
|
|
||||||
bool m_interrupt; // Task.interrupt() is progress
|
bool m_interrupt; // Task.interrupt() is progress
|
||||||
|
package int m_yieldLockCount;
|
||||||
|
|
||||||
static TaskFiber ms_globalDummyFiber;
|
static TaskFiber ms_globalDummyFiber;
|
||||||
static FLSInfo[] ms_flsInfo;
|
static FLSInfo[] ms_flsInfo;
|
||||||
|
@ -325,7 +326,7 @@ final package class TaskFiber : Fiber {
|
||||||
auto f = () @trusted nothrow {
|
auto f = () @trusted nothrow {
|
||||||
return Fiber.getThis();
|
return Fiber.getThis();
|
||||||
} ();
|
} ();
|
||||||
if (f) return cast(TaskFiber)f;
|
if (auto tf = cast(TaskFiber)f) return tf;
|
||||||
if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
|
if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
|
||||||
return ms_globalDummyFiber;
|
return ms_globalDummyFiber;
|
||||||
}
|
}
|
||||||
|
@ -637,6 +638,8 @@ package struct TaskScheduler {
|
||||||
*/
|
*/
|
||||||
ExitReason process()
|
ExitReason process()
|
||||||
{
|
{
|
||||||
|
assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not process events within an active yieldLock()!");
|
||||||
|
|
||||||
bool any_events = false;
|
bool any_events = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
// process pending tasks
|
// process pending tasks
|
||||||
|
@ -742,7 +745,7 @@ package struct TaskScheduler {
|
||||||
This forces immediate execution of the specified task. After the tasks finishes or yields,
|
This forces immediate execution of the specified task. After the tasks finishes or yields,
|
||||||
the calling task will continue execution.
|
the calling task will continue execution.
|
||||||
*/
|
*/
|
||||||
void switchTo(Task t)
|
void switchTo(Task t, Flag!"defer" defer = No.defer)
|
||||||
{
|
{
|
||||||
auto thist = Task.getThis();
|
auto thist = Task.getThis();
|
||||||
|
|
||||||
|
@ -750,14 +753,15 @@ package struct TaskScheduler {
|
||||||
|
|
||||||
auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
|
auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
|
||||||
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
|
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
|
||||||
if (thist == Task.init) {
|
if (thist == Task.init && defer == No.defer) {
|
||||||
|
assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
|
||||||
logTrace("switch to task from global context");
|
logTrace("switch to task from global context");
|
||||||
resumeTask(t);
|
resumeTask(t);
|
||||||
logTrace("task yielded control back to global context");
|
logTrace("task yielded control back to global context");
|
||||||
} else {
|
} else {
|
||||||
auto tf = () @trusted { return t.taskFiber; } ();
|
auto tf = () @trusted { return t.taskFiber; } ();
|
||||||
auto thistf = () @trusted { return thist.taskFiber; } ();
|
auto thistf = () @trusted { return thist.taskFiber; } ();
|
||||||
assert(!thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
|
assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
|
||||||
if (tf.m_queue) {
|
if (tf.m_queue) {
|
||||||
logTrace("Task to switch to is already scheduled. Moving to front of queue.");
|
logTrace("Task to switch to is already scheduled. Moving to front of queue.");
|
||||||
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
|
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
|
||||||
|
@ -766,9 +770,13 @@ package struct TaskScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
|
logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
|
||||||
m_taskQueue.insertFront(thistf);
|
if (defer) {
|
||||||
m_taskQueue.insertFront(tf);
|
m_taskQueue.insertFront(tf);
|
||||||
doYield(thist);
|
} else {
|
||||||
|
m_taskQueue.insertFront(thistf);
|
||||||
|
m_taskQueue.insertFront(tf);
|
||||||
|
doYield(thist);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -840,6 +848,7 @@ package struct TaskScheduler {
|
||||||
|
|
||||||
private void doYield(Task task)
|
private void doYield(Task task)
|
||||||
{
|
{
|
||||||
|
assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!");
|
||||||
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
|
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
|
||||||
() @trusted { Fiber.yield(); } ();
|
() @trusted { Fiber.yield(); } ();
|
||||||
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
|
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
|
||||||
|
|
Loading…
Reference in a new issue