Merge pull request #60 from vibe-d/issue58_task_already_scheduled

Fix unscheduling tasks when resuming from outside of a task.
This commit is contained in:
Sönke Ludwig 2018-02-24 11:08:42 +01:00 committed by GitHub
commit 98280be27b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 20 deletions

View file

@ -78,11 +78,20 @@ install:
} }
$env:toolchain = "msvc"; $env:toolchain = "msvc";
$version = $env:DVersion; $version = $env:DVersion;
Invoke-WebRequest "https://github.com/ldc-developers/ldc/releases/download/v$($version)/ldc2-$($version)-win64-msvc.zip" -OutFile "c:\ldc.zip"; if ([System.Version]$version -lt [System.Version]"1.7.0") {
echo "finished."; Invoke-WebRequest "https://github.com/ldc-developers/ldc/releases/download/v$($version)/ldc2-$($version)-win64-msvc.zip" -OutFile "c:\ldc.zip";
pushd c:\\; echo "finished.";
7z x ldc.zip > $null; pushd c:\\;
popd; 7z x ldc.zip > $null;
popd;
}
else {
Invoke-WebRequest "https://github.com/ldc-developers/ldc/releases/download/v$($version)/ldc2-$($version)-windows-multilib.7z" -OutFile "c:\ldc.7z";
echo "finished.";
pushd c:\\;
7z x ldc.7z > $null;
popd;
}
} }
} }
- ps: SetUpDCompiler - ps: SetUpDCompiler
@ -109,7 +118,11 @@ before_build:
} }
elseif($env:DC -eq "ldc"){ elseif($env:DC -eq "ldc"){
$version = $env:DVersion; $version = $env:DVersion;
$env:PATH += ";C:\ldc2-$($version)-win64-msvc\bin"; if ([System.Version]$version -lt [System.Version]"1.7.0") {
$env:PATH += ";C:\ldc2-$($version)-win64-msvc\bin";
} else {
$env:PATH += ";C:\ldc2-$($version)-windows-multilib\bin";
}
$env:DC = "ldc2"; $env:DC = "ldc2";
} }
- ps: $env:compilersetup = "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall"; - ps: $env:compilersetup = "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall";

View file

@ -254,6 +254,7 @@ void exitEventLoop(bool shutdown_all_threads = false)
assert(s_eventLoopRunning || shutdown_all_threads, "Exiting event loop when none is running."); assert(s_eventLoopRunning || shutdown_all_threads, "Exiting event loop when none is running.");
if (shutdown_all_threads) { if (shutdown_all_threads) {
() @trusted nothrow { () @trusted nothrow {
shutdownWorkerPool();
atomicStore(st_term, true); atomicStore(st_term, true);
st_threadsSignal.emit(); st_threadsSignal.emit();
} (); } ();
@ -1349,13 +1350,8 @@ static ~this()
} }
if (is_main_thread) { if (is_main_thread) {
shared(TaskPool) tpool;
synchronized (st_threadsMutex) swap(tpool, st_workerPool);
if (tpool) {
logDiagnostic("Main thread still waiting for worker threads.");
tpool.terminate();
}
logDiagnostic("Main thread exiting"); logDiagnostic("Main thread exiting");
shutdownWorkerPool();
} }
synchronized (st_threadsMutex) { synchronized (st_threadsMutex) {
@ -1373,6 +1369,19 @@ static ~this()
st_threadShutdownCondition.notifyAll(); st_threadShutdownCondition.notifyAll();
} }
private void shutdownWorkerPool()
nothrow {
shared(TaskPool) tpool;
try synchronized (st_threadsMutex) swap(tpool, st_workerPool);
catch (Exception e) assert(false, e.msg);
if (tpool) {
logDiagnostic("Still waiting for worker threads to exit.");
tpool.terminate();
}
}
private void shutdownDriver() private void shutdownDriver()
{ {
if (ManualEvent.ms_threadEvent != EventID.init) { if (ManualEvent.ms_threadEvent != EventID.init) {

View file

@ -414,6 +414,8 @@ final package class TaskFiber : Fiber {
} }
} }
assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
// make the fiber available for the next task // make the fiber available for the next task
recycleFiber(this); recycleFiber(this);
} }
@ -456,7 +458,7 @@ final package class TaskFiber : Fiber {
try call(Fiber.Rethrow.no); try call(Fiber.Rethrow.no);
catch (Exception e) assert(false, e.msg); catch (Exception e) assert(false, e.msg);
} (); } ();
} }
/** Blocks until the task has ended. /** Blocks until the task has ended.
*/ */
@ -779,21 +781,23 @@ package struct TaskScheduler {
auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } (); auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
auto tf = () @trusted { return t.taskFiber; } ();
if (tf.m_queue) {
debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
m_taskQueue.remove(tf);
assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
}
if (thist == Task.init && defer == No.defer) { if (thist == Task.init && defer == No.defer) {
assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
debug (VibeTaskLog) logTrace("switch to task from global context"); debug (VibeTaskLog) logTrace("switch to task from global context");
resumeTask(t); resumeTask(t);
debug (VibeTaskLog) logTrace("task yielded control back to global context"); debug (VibeTaskLog) logTrace("task yielded control back to global context");
} else { } else {
auto tf = () @trusted { return t.taskFiber; } ();
auto thistf = () @trusted { return thist.taskFiber; } (); auto thistf = () @trusted { return thist.taskFiber; } ();
assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
if (tf.m_queue) {
debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
m_taskQueue.remove(tf);
assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
}
debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
if (defer) { if (defer) {
@ -878,6 +882,7 @@ package struct TaskScheduler {
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } (); debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
() @trusted { Fiber.yield(); } (); () @trusted { Fiber.yield(); } ();
debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } (); debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
assert(!task.m_fiber.m_queue, "Task is still scheduled after resumption.");
} }
} }

View file

@ -0,0 +1,47 @@
/+ dub.sdl:
name "tests"
dependency "vibe-core" path=".."
+/
module tests;
import vibe.core.sync;
import vibe.core.core;
import std.datetime;
import core.atomic;
shared ManualEvent ev;
shared size_t counter;
enum ntasks = 500;
shared static this()
{
ev = createSharedManualEvent();
}
void main()
{
setTaskStackSize(64*1024);
runTask({
foreach (x; 0 .. ntasks)
runWorkerTask(&worker);
});
setTimer(dur!"msecs"(10), { ev.emit(); });
setTimer(dur!"seconds"(60), { assert(false, "Timers didn't fire within the time limit"); });
runApplication();
assert(atomicLoad(counter) == ntasks, "Event loop exited prematurely.");
}
void worker()
{
ev.wait();
ev.emit();
setTimer(dur!"seconds"(1), {
auto c = atomicOp!"+="(counter, 1);
if (c == ntasks) exitEventLoop(true);
});
}