Fix TaskScheduler.waitAndProcess to not block if any tasks were resumed.
In situations where no events were involved in an multi-task scenario, the old behavior could lead to the process to starve or hang until an event arrived.
This commit is contained in:
parent
df69c86bd4
commit
bf2edc7fb2
|
@ -1131,7 +1131,7 @@ package(vibe) void performIdleProcessing()
|
||||||
again = s_idleHandler();
|
again = s_idleHandler();
|
||||||
else again = false;
|
else again = false;
|
||||||
|
|
||||||
again = (s_scheduler.schedule() || again) && !getExitFlag();
|
again = (s_scheduler.schedule() == ScheduleStatus.busy || again) && !getExitFlag();
|
||||||
|
|
||||||
if (again) {
|
if (again) {
|
||||||
auto er = eventDriver.core.processEvents(0.seconds);
|
auto er = eventDriver.core.processEvents(0.seconds);
|
||||||
|
|
|
@ -57,7 +57,6 @@ struct Task {
|
||||||
if (!fiber) return Task.init;
|
if (!fiber) return Task.init;
|
||||||
auto tfiber = cast(TaskFiber)fiber;
|
auto tfiber = cast(TaskFiber)fiber;
|
||||||
assert(tfiber !is null, "Invalid or null fiber used to construct Task handle.");
|
assert(tfiber !is null, "Invalid or null fiber used to construct Task handle.");
|
||||||
if (!tfiber.m_running) return Task.init;
|
|
||||||
return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
|
return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,6 +103,7 @@ struct Task {
|
||||||
md.put(nativeToLittleEndian(() @trusted { return cast(size_t)cast(void*)m_fiber; } ()));
|
md.put(nativeToLittleEndian(() @trusted { return cast(size_t)cast(void*)m_fiber; } ()));
|
||||||
md.put(nativeToLittleEndian(m_taskCounter));
|
md.put(nativeToLittleEndian(m_taskCounter));
|
||||||
Base64.encode(md.finish()[0 .. 3], dst);
|
Base64.encode(md.finish()[0 .. 3], dst);
|
||||||
|
if (!this.running) dst.put("-fin");
|
||||||
}
|
}
|
||||||
|
|
||||||
bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
|
bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
|
||||||
|
@ -458,9 +458,11 @@ final package class TaskFiber : Fiber {
|
||||||
|
|
||||||
package void handleInterrupt()
|
package void handleInterrupt()
|
||||||
@safe {
|
@safe {
|
||||||
if (m_interrupt)
|
if (m_interrupt) {
|
||||||
|
m_interrupt = false;
|
||||||
throw new InterruptException;
|
throw new InterruptException;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
package struct TaskFuncInfo {
|
package struct TaskFuncInfo {
|
||||||
|
@ -551,7 +553,7 @@ package struct TaskScheduler {
|
||||||
bool any_events = false;
|
bool any_events = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
// process pending tasks
|
// process pending tasks
|
||||||
schedule();
|
bool any_tasks_processed = schedule() != ScheduleStatus.idle;
|
||||||
|
|
||||||
logTrace("Processing pending events...");
|
logTrace("Processing pending events...");
|
||||||
ExitReason er = eventDriver.core.processEvents(0.seconds);
|
ExitReason er = eventDriver.core.processEvents(0.seconds);
|
||||||
|
@ -565,7 +567,7 @@ package struct TaskScheduler {
|
||||||
break;
|
break;
|
||||||
case ExitReason.timeout:
|
case ExitReason.timeout:
|
||||||
if (!scheduledTaskCount)
|
if (!scheduledTaskCount)
|
||||||
return any_events ? ExitReason.idle : ExitReason.timeout;
|
return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout;
|
||||||
break;
|
break;
|
||||||
case ExitReason.idle:
|
case ExitReason.idle:
|
||||||
any_events = true;
|
any_events = true;
|
||||||
|
@ -686,8 +688,12 @@ package struct TaskScheduler {
|
||||||
Returns:
|
Returns:
|
||||||
Returns `true` $(I iff) there are more tasks left to process.
|
Returns `true` $(I iff) there are more tasks left to process.
|
||||||
*/
|
*/
|
||||||
bool schedule()
|
ScheduleStatus schedule()
|
||||||
{
|
{
|
||||||
|
if (m_taskQueue.empty)
|
||||||
|
return ScheduleStatus.idle;
|
||||||
|
|
||||||
|
|
||||||
if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here!
|
if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here!
|
||||||
|
|
||||||
assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");
|
assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");
|
||||||
|
@ -703,13 +709,13 @@ package struct TaskScheduler {
|
||||||
resumeTask(t.task);
|
resumeTask(t.task);
|
||||||
|
|
||||||
assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?");
|
assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?");
|
||||||
if (m_taskQueue.empty) return false; // handle gracefully in release mode
|
if (m_taskQueue.empty) return ScheduleStatus.idle; // handle gracefully in release mode
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove marker task
|
// remove marker task
|
||||||
m_taskQueue.popFront();
|
m_taskQueue.popFront();
|
||||||
|
|
||||||
return !m_taskQueue.empty;
|
return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resumes execution of a yielded task.
|
/// Resumes execution of a yielded task.
|
||||||
|
@ -740,6 +746,12 @@ package struct TaskScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
package enum ScheduleStatus {
|
||||||
|
idle,
|
||||||
|
allProcessed,
|
||||||
|
busy
|
||||||
|
}
|
||||||
|
|
||||||
private struct TaskFiberQueue {
|
private struct TaskFiberQueue {
|
||||||
@safe nothrow:
|
@safe nothrow:
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue