diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index af7fd85..823dd64 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -333,7 +333,7 @@ final package class TaskFiber : Fiber { debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); if (!isEventLoopRunning) { logTrace("Event loop not running at task start - yielding."); - vibe.core.core.yield(); + vibe.core.core.taskScheduler.yieldUninterruptible(); logTrace("Initial resume of task."); } task.func(&task); @@ -345,9 +345,17 @@ final package class TaskFiber : Fiber { logDebug("Full error: %s", e.toString().sanitize()); } + if (m_interrupt) { + logDebug("Task exited while an interrupt was in flight."); + m_interrupt = false; + } + this.tidInfo.ident = Tid.init; // clear message box - foreach (t; m_joiners) taskScheduler.switchTo(t); + foreach (t; m_joiners) { + logTrace("Resuming joining task."); + taskScheduler.switchTo(t); + } m_joiners.length = 0; m_joiners.assumeSafeAppend(); @@ -414,6 +422,7 @@ final package class TaskFiber : Fiber { assert(caller != this.task, "A task cannot interrupt itself."); assert(caller.thread is this.thread, "Interrupting tasks in different threads is not yet supported."); } else assert(Thread.getThis() is this.thread, "Interrupting tasks in different threads is not yet supported."); + logTrace("Resuming task with interrupt flag."); m_interrupt = true; taskScheduler.switchTo(this.task); } @@ -428,10 +437,17 @@ final package class TaskFiber : Fiber { @safe nothrow { assert(Task.getThis().fiber is this, "Handling interrupt outside of the corresponding fiber."); if (m_interrupt && on_interrupt) { + logTrace("Handling interrupt flag."); m_interrupt = false; on_interrupt(); } } + + package void handleInterrupt() + @safe { + if (m_interrupt) + throw new InterruptException; + } } package struct TaskFuncInfo { @@ -470,11 +486,11 @@ package struct TaskScheduler { TaskFiber m_markerTask; } - @safe nothrow: + @safe: @disable this(this); - @property size_t scheduledTaskCount() const { return m_taskQueue.length; } + @property size_t scheduledTaskCount() const nothrow { return m_taskQueue.length; } /** Lets other pending tasks execute before continuing execution. @@ -483,6 +499,20 @@ package struct TaskScheduler { fĂ­rst-in-first-out manner. */ void yield() + { + auto t = Task.getThis(); + if (t == Task.init) return; // not really a task -> no-op + logTrace("Yielding (interrupt=%s)", t.taskFiber.m_interrupt); + t.taskFiber.handleInterrupt(); + if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed + m_taskQueue.insertBack(t.taskFiber); + doYield(t); + t.taskFiber.handleInterrupt(); + } + + nothrow: + + void yieldUninterruptible() { auto t = Task.getThis(); if (t == Task.init) return; // not really a task -> no-op @@ -502,7 +532,7 @@ package struct TaskScheduler { if (thist == Task.init) { assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?"); static import vibe.core.core; - vibe.core.core.processEvents(); + vibe.core.core.runEventLoopOnce(); } else { doYield(thist); } @@ -516,12 +546,23 @@ package struct TaskScheduler { void switchTo(Task t) { auto thist = Task.getThis(); + + if (t == thist) return; + auto thisthr = thist ? thist.taskFiber.thread : () @trusted { return Thread.getThis(); } (); assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); if (thist == Task.init) { resumeTask(t); } else { - assert(!thist.taskFiber.m_queue, "Task already scheduled to be resumed... FIXME: should this really be an error?"); + assert(!thist.taskFiber.m_queue, "Calling task is running, but scheduled to be resumed!?"); + if (t.taskFiber.m_queue) { + logTrace("Task to switch to is already scheduled. Moving to front of queue."); + assert(t.taskFiber.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); + m_taskQueue.remove(t.taskFiber); + assert(!t.taskFiber.m_queue, "Task removed from queue, but still has one set!?"); + } + + logTrace("Switching tasks"); m_taskQueue.insertFront(thist.taskFiber); m_taskQueue.insertFront(t.taskFiber); doYield(thist); @@ -604,7 +645,7 @@ private struct TaskFiberQueue { void insertFront(TaskFiber task) { - assert(task.m_queue == null, "Task is already scheduled to be resumed!"); + assert(task.m_queue is null, "Task is already scheduled to be resumed!"); assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); assert(task.m_next is null, "Task has m_next set without being in a queue!?"); task.m_queue = &this; @@ -621,7 +662,7 @@ private struct TaskFiberQueue { void insertBack(TaskFiber task) { - assert(task.m_queue == null, "Task is already scheduled to be resumed!"); + assert(task.m_queue is null, "Task is already scheduled to be resumed!"); assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); assert(task.m_next is null, "Task has m_next set without being in a queue!?"); task.m_queue = &this;