Fix shutdown behavior.

- Detection of the main thread contained a race condition
- The exit flag monitor task that is started for each event loop wasn't shut down, so that many could be running if runEventLoop was called multiple times.
This commit is contained in:
Sönke Ludwig 2017-01-13 22:11:59 +01:00
parent 7803fdaa81
commit e28c6950d7

View file

@ -203,11 +203,13 @@ int runEventLoop()
performIdleProcessing(); performIdleProcessing();
if (getExitFlag()) return 0; if (getExitFlag()) return 0;
Task exit_task;
// handle exit flag in the main thread to exit when // handle exit flag in the main thread to exit when
// exitEventLoop(true) is called from a thread) // exitEventLoop(true) is called from a thread)
() @trusted nothrow { () @trusted nothrow {
if (Thread.getThis() is st_threads[0].thread) if (s_isMainThread)
runTask(toDelegate(&watchExitFlag)); exit_task = runTask(toDelegate(&watchExitFlag));
} (); } ();
while (true) { while (true) {
@ -219,6 +221,11 @@ int runEventLoop()
performIdleProcessing(); performIdleProcessing();
} }
// make sure the exit flag watch task finishes together with this loop
// TODO: would be niced to do this without exceptions
if (exit_task && exit_task.running)
exit_task.interrupt();
logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).", logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).",
s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop); s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop);
eventDriver.core.clearExitFlag(); eventDriver.core.clearExitFlag();
@ -527,7 +534,7 @@ unittest {
unittest { // run and join local task from outside of a task unittest { // run and join local task from outside of a task
int i = 0; int i = 0;
auto t = runTask({ sleep(5.msecs); i = 1; }); auto t = runTask({ sleep(1.msecs); i = 1; });
t.join(); t.join();
assert(i == 1); assert(i == 1);
} }
@ -1177,6 +1184,7 @@ private {
__gshared Condition st_threadShutdownCondition; __gshared Condition st_threadShutdownCondition;
shared bool st_term = false; shared bool st_term = false;
bool s_isMainThread = false; // set in shared static this
bool s_exitEventLoop = false; bool s_exitEventLoop = false;
package bool s_eventLoopRunning = false; package bool s_eventLoopRunning = false;
bool delegate() @safe nothrow s_idleHandler; bool delegate() @safe nothrow s_idleHandler;
@ -1260,6 +1268,8 @@ shared static this()
} }
} }
s_isMainThread = true;
// COMPILER BUG: Must be some kind of module constructor order issue: // COMPILER BUG: Must be some kind of module constructor order issue:
// without this, the stdout/stderr handles are not initialized before // without this, the stdout/stderr handles are not initialized before
// the log module is set up. // the log module is set up.
@ -1329,13 +1339,12 @@ static ~this()
{ {
auto thisthr = Thread.getThis(); auto thisthr = Thread.getThis();
bool is_main_thread = false; bool is_main_thread = s_isMainThread;
synchronized (st_threadsMutex) { synchronized (st_threadsMutex) {
auto idx = st_threads.countUntil!(c => c.thread is thisthr); auto idx = st_threads.countUntil!(c => c.thread is thisthr);
// if we are the main thread, wait for all others before terminating logDebug("Thread exit %s (index %s) (main=%s)", thisthr.name, idx, is_main_thread);
is_main_thread = idx == 0;
if (is_main_thread) { // we are the main thread, wait for others if (is_main_thread) { // we are the main thread, wait for others
atomicStore(st_term, true); atomicStore(st_term, true);
st_threadsSignal.emit(); st_threadsSignal.emit();
@ -1398,7 +1407,7 @@ nothrow {
} }
private void handleWorkerTasks() private void handleWorkerTasks()
{ nothrow {
logDebug("worker thread enter"); logDebug("worker thread enter");
auto thisthr = Thread.getThis(); auto thisthr = Thread.getThis();
@ -1408,7 +1417,7 @@ private void handleWorkerTasks()
auto emit_count = st_threadsSignal.emitCount; auto emit_count = st_threadsSignal.emitCount;
TaskFuncInfo task; TaskFuncInfo task;
synchronized (st_threadsMutex) { bool processTask() nothrow {
auto idx = st_threads.countUntil!(c => c.thread is thisthr); auto idx = st_threads.countUntil!(c => c.thread is thisthr);
assert(idx >= 0, "Worker thread not in st_threads array!?"); assert(idx >= 0, "Worker thread not in st_threads array!?");
logDebug("worker thread check"); logDebug("worker thread check");
@ -1418,7 +1427,7 @@ private void handleWorkerTasks()
logWarn("Worker thread shuts down with specific worker tasks left in its queue."); logWarn("Worker thread shuts down with specific worker tasks left in its queue.");
if (st_threads.count!(c => c.isWorker) == 1 && st_workerTasks.length > 0) if (st_threads.count!(c => c.isWorker) == 1 && st_workerTasks.length > 0)
logWarn("Worker threads shut down with worker tasks still left in the queue."); logWarn("Worker threads shut down with worker tasks still left in the queue.");
break; return false;
} }
if (!st_workerTasks.empty) { if (!st_workerTasks.empty) {
@ -1430,10 +1439,18 @@ private void handleWorkerTasks()
task = st_threads[idx].taskQueue.front; task = st_threads[idx].taskQueue.front;
st_threads[idx].taskQueue.popFront(); st_threads[idx].taskQueue.popFront();
} }
return true;
}
{
scope (failure) assert(false);
synchronized (st_threadsMutex)
if (!processTask())
break;
} }
if (task.func !is null) runTask_internal(task); if (task.func !is null) runTask_internal(task);
else emit_count = st_threadsSignal.wait(emit_count); else emit_count = st_threadsSignal.waitUninterruptible(emit_count);
} }
logDebug("worker thread exit"); logDebug("worker thread exit");
@ -1448,7 +1465,8 @@ private void watchExitFlag()
if (getExitFlag()) break; if (getExitFlag()) break;
} }
emit_count = st_threadsSignal.wait(emit_count); try emit_count = st_threadsSignal.wait(emit_count);
catch (InterruptException e) return;
} }
logDebug("main thread exit"); logDebug("main thread exit");