diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index e5e19c3..d4cf16f 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1212,8 +1212,8 @@ void setConcurrencyPrimitive(ConcurrencyPrimitive primitive) atomicStore(st_concurrencyPrimitive, primitive); } -void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tidInfo.ident, args); } -void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tidInfo.ident, args); } +void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tid, args); } +void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tid, args); } package class VibedScheduler : Scheduler { import core.sync.mutex; @@ -1239,7 +1239,7 @@ package class VibedScheduler : Scheduler { } } override void yield() {} - override @property ref ThreadInfo thisInfo(){ return Task.getThis().tidInfo; } + override @property ref ThreadInfo thisInfo() @trusted { return Task.getThis().tidInfo; } override TaskCondition newCondition(Mutex m) { try { diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index d97248d..45b907a 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -735,9 +735,10 @@ void yield() @safe { auto t = Task.getThis(); if (t != Task.init) { - t.taskFiber.handleInterrupt(); + auto tf = () @trusted { return t.taskFiber; } (); + tf.handleInterrupt(); s_scheduler.yield(); - t.taskFiber.handleInterrupt(); + tf.handleInterrupt(); } else { // Let yielded tasks execute () @safe nothrow { performIdleProcessing(); } (); @@ -763,9 +764,10 @@ void hibernate(scope void delegate() @safe nothrow on_interrupt = null) if (t == Task.init) { runEventLoopOnce(); } else { - t.taskFiber.handleInterrupt(on_interrupt); + auto tf = () @trusted { return t.taskFiber; } (); + tf.handleInterrupt(on_interrupt); s_scheduler.hibernate(); - t.taskFiber.handleInterrupt(on_interrupt); + tf.handleInterrupt(on_interrupt); } } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index b27c274..76f50ac 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -57,33 +57,34 @@ struct Task { if (!fiber) return Task.init; auto tfiber = cast(TaskFiber)fiber; assert(tfiber !is null, "Invalid or null fiber used to construct Task handle."); + // FIXME: returning a non-.init handle for a finished task might break some layered logic return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } (); } nothrow { - package @property inout(TaskFiber) taskFiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; } - @property inout(Fiber) fiber() inout @trusted { return this.taskFiber; } + package @property inout(TaskFiber) taskFiber() inout @system { return cast(inout(TaskFiber))m_fiber; } + @property inout(Fiber) fiber() inout @system { return this.taskFiber; } @property size_t taskCounter() const @safe { return m_taskCounter; } - @property inout(Thread) thread() inout @safe { if (m_fiber) return this.taskFiber.thread; return null; } + @property inout(Thread) thread() inout @trusted { if (m_fiber) return this.taskFiber.thread; return null; } /** Determines if the task is still running. */ - @property bool running() + @property bool running() // FIXME: this is NOT thread safe const @trusted { assert(m_fiber !is null, "Invalid task handle"); try if (this.taskFiber.state == Fiber.State.TERM) return false; catch (Throwable) {} return this.taskFiber.m_running && this.taskFiber.m_taskCounter == m_taskCounter; } - // FIXME: this is not thread safe! - @property ref ThreadInfo tidInfo() @safe { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } - @property Tid tid() @safe { return tidInfo.ident; } + package @property ref ThreadInfo tidInfo() @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe! + + @property Tid tid() @trusted { return tidInfo.ident; } } T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; } - void join() @safe { if (running) taskFiber.join(m_taskCounter); } - void interrupt() @safe { if (running) taskFiber.interrupt(m_taskCounter); } + void join() @trusted { if (running) taskFiber.join(m_taskCounter); } // FIXME: this is NOT thread safe + void interrupt() @trusted { if (running) taskFiber.interrupt(m_taskCounter); } // FIXME: this is NOT thread safe string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); } @@ -416,9 +417,10 @@ final package class TaskFiber : Fiber { /** Blocks until the task has ended. */ void join(size_t task_counter) - @safe { + @trusted { + auto cnt = m_onExit.emitCount; while (m_running && m_taskCounter == task_counter) - m_onExit.wait(); + cnt = m_onExit.wait(1.seconds, cnt); } /** Throws an InterruptExeption within the task as soon as it calls an interruptible function. @@ -448,7 +450,7 @@ final package class TaskFiber : Fiber { package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt) @safe nothrow { - assert(Task.getThis().fiber is this, "Handling interrupt outside of the corresponding fiber."); + assert(() @trusted { return Task.getThis().fiber; } () is this, "Handling interrupt outside of the corresponding fiber."); if (m_interrupt && on_interrupt) { logTrace("Handling interrupt flag."); m_interrupt = false; @@ -520,12 +522,13 @@ package struct TaskScheduler { { auto t = Task.getThis(); if (t == Task.init) return; // not really a task -> no-op - logTrace("Yielding (interrupt=%s)", t.taskFiber.m_interrupt); - t.taskFiber.handleInterrupt(); - if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(t.taskFiber); + auto tf = () @trusted { return t.taskFiber; } (); + logTrace("Yielding (interrupt=%s)", tf.m_interrupt); + tf.handleInterrupt(); + if (tf.m_queue !is null) return; // already scheduled to be resumed + m_taskQueue.insertBack(tf); doYield(t); - t.taskFiber.handleInterrupt(); + tf.handleInterrupt(); } nothrow: @@ -627,8 +630,9 @@ package struct TaskScheduler { { auto t = Task.getThis(); if (t == Task.init) return; // not really a task -> no-op - if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed - m_taskQueue.insertBack(t.taskFiber); + auto tf = () @trusted { return t.taskFiber; } (); + if (tf.m_queue !is null) return; // already scheduled to be resumed + m_taskQueue.insertBack(tf); doYield(t); } @@ -660,22 +664,24 @@ package struct TaskScheduler { if (t == thist) return; - auto thisthr = thist ? thist.taskFiber.thread : () @trusted { return Thread.getThis(); } (); + auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } (); assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); if (thist == Task.init) { resumeTask(t); } else { - assert(!thist.taskFiber.m_queue, "Calling task is running, but scheduled to be resumed!?"); - if (t.taskFiber.m_queue) { + auto tf = () @trusted { return t.taskFiber; } (); + auto thistf = () @trusted { return thist.taskFiber; } (); + assert(!thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); + if (tf.m_queue) { logTrace("Task to switch to is already scheduled. Moving to front of queue."); - assert(t.taskFiber.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); - m_taskQueue.remove(t.taskFiber); - assert(!t.taskFiber.m_queue, "Task removed from queue, but still has one set!?"); + assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); + m_taskQueue.remove(tf); + assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); } logTrace("Switching tasks"); - m_taskQueue.insertFront(thist.taskFiber); - m_taskQueue.insertFront(t.taskFiber); + m_taskQueue.insertFront(thistf); + m_taskQueue.insertFront(tf); doYield(thist); } }