diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d
index 534ce4a..da7e203 100644
--- a/source/vibe/core/core.d
+++ b/source/vibe/core/core.d
@@ -102,12 +102,12 @@ int runEventLoop()
 		runTask(toDelegate(&watchExitFlag));
 	} ();
 
-	while ((s_scheduler.scheduledTaskCount || eventDriver.waiterCount) && !s_exitEventLoop) {
-		logTrace("process events");
-		if (eventDriver.processEvents() == ExitReason.exited) {
+	while (true) {
+		auto er = s_scheduler.waitAndProcess();
+		if (er != ExitReason.idle || s_exitEventLoop) {
+			logDebug("Event loop exit reason (exit flag=%s): %s", s_exitEventLoop, er);
 			break;
 		}
-		logTrace("idle processing");
 		performIdleProcessing();
 	}
 
@@ -156,18 +156,18 @@ void exitEventLoop(bool shutdown_all_threads = false)
 */
 bool processEvents()
 @safe nothrow {
-	if (!eventDriver.processEvents(0.seconds)) return false;
-	performIdleProcessing();
-	return true;
+	return !s_scheduler.process().among(ExitReason.exited, ExitReason.outOfWaiters);
 }
 
 /** Wait once for events and process them.
 */
-void runEventLoopOnce()
+ExitReason runEventLoopOnce()
 @safe nothrow {
-	eventDriver.processEvents(Duration.max);
-	performIdleProcessing();
+	auto ret = s_scheduler.waitAndProcess();
+	if (ret == ExitReason.idle)
+		performIdleProcessing();
+	return ret;
 }
 
 /**
@@ -560,7 +560,7 @@ public void setupWorkerThreads(uint num = logicalProcessorCount())
 		foreach (i; 0 .. num) {
 			auto thr = new Thread(&workerThreadFunc);
-			thr.name = format("Vibe Task Worker #%s", i);
+			thr.name = format("vibe-%s", i);
 			st_threads ~= ThreadContext(thr, true);
 			thr.start();
 		}
@@ -747,13 +747,11 @@ unittest {
 */
 Timer createTimer(void delegate() nothrow @safe callback)
 @safe nothrow {
-	void cb(TimerID tm)
-	nothrow @safe {
-		if (callback !is null)
-			callback();
-	}
 	auto ret = Timer(eventDriver.createTimer());
-	eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure!
+	if (callback !is null) {
+		void cb(TimerID tm) nothrow @safe { callback(); }
+		eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure!
+	}
 	return ret;
 }
@@ -945,6 +943,7 @@ struct Timer {
 	private this(TimerID id)
 	nothrow {
+		assert(id != TimerID.init, "Invalid timer ID.");
 		m_driver = eventDriver;
 		m_id = id;
 	}
@@ -1022,7 +1021,7 @@ package(vibe) void performIdleProcessing()
 		if (again) {
 			auto er = eventDriver.processEvents(0.seconds);
-			if (er.among!(ExitReason.exited, ExitReason.idle)) {
+			if (er.among!(ExitReason.exited, ExitReason.outOfWaiters)) {
 				logDebug("Setting exit flag due to driver signalling exit");
 				s_exitEventLoop = true;
 				return;
@@ -1158,7 +1157,7 @@ shared static this()
 	st_threadShutdownCondition = new Condition(st_threadsMutex);
 
 	auto thisthr = Thread.getThis();
-	thisthr.name = "Main";
+	thisthr.name = "main";
 	assert(st_threads.length == 0, "Main thread not the first thread!?");
 	st_threads ~= ThreadContext(thisthr, false);
diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d
index cd6a82a..be56446 100644
--- a/source/vibe/core/sync.d
+++ b/source/vibe/core/sync.d
@@ -7,6 +7,7 @@
 */
 module vibe.core.sync;
 
+import vibe.core.log : logDebugV, logTrace, logInfo;
 import vibe.core.task;
 
 import core.atomic;
@@ -314,7 +315,6 @@ unittest { // test deferred throwing
 	});
 	auto t2 = runTask({
-		scope (failure) assert(false, "Only InterruptException supposed to be thrown!");
 		mutex.lock();
 		scope (exit) mutex.unlock();
 		try {
@@ -322,6 +322,8 @@ unittest { // test deferred throwing
 			assert(false, "Yield is supposed to have thrown an InterruptException.");
 		} catch (InterruptException) {
 			// as expected!
+		} catch (Exception) {
+			assert(false, "Only InterruptException supposed to be thrown!");
 		}
 	});
@@ -567,6 +569,7 @@ class TaskCondition : core.sync.condition.Condition {
 */
 unittest {
 	import vibe.core.core;
+	import vibe.core.log;
 
 	__gshared Mutex mutex;
 	__gshared TaskCondition condition;
@@ -576,6 +579,8 @@ unittest {
 	mutex = new Mutex;
 	condition = new TaskCondition(mutex);
 
+	logDebug("SETTING UP TASKS");
+
 	// start up the workers and count how many are running
 	foreach (i; 0 .. 4) {
 		workers_still_running++;
@@ -584,16 +589,23 @@ unittest {
 			sleep(100.msecs);
 
 			// notify the waiter that we're finished
-			synchronized (mutex)
+			synchronized (mutex) {
 				workers_still_running--;
+				logDebug("DECREMENT %s", workers_still_running);
+			}
+			logDebug("NOTIFY");
 			condition.notify();
 		});
 	}
 
+	logDebug("STARTING WAIT LOOP");
+
 	// wait until all tasks have decremented the counter back to zero
 	synchronized (mutex) {
-		while (workers_still_running > 0)
+		while (workers_still_running > 0) {
+			logDebug("STILL running %s", workers_still_running);
 			condition.wait();
+		}
 	}
 }
@@ -649,9 +661,9 @@ struct ManualEvent {
 	private {
 		static struct ThreadWaiter {
 			ThreadWaiter* next;
-			/*immutable*/ EventID event;
-			/*immutable*/ EventDriver driver;
-			//immutable Thread thread;
+			EventID event;
+			EventDriver driver;
+			Thread thread;
 			StackSList!LocalWaiter tasks;
 		}
 		static struct LocalWaiter {
@@ -660,24 +672,14 @@ struct ManualEvent {
 			void delegate() @safe nothrow notifier;
 			bool cancelled = false;
 
-			void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; }
-			void cancel() @safe nothrow { cancelled = true; auto n = notifier; notifier = null; n(); }
-
-			void wait(void delegate() @safe nothrow del)
-			shared @safe nothrow {
+			void wait(void delegate() @safe nothrow del) @safe nothrow {
+				assert(notifier is null, "Local waiter is used twice!");
 				notifier = del;
-				if (!next) eventDriver.waitForEvent(ms_threadEvent, &onEvent);
-			}
-
-			private void onEvent(EventID event)
-			@safe nothrow {
-				assert(event == ms_threadEvent);
-				notifier();
 			}
+			void cancel() @safe nothrow { cancelled = true; notifier = null; }
 		}
 
 		int m_emitCount;
 		ThreadWaiter* m_waiters;
-
 	}
 
 	// thread destructor in vibe.core.core will decrement the ref. count
@@ -705,6 +707,8 @@ struct ManualEvent {
 	shared nothrow {
 		import core.atomic : atomicOp, cas;
 
+		logTrace("emit shared");
+
 		auto ec = atomicOp!"+="(m_emitCount, 1);
 
 		auto thisthr = Thread.getThis();
@@ -712,27 +716,35 @@ struct ManualEvent {
 			case EmitMode.all:
 				// FIXME: would be nice to have atomicSwap instead
 				auto w = cast(ThreadWaiter*)atomicLoad(m_waiters);
-				if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null))
+				if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) {
+					logTrace("Another thread emitted concurrently - returning.");
 					return ec;
+				}
 				while (w !is null) {
+					// Note: emitForThisThread can result in w getting deallocated at any
+					// time, so we need to copy any fields first
+					auto wnext = w.next;
+					atomicStore((cast(shared)w).next, null);
+					assert(wnext !is w, "Same waiter enqueued twice!?");
 					if (w.driver is eventDriver) {
-						// Note: emitForThisThread can result in w getting deallocated at any
-						// time, so we need to copy any fields first
-						auto tasks = w.tasks;
-						w = w.next;
+						logTrace("Same thread emit (%s/%s)", cast(void*)w, cast(void*)w.tasks.first);
 						emitForThisThread(w.tasks.m_first, mode);
 					} else {
+						logTrace("Foreign thread \"%s\" notify: %s", w.thread.name, w.event);
 						auto drv = w.driver;
 						auto evt = w.event;
-						w = w.next;
 						if (evt != EventID.init)
-							drv.triggerEvent(evt, true);
+							(cast(shared)drv).triggerEvent(evt, true);
 					}
+					w = wnext;
 				}
 				break;
 			case EmitMode.single:
 				assert(false);
 		}
+
+		logTrace("emit shared done");
+
 		return ec;
 	}
 	/// ditto
@@ -740,6 +752,8 @@ struct ManualEvent {
 	nothrow {
 		auto ec = m_emitCount++;
 
+		logTrace("unshared emit");
+
 		final switch (mode) {
 			case EmitMode.all:
 				auto w = m_waiters;
@@ -758,6 +772,9 @@ struct ManualEvent {
 
 	/** Acquires ownership and waits until the signal is emitted.
 
+		Note that in order not to miss any emits it is necessary to use the
+		overload taking an integer.
+
 		Throws:
 			May throw an $(D InterruptException) if the task gets interrupted
 			using $(D Task.interrupt()).
@@ -766,79 +783,20 @@ struct ManualEvent {
 	/// ditto
 	int wait() shared { return wait(this.emitCount); }
 
-	/** Acquires ownership and waits until the emit count differs from the given one.
+	/** Acquires ownership and waits until the emit count differs from the
+		given one or until a timeout is reached.
 
 		Throws:
 			May throw an $(D InterruptException) if the task gets interrupted
 			using $(D Task.interrupt()).
 	*/
-	int wait(int emit_count) { return wait(Duration.max, emit_count); }
+	int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
 	/// ditto
-	int wait(int emit_count) shared { return wait(Duration.max, emit_count); }
-
-	/** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reaced.
-
-		Throws:
-			May throw an $(D InterruptException) if the task gets interrupted
-			using $(D Task.interrupt()).
-	*/
-	int wait(Duration timeout, int emit_count)
-	{
-		ThreadWaiter w;
-		LocalWaiter tw;
-
-		int ec = this.emitCount;
-		while (ec <= emit_count) {
-			// wait for getting resumed directly by emit/emitForThisThread
-			acquireWaiter(w, tw);
-			asyncAwait!(void delegate() @safe nothrow,
-				cb => tw.wait(cb),
-				cb => tw.cancel()
-			)(timeout);
-			ec = this.emitCount;
-		}
-		return ec;
-	}
+	int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
 	/// ditto
-	int wait(Duration timeout, int emit_count)
-	shared {
-		shared(ThreadWaiter) w;
-		LocalWaiter tw;
-
-		int ec = this.emitCount;
-		while (ec <= emit_count) {
-			acquireWaiter(w, tw);
-
-			if (tw.next) {
-				// if we are not the first waiter for this thread,
-				// wait for getting resumed by emitForThisThread
-				asyncAwait!(void delegate() @safe nothrow,
-					cb => tw.wait(cb),
-					cb => tw.cancel()
-				)(timeout);
-				ec = this.emitCount;
-			} else {
-				// if we are the first waiter for this thread,
-				// wait for the thread event to get emitted
-				Waitable!(
-					cb => eventDriver.waitForEvent(ms_threadEvent, cb),
-					cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb),
-					EventID
-				) eventwaiter;
-				Waitable!(
-					cb => tw.wait(cb),
-					cb => tw.cancel()
-				) localwaiter;
-				asyncAwaitAny!true(timeout, eventwaiter, localwaiter);
-
-				ec = this.emitCount;
-
-				if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
-				else if (localwaiter.cancelled) break; // timeout
-			}
-		}
-		return ec;
-	}
+	int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
+	/// ditto
+	int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
 
 	/** Same as $(D wait), but defers throwing any $(D InterruptException).
@@ -849,45 +807,51 @@ struct ManualEvent {
 	///
 	int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
 	/// ditto
-	int waitUninterruptible(int emit_count) nothrow { return waitUninterruptible(Duration.max, emit_count); }
+	int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
 	/// ditto
-	int waitUninterruptible(int emit_count) shared nothrow { return waitUninterruptible(Duration.max, emit_count); }
+	int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
 	/// ditto
-	int waitUninterruptible(Duration timeout, int emit_count)
-	nothrow {
-		ThreadWaiter w;
-		LocalWaiter tw;
-		acquireWaiter(w, tw);
+	int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
+	/// ditto
+	int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
 
+	private int doWait(bool interruptible)(Duration timeout, int emit_count)
+	{
 		int ec = this.emitCount;
 		while (ec <= emit_count) {
-			asyncAwaitUninterruptible!(void delegate(),
+			ThreadWaiter w;
+			LocalWaiter tw;
+			acquireWaiter(w, tw);
+
+			Waitable!(
 				cb => tw.wait(cb),
 				cb => tw.cancel()
-			)(timeout);
+			) waitable;
+			asyncAwaitAny!interruptible(timeout, waitable);
 			ec = this.emitCount;
 		}
 		return ec;
 	}
 
-	/// ditto
-	int waitUninterruptible(Duration timeout, int emit_count)
-	shared nothrow {
-		shared(ThreadWaiter) w;
-		LocalWaiter tw;
+	private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
+	shared {
 		int ec = this.emitCount;
 		while (ec <= emit_count) {
+			shared(ThreadWaiter) w;
+			LocalWaiter tw;
 			acquireWaiter(w, tw);
+			logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next);
 
 			if (tw.next) {
 				// if we are not the first waiter for this thread,
 				// wait for getting resumed by emitForThisThread
-				asyncAwaitUninterruptible!(void delegate() @safe nothrow,
+				Waitable!(
 					cb => tw.wait(cb),
 					cb => tw.cancel()
-				)(timeout);
-				ec = this.emitCount;
+				) waitable;
+				asyncAwaitAny!interruptible(timeout, waitable);
 			} else {
+				again:
 				// if we are the first waiter for this thread,
 				// wait for the thread event to get emitted
 				Waitable!(
@@ -899,13 +863,20 @@ struct ManualEvent {
 					cb => tw.wait(cb),
 					cb => tw.cancel()
 				) localwaiter;
-				asyncAwaitAny!false(timeout, eventwaiter, localwaiter);
+				logDebugV("Wait on event %s", ms_threadEvent);
+				asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter);
 
-				ec = this.emitCount;
-
-				if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
-				else if (localwaiter.cancelled) break; // timeout
+				if (!eventwaiter.cancelled) {
+					if (atomicLoad(w.next) == null)
+						emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
+					else goto again;
+				} else if (localwaiter.cancelled) break; // timeout
 			}
+
+			assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w,
+				"Waiter did not get removed from waiter queue.");
+
+			ec = this.emitCount;
 		}
 		return ec;
 	}
@@ -914,12 +885,19 @@ struct ManualEvent {
 	nothrow {
 		if (!waiters) return false;
 
+		logTrace("emitForThisThread");
+
 		final switch (mode) {
 			case EmitMode.all:
 				while (waiters) {
-					if (waiters.notifier !is null)
+					auto wnext = waiters.next;
+					assert(wnext !is waiters);
+					if (waiters.notifier !is null) {
+						logTrace("notify task %s %s %s", cast(void*)waiters, cast(void*)waiters.notifier.funcptr, waiters.notifier.ptr);
 						waiters.notifier();
-					waiters = waiters.next;
+						waiters.notifier = null;
+					} else logTrace("notify callback is null");
+					waiters = wnext;
 				}
 				break;
 			case EmitMode.single:
@@ -950,20 +928,27 @@ struct ManualEvent {
 
 		auto sdriver = cast(shared)eventDriver;
 
-		if (m_waiters) {
-			shared(ThreadWaiter)* pw = m_waiters;
-			while (pw !is null) {
-				if (pw.driver is sdriver) {
-					(cast(ThreadWaiter*)pw).tasks.add(&tw);
-					break;
-				}
-				pw = atomicLoad(pw.next);
-			}
-		} else {
-			m_waiters = &w;
-			w.event = ms_threadEvent;
-			w.driver = sdriver;
+		shared(ThreadWaiter)* pw = atomicLoad(m_waiters);
+		assert(pw !is &w, "Waiter is already registered!");
+		while (pw !is null) {
+			if (pw.driver is sdriver)
+				break;
+			pw = atomicLoad(pw.next);
 		}
+
+		if (!pw) {
+			pw = &w;
+			shared(ThreadWaiter)* wn;
+			do {
+				wn = atomicLoad(m_waiters);
+				w.next = wn;
+				w.event = ms_threadEvent;
+				w.driver = sdriver;
+				w.thread = cast(shared)Thread.getThis();
+			} while (!cas(&m_waiters, wn, &w));
+		}
+
+		(cast(ThreadWaiter*)pw).tasks.add(&tw);
 	}
 }
@@ -1023,7 +1008,6 @@ private struct StackSList(T)
 }
 
 private struct TaskMutexImpl(bool INTERRUPTIBLE) {
-	import std.stdio;
 	private {
 		shared(bool) m_locked = false;
 		shared(uint) m_waiters = 0;
@@ -1041,7 +1025,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 	{
 		if (cas(&m_locked, false, true)) {
 			debug m_owner = Task.getThis();
-			version(MutexPrint) writefln("mutex %s lock %s", cast(void*)this, atomicLoad(m_waiters));
+			debug(VibeMutexPrint) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
 			return true;
 		}
 		return false;
@@ -1052,7 +1036,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 		if (tryLock()) return;
 		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
 		atomicOp!"+="(m_waiters, 1);
-		version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters));
+		debug(VibeMutexPrint) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
 		scope(exit) atomicOp!"-="(m_waiters, 1);
 		auto ecnt = m_signal.emitCount();
 		while (!tryLock()) {
@@ -1069,7 +1053,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 			m_owner = Task();
 		}
 		atomicStore!(MemoryOrder.rel)(m_locked, false);
-		version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters));
+		debug(VibeMutexPrint) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
 		if (atomicLoad(m_waiters) > 0)
 			m_signal.emit();
 	}
@@ -1113,7 +1097,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
 	{
 		if (tryLock()) return;
 		atomicOp!"+="(m_waiters, 1);
-		version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters));
+		debug(VibeMutexPrint) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
 		scope(exit) atomicOp!"-="(m_waiters, 1);
 		auto ecnt = m_signal.emitCount();
 		while (!tryLock()) {
@@ -1133,7 +1117,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
 				m_owner = Task.init;
 			}
 		});
-		version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters));
+		debug(VibeMutexPrint) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
 		if (atomicLoad(m_waiters) > 0)
 			m_signal.emit();
 	}
diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d
index 823dd64..a7e306c 100644
--- a/source/vibe/core/task.d
+++ b/source/vibe/core/task.d
@@ -79,11 +79,29 @@ struct Task {
 
 	T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }
 
-	void join() { if (running) taskFiber.join(); }
-	void interrupt() { if (running) taskFiber.interrupt(); }
+	void join() { if (running) taskFiber.join(m_taskCounter); }
+	void interrupt() { if (running) taskFiber.interrupt(m_taskCounter); }
 
 	string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); }
 
+	void getDebugID(R)(ref R dst)
+	{
+		import std.digest.md : MD5;
+		import std.bitmanip : nativeToLittleEndian;
+		import std.base64 : Base64;
+
+		if (!m_fiber) {
+			dst.put("----");
+			return;
+		}
+
+		MD5 md;
+		md.start();
+		md.put(nativeToLittleEndian(cast(size_t)cast(void*)m_fiber));
+		md.put(nativeToLittleEndian(cast(size_t)cast(void*)m_taskCounter));
+		Base64.encode(md.finish()[0 .. 3], dst);
+	}
+
 	bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
 	bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
 }
@@ -261,7 +279,7 @@ final package class TaskFiber : Fiber {
 		shared size_t m_taskCounter;
 		shared bool m_running;
 
-		Task[] m_joiners;
+		shared(ManualEvent) m_onExit;
 
 		// task local storage
 		BitArray m_flsInit;
@@ -352,12 +370,8 @@ final package class TaskFiber : Fiber {
 				this.tidInfo.ident = Tid.init; // clear message box
 
-				foreach (t; m_joiners) {
-					logTrace("Resuming joining task.");
-					taskScheduler.switchTo(t);
-				}
-				m_joiners.length = 0;
-				m_joiners.assumeSafeAppend();
+				logTrace("Notifying joining tasks.");
+				m_onExit.emit();
 
 				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
 				if (m_queue) m_queue.remove(this);
@@ -395,28 +409,21 @@ final package class TaskFiber : Fiber {
 	/** Blocks until the task has ended.
 	*/
-	void join()
+	void join(size_t task_counter)
 	{
-		import vibe.core.core : hibernate, yield;
-
-		auto caller = Task.getThis();
-		if (!m_running) return;
-		if (caller != Task.init) {
-			assert(caller.fiber !is this, "A task cannot join itself.");
-			assert(caller.thread is this.thread, "Joining tasks in foreign threads is currently not supported.");
-			m_joiners ~= caller;
-		} else assert(Thread.getThis() is this.thread, "Joining tasks in different threads is not yet supported.");
-		auto run_count = m_taskCounter;
-		if (caller == Task.init) vibe.core.core.yield(); // let the task continue (it must be yielded currently)
-		while (m_running && run_count == m_taskCounter) hibernate();
+		while (m_running && m_taskCounter == task_counter)
+			m_onExit.wait();
 	}
 
 	/** Throws an InterruptExeption within the task as soon as it calls an interruptible function.
 	*/
-	void interrupt()
+	void interrupt(size_t task_counter)
 	{
 		import vibe.core.core : taskScheduler;
 
+		if (m_taskCounter != task_counter)
+			return;
+
 		auto caller = Task.getThis();
 		if (caller != Task.init) {
 			assert(caller != this.task, "A task cannot interrupt itself.");
@@ -481,6 +488,9 @@ package struct TaskFuncInfo {
 }
 
 package struct TaskScheduler {
+	import eventcore.driver : ExitReason;
+	import eventcore.core : eventDriver;
+
 	private {
 		TaskFiberQueue m_taskQueue;
 		TaskFiber m_markerTask;
@@ -512,6 +522,99 @@ package struct TaskScheduler {
 
 nothrow:
 
+	/** Performs a single round of scheduling without blocking.
+
+		This will execute scheduled tasks and process events from the
+		event queue, as long as possible without having to wait.
+
+		Returns:
+			A reason is returned:
+			$(UL
+				$(LI `ExitReason.exited`: The event loop was exited due to a manual request)
+				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
+					tasks or events, so the application would do nothing from
+					now on)
+				$(LI `ExitReason.idle`: All scheduled tasks and pending events
+					have been processed normally)
+				$(LI `ExitReason.timeout`: Scheduled tasks have been processed,
+					but there were no pending events present.)
+			)
+	*/
+	ExitReason process()
+	{
+		bool any_events = false;
+		while (true) {
+			// process pending tasks
+			schedule();
+
+			logTrace("Processing pending events...");
+			ExitReason er = eventDriver.processEvents(0.seconds);
+			logTrace("Done.");
+
+			final switch (er) {
+				case ExitReason.exited: return ExitReason.exited;
+				case ExitReason.outOfWaiters:
+					if (!scheduledTaskCount)
+						return ExitReason.outOfWaiters;
+					break;
+				case ExitReason.timeout:
+					if (!scheduledTaskCount)
+						return any_events ? ExitReason.idle : ExitReason.timeout;
+					break;
+				case ExitReason.idle:
+					any_events = true;
+					if (!scheduledTaskCount)
+						return ExitReason.idle;
+					break;
+			}
+		}
+	}
+
+	/** Performs a single round of scheduling, blocking if necessary.
+
+		Returns:
+			A reason is returned:
+			$(UL
+				$(LI `ExitReason.exited`: The event loop was exited due to a manual request)
+				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
+					tasks or events, so the application would do nothing from
+					now on)
+				$(LI `ExitReason.idle`: All scheduled tasks and pending events
+					have been processed normally)
+			)
+	*/
+	ExitReason waitAndProcess()
+	{
+		// first, process tasks without blocking
+		auto er = process();
+
+		final switch (er) {
+			case ExitReason.exited, ExitReason.outOfWaiters: return er;
+			case ExitReason.idle: return ExitReason.idle;
+			case ExitReason.timeout: break;
+		}
+
+		// if the first run didn't process any events, block and
+		// process one chunk
+		logTrace("Wait for new events to process...");
+		er = eventDriver.processEvents();
+		logTrace("Done.");
+		final switch (er) {
+			case ExitReason.exited: return ExitReason.exited;
+			case ExitReason.outOfWaiters:
+				if (!scheduledTaskCount)
+					return ExitReason.outOfWaiters;
+				break;
+			case ExitReason.timeout: assert(false, "Unexpected return code");
+			case ExitReason.idle: break;
		}
+
+		// finally, make sure that all scheduled tasks are run
+		er = process();
+		if (er == ExitReason.timeout) return ExitReason.idle;
+		else return er;
+	}
+
 	void yieldUninterruptible()
 	{
 		auto t = Task.getThis();
diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d
index 1815bc4..9aad772 100644
--- a/source/vibe/internal/async.d
+++ b/source/vibe/internal/async.d
@@ -75,48 +75,63 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
 	bool any_fired = false;
 	Task t;
 
-	logTrace("Performing %s async operations in %s", waitables.length, func);
+	bool still_inside = true;
+	scope (exit) still_inside = false;
+
+	logDebugV("Performing %s async operations in %s", waitables.length, func);
 	foreach (i, W; Waitables) {
-		callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow {
-			logTrace("Waitable %s in %s fired.", i, func);
+		/*scope*/auto cb = (typeof(Waitables[i].results) results) @safe nothrow {
+			assert(still_inside, "Notification fired after asyncAwait had already returned!");
+			logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init);
 			fired[i] = true;
 			any_fired = true;
 			waitables[i].results = results;
 			if (t != Task.init) switchToTask(t);
 		};
+		callbacks[i] = cb;
 
-		logTrace("Starting operation %s", i);
+		logDebugV("Starting operation %s", i);
 		waitables[i].waitCallback(callbacks[i]);
-		scope_guards[i] = ScopeGuard({
+
+		scope ccb = {
 			if (!fired[i]) {
-				logTrace("Cancelling operation %s", i);
+				logDebugV("Cancelling operation %s", i);
 				waitables[i].cancelCallback(callbacks[i]);
+				waitables[i].cancelled = true;
 				any_fired = true;
 				fired[i] = true;
 			}
-		});
+		};
+		scope_guards[i] = ScopeGuard(ccb);
+
 		if (any_fired) {
-			logTrace("Returning without waiting.");
+			logDebugV("Returning to %s without waiting.", func);
 			return;
 		}
 	}
 
-	logTrace("Need to wait...");
+	logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");
 	t = Task.getThis();
+	do {
 		static if (interruptible) {
 			bool interrupted = false;
 			hibernate(() @safe nothrow {
-				logTrace("Got interrupted in %s.", func);
+				logDebugV("Got interrupted in %s.", func);
 				interrupted = true;
 			});
+			logDebugV("Task resumed (fired=%s, interrupted=%s)", fired, interrupted);
 			if (interrupted)
 				throw new InterruptException;
-		} else hibernate();
+		} else {
+			hibernate();
+			logDebugV("Task resumed (fired=%s)", fired);
+		}
+	} while (!any_fired);
 
-	logTrace("Return result for %s.", func);
+	logDebugV("Return result for %s.", func);
 }
 
 private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow;
@@ -128,4 +143,26 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
 	auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); });
 	assert(ret[0] == 42);
 	assert(cnt == 1);
-}
\ No newline at end of file
+}
+
+@safe nothrow /*@nogc*/ unittest {
+	int a, b, c;
+	Waitable!(
+		(cb) { a++; cb(42); },
+		(cb) { assert(false); },
+		int
+	) w1;
+	Waitable!(
+		(cb) { b++; },
+		(cb) { c++; },
+		int
+	) w2;
+
+	asyncAwaitAny!false(w1, w2);
+	assert(w1.results[0] == 42 && w2.results[0] == 0);
+	assert(a == 1 && b == 0 && c == 0);
+
+	asyncAwaitAny!false(w2, w1);
+	assert(w1.results[0] == 42 && w2.results[0] == 0);
+	assert(a == 2 && b == 1 && c == 1);
+}
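
Usage sketch (not part of the patch): the updated ManualEvent documentation above recommends the emit-count based overloads so that an emit happening between checking a condition and calling wait() is not missed. A minimal sketch of that pattern in a consumer task, where `ev` (a ManualEvent instance) and `workAvailable()` are hypothetical placeholders:

    // Sketch only; `ev` and `workAvailable()` are hypothetical placeholders.
    // Snapshot the emit count *before* checking the condition, then pass it to
    // wait() so an emit between the check and the wait cannot be lost.
    int last = ev.emitCount;
    while (!workAvailable())
        last = ev.wait(last); // returns once emitCount differs from `last`

The same pattern applies to the shared overloads and to the waitUninterruptible variants, which delegate to doWait/doWaitShared with the same emit-count semantics.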