Thread safety improvements in the task system.

Removes some invalid safety annotations and adds a workaround for a starvation issue in Task.join() across threads boundaries. This is still not thread-safe, but now has a safety-net and is documented, so that it doesn't get lost.
This commit is contained in:
Sönke Ludwig 2016-12-19 20:24:08 +01:00
parent 1b2c0f33d1
commit 8c0660781d
3 changed files with 42 additions and 34 deletions

View file

@ -1212,8 +1212,8 @@ void setConcurrencyPrimitive(ConcurrencyPrimitive primitive)
atomicStore(st_concurrencyPrimitive, primitive); atomicStore(st_concurrencyPrimitive, primitive);
} }
void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tidInfo.ident, args); } void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tid, args); }
void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tidInfo.ident, args); } void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tid, args); }
package class VibedScheduler : Scheduler { package class VibedScheduler : Scheduler {
import core.sync.mutex; import core.sync.mutex;
@ -1239,7 +1239,7 @@ package class VibedScheduler : Scheduler {
} }
} }
override void yield() {} override void yield() {}
override @property ref ThreadInfo thisInfo(){ return Task.getThis().tidInfo; } override @property ref ThreadInfo thisInfo() @trusted { return Task.getThis().tidInfo; }
override TaskCondition newCondition(Mutex m) override TaskCondition newCondition(Mutex m)
{ {
try { try {

View file

@ -735,9 +735,10 @@ void yield()
@safe { @safe {
auto t = Task.getThis(); auto t = Task.getThis();
if (t != Task.init) { if (t != Task.init) {
t.taskFiber.handleInterrupt(); auto tf = () @trusted { return t.taskFiber; } ();
tf.handleInterrupt();
s_scheduler.yield(); s_scheduler.yield();
t.taskFiber.handleInterrupt(); tf.handleInterrupt();
} else { } else {
// Let yielded tasks execute // Let yielded tasks execute
() @safe nothrow { performIdleProcessing(); } (); () @safe nothrow { performIdleProcessing(); } ();
@ -763,9 +764,10 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
if (t == Task.init) { if (t == Task.init) {
runEventLoopOnce(); runEventLoopOnce();
} else { } else {
t.taskFiber.handleInterrupt(on_interrupt); auto tf = () @trusted { return t.taskFiber; } ();
tf.handleInterrupt(on_interrupt);
s_scheduler.hibernate(); s_scheduler.hibernate();
t.taskFiber.handleInterrupt(on_interrupt); tf.handleInterrupt(on_interrupt);
} }
} }

View file

@ -57,33 +57,34 @@ struct Task {
if (!fiber) return Task.init; if (!fiber) return Task.init;
auto tfiber = cast(TaskFiber)fiber; auto tfiber = cast(TaskFiber)fiber;
assert(tfiber !is null, "Invalid or null fiber used to construct Task handle."); assert(tfiber !is null, "Invalid or null fiber used to construct Task handle.");
// FIXME: returning a non-.init handle for a finished task might break some layered logic
return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } (); return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
} }
nothrow { nothrow {
package @property inout(TaskFiber) taskFiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; } package @property inout(TaskFiber) taskFiber() inout @system { return cast(inout(TaskFiber))m_fiber; }
@property inout(Fiber) fiber() inout @trusted { return this.taskFiber; } @property inout(Fiber) fiber() inout @system { return this.taskFiber; }
@property size_t taskCounter() const @safe { return m_taskCounter; } @property size_t taskCounter() const @safe { return m_taskCounter; }
@property inout(Thread) thread() inout @safe { if (m_fiber) return this.taskFiber.thread; return null; } @property inout(Thread) thread() inout @trusted { if (m_fiber) return this.taskFiber.thread; return null; }
/** Determines if the task is still running. /** Determines if the task is still running.
*/ */
@property bool running() @property bool running() // FIXME: this is NOT thread safe
const @trusted { const @trusted {
assert(m_fiber !is null, "Invalid task handle"); assert(m_fiber !is null, "Invalid task handle");
try if (this.taskFiber.state == Fiber.State.TERM) return false; catch (Throwable) {} try if (this.taskFiber.state == Fiber.State.TERM) return false; catch (Throwable) {}
return this.taskFiber.m_running && this.taskFiber.m_taskCounter == m_taskCounter; return this.taskFiber.m_running && this.taskFiber.m_taskCounter == m_taskCounter;
} }
// FIXME: this is not thread safe! package @property ref ThreadInfo tidInfo() @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe!
@property ref ThreadInfo tidInfo() @safe { return m_fiber ? taskFiber.tidInfo : s_tidInfo; }
@property Tid tid() @safe { return tidInfo.ident; } @property Tid tid() @trusted { return tidInfo.ident; }
} }
T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; } T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; }
void join() @safe { if (running) taskFiber.join(m_taskCounter); } void join() @trusted { if (running) taskFiber.join(m_taskCounter); } // FIXME: this is NOT thread safe
void interrupt() @safe { if (running) taskFiber.interrupt(m_taskCounter); } void interrupt() @trusted { if (running) taskFiber.interrupt(m_taskCounter); } // FIXME: this is NOT thread safe
string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); } string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); }
@ -416,9 +417,10 @@ final package class TaskFiber : Fiber {
/** Blocks until the task has ended. /** Blocks until the task has ended.
*/ */
void join(size_t task_counter) void join(size_t task_counter)
@safe { @trusted {
auto cnt = m_onExit.emitCount;
while (m_running && m_taskCounter == task_counter) while (m_running && m_taskCounter == task_counter)
m_onExit.wait(); cnt = m_onExit.wait(1.seconds, cnt);
} }
/** Throws an InterruptExeption within the task as soon as it calls an interruptible function. /** Throws an InterruptExeption within the task as soon as it calls an interruptible function.
@ -448,7 +450,7 @@ final package class TaskFiber : Fiber {
package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt) package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt)
@safe nothrow { @safe nothrow {
assert(Task.getThis().fiber is this, "Handling interrupt outside of the corresponding fiber."); assert(() @trusted { return Task.getThis().fiber; } () is this, "Handling interrupt outside of the corresponding fiber.");
if (m_interrupt && on_interrupt) { if (m_interrupt && on_interrupt) {
logTrace("Handling interrupt flag."); logTrace("Handling interrupt flag.");
m_interrupt = false; m_interrupt = false;
@ -520,12 +522,13 @@ package struct TaskScheduler {
{ {
auto t = Task.getThis(); auto t = Task.getThis();
if (t == Task.init) return; // not really a task -> no-op if (t == Task.init) return; // not really a task -> no-op
logTrace("Yielding (interrupt=%s)", t.taskFiber.m_interrupt); auto tf = () @trusted { return t.taskFiber; } ();
t.taskFiber.handleInterrupt(); logTrace("Yielding (interrupt=%s)", tf.m_interrupt);
if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed tf.handleInterrupt();
m_taskQueue.insertBack(t.taskFiber); if (tf.m_queue !is null) return; // already scheduled to be resumed
m_taskQueue.insertBack(tf);
doYield(t); doYield(t);
t.taskFiber.handleInterrupt(); tf.handleInterrupt();
} }
nothrow: nothrow:
@ -627,8 +630,9 @@ package struct TaskScheduler {
{ {
auto t = Task.getThis(); auto t = Task.getThis();
if (t == Task.init) return; // not really a task -> no-op if (t == Task.init) return; // not really a task -> no-op
if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed auto tf = () @trusted { return t.taskFiber; } ();
m_taskQueue.insertBack(t.taskFiber); if (tf.m_queue !is null) return; // already scheduled to be resumed
m_taskQueue.insertBack(tf);
doYield(t); doYield(t);
} }
@ -660,22 +664,24 @@ package struct TaskScheduler {
if (t == thist) return; if (t == thist) return;
auto thisthr = thist ? thist.taskFiber.thread : () @trusted { return Thread.getThis(); } (); auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
if (thist == Task.init) { if (thist == Task.init) {
resumeTask(t); resumeTask(t);
} else { } else {
assert(!thist.taskFiber.m_queue, "Calling task is running, but scheduled to be resumed!?"); auto tf = () @trusted { return t.taskFiber; } ();
if (t.taskFiber.m_queue) { auto thistf = () @trusted { return thist.taskFiber; } ();
assert(!thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
if (tf.m_queue) {
logTrace("Task to switch to is already scheduled. Moving to front of queue."); logTrace("Task to switch to is already scheduled. Moving to front of queue.");
assert(t.taskFiber.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
m_taskQueue.remove(t.taskFiber); m_taskQueue.remove(tf);
assert(!t.taskFiber.m_queue, "Task removed from queue, but still has one set!?"); assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
} }
logTrace("Switching tasks"); logTrace("Switching tasks");
m_taskQueue.insertFront(thist.taskFiber); m_taskQueue.insertFront(thistf);
m_taskQueue.insertFront(t.taskFiber); m_taskQueue.insertFront(tf);
doYield(thist); doYield(thist);
} }
} }