From bf2edc7fb24de9c44eab87731641a1bc8c73d9bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 10 Dec 2016 14:13:44 +0100 Subject: [PATCH] Fix TaskScheduler.waitAndProcess to not block if any tasks were resumed. In situations where no events were involved in an multi-task scenario, the old behavior could lead to the process to starve or hang until an event arrived. --- source/vibe/core/core.d | 2 +- source/vibe/core/task.d | 26 +++++++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index ba60376..d97248d 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1131,7 +1131,7 @@ package(vibe) void performIdleProcessing() again = s_idleHandler(); else again = false; - again = (s_scheduler.schedule() || again) && !getExitFlag(); + again = (s_scheduler.schedule() == ScheduleStatus.busy || again) && !getExitFlag(); if (again) { auto er = eventDriver.core.processEvents(0.seconds); diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index b84a37a..b27c274 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -57,7 +57,6 @@ struct Task { if (!fiber) return Task.init; auto tfiber = cast(TaskFiber)fiber; assert(tfiber !is null, "Invalid or null fiber used to construct Task handle."); - if (!tfiber.m_running) return Task.init; return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } (); } @@ -104,6 +103,7 @@ struct Task { md.put(nativeToLittleEndian(() @trusted { return cast(size_t)cast(void*)m_fiber; } ())); md.put(nativeToLittleEndian(m_taskCounter)); Base64.encode(md.finish()[0 .. 3], dst); + if (!this.running) dst.put("-fin"); } bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } @@ -458,8 +458,10 @@ final package class TaskFiber : Fiber { package void handleInterrupt() @safe { - if (m_interrupt) + if (m_interrupt) { + m_interrupt = false; throw new InterruptException; + } } } @@ -551,7 +553,7 @@ package struct TaskScheduler { bool any_events = false; while (true) { // process pending tasks - schedule(); + bool any_tasks_processed = schedule() != ScheduleStatus.idle; logTrace("Processing pending events..."); ExitReason er = eventDriver.core.processEvents(0.seconds); @@ -565,7 +567,7 @@ package struct TaskScheduler { break; case ExitReason.timeout: if (!scheduledTaskCount) - return any_events ? ExitReason.idle : ExitReason.timeout; + return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout; break; case ExitReason.idle: any_events = true; @@ -686,8 +688,12 @@ package struct TaskScheduler { Returns: Returns `true` $(I iff) there are more tasks left to process. */ - bool schedule() + ScheduleStatus schedule() { + if (m_taskQueue.empty) + return ScheduleStatus.idle; + + if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here! assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!"); @@ -703,13 +709,13 @@ package struct TaskScheduler { resumeTask(t.task); assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?"); - if (m_taskQueue.empty) return false; // handle gracefully in release mode + if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode } // remove marker task m_taskQueue.popFront(); - return !m_taskQueue.empty; + return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy; } /// Resumes execution of a yielded task. @@ -740,6 +746,12 @@ package struct TaskScheduler { } } +package enum ScheduleStatus { + idle, + allProcessed, + busy +} + private struct TaskFiberQueue { @safe nothrow: