diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 974384e..bb4b2cf 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -711,7 +711,7 @@ struct ManualEvent { all } - //@disable this(this); + //@disable this(this); // FIXME: commenting this out this is not a good idea... deprecated("ManualEvent is always non-null!") bool opCast() const nothrow { return true; } @@ -850,12 +850,12 @@ struct ManualEvent { int ec = this.emitCount; while (ec <= emit_count) { ThreadWaiter w; - LocalWaiter tw; - () @trusted { acquireWaiter(&w, &tw); } (); + LocalWaiter lw; + () @trusted { acquireWaiter(&w, &lw); } (); Waitable!( - cb => tw.wait(cb), - cb => tw.cancel() + cb => lw.wait(cb), + cb => lw.cancel() ) waitable; asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); ec = this.emitCount; @@ -883,16 +883,24 @@ struct ManualEvent { int ec = this.emitCount; while (ec <= emit_count) { shared(ThreadWaiter) w; - LocalWaiter tw; - () @trusted { acquireWaiter(&w, &tw); } (); + LocalWaiter lw; + () @trusted { acquireWaiter(&w, &lw); } (); () @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } (); - if (tw.next) { + scope (exit) { + shared(ThreadWaiter)* pw = atomicLoad(m_waiters); + while (pw !is null) { + assert(pw !is () @trusted { return &w; } (), "Thread waiter was not removed from queue."); + pw = pw.next; + } + } + + if (lw.next) { // if we are not the first waiter for this thread, // wait for getting resumed by emitForThisThread Waitable!( - cb => tw.wait(cb), - cb => tw.cancel() + cb => lw.wait(cb), + cb => lw.cancel() ) waitable; asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); if (waitable.cancelled) break; // timeout @@ -906,8 +914,8 @@ struct ManualEvent { EventID ) eventwaiter; Waitable!( - cb => tw.wait(cb), - cb => tw.cancel() + cb => lw.wait(cb), + cb => lw.cancel() ) localwaiter; logDebugV("Wait on event %s", ms_threadEvent); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter); @@ -961,21 +969,21 @@ struct ManualEvent { return true; } - private void acquireWaiter(ThreadWaiter* w, LocalWaiter* tw) + private void acquireWaiter(ThreadWaiter* w, LocalWaiter* lw) nothrow { // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling - tw.task = Task.getThis(); + lw.task = Task.getThis(); if (m_waiters) { - m_waiters.tasks.add(tw); + m_waiters.tasks.add(lw); } else { m_waiters = w; } } - private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* tw) + private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* lw) nothrow shared { - tw.task = Task.getThis(); + lw.task = Task.getThis(); if (ms_threadEvent == EventID.init) ms_threadEvent = eventDriver.events.create(); @@ -983,10 +991,12 @@ struct ManualEvent { auto sdriver = () @trusted { return cast(shared)eventDriver; } (); shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } (); - assert(pw !is w, "Waiter is already registered!"); + size_t cnt = 0; while (pw !is null) { + assert(pw !is w, "Waiter is already registered!"); if (pw.driver is sdriver) break; + assert(cnt++ < 1000, "Recursive waiter?!"); pw = () @trusted { return atomicLoad(pw.next); } (); } @@ -1002,10 +1012,47 @@ struct ManualEvent { } while (!() @trusted { return cas(&m_waiters, wn, w); } ()); } - () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(tw); + () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(lw); } } +unittest { + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + logInfo("A"); + + auto e = createManualEvent(); + auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); + auto w2 = runTask({ e.wait(500.msecs, e.emitCount); }); + runTask({ + sleep(200.msecs); + e.emit(); + sleep(50.msecs); + assert(!w1.running && !w2.running); + exitEventLoop(); + }); + runEventLoop(); + logInfo("B"); +} + +unittest { + import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep; + logInfo("C"); + + auto e = createSharedManualEvent(); + auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); + static void w(shared(ManualEvent) e){e.wait(500.msecs, e.emitCount);} + auto w2 = runWorkerTaskH(&w, e); + runTask({ + sleep(200.msecs); + e.emit(); + sleep(50.msecs); + assert(!w1.running && !w2.running); + exitEventLoop(); + }); + runEventLoop(); + logInfo("D"); +} + private struct StackSList(T) {