diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index df3a878..cd6a82a 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -126,13 +126,13 @@ class LocalTaskSemaphore //import vibe.utils.memory; private { - static struct Waiter { + static struct ThreadWaiter { ManualEvent signal; ubyte priority; uint seq; } - BinaryHeap!(Array!Waiter, asc) m_waiters; + BinaryHeap!(Array!ThreadWaiter, asc) m_waiters; uint m_maxLocks; uint m_locks; uint m_seq; @@ -182,7 +182,7 @@ class LocalTaskSemaphore if (tryLock()) return; - Waiter w; + ThreadWaiter w; w.signal = createManualEvent(); w.priority = priority; w.seq = min(0, m_seq - w.priority); @@ -201,7 +201,7 @@ class LocalTaskSemaphore { m_locks--; if (m_waiters.length > 0 && available > 0) { - Waiter w = m_waiters.front(); + ThreadWaiter w = m_waiters.front(); w.signal.emit(); // resume one m_waiters.removeFront(); } @@ -209,7 +209,7 @@ class LocalTaskSemaphore // if true, a goes after b. ie. b comes out front() /// private - static bool asc(ref Waiter a, ref Waiter b) + static bool asc(ref ThreadWaiter a, ref ThreadWaiter b) { if (a.seq == b.seq) { if (a.priority == b.priority) { @@ -225,7 +225,7 @@ class LocalTaskSemaphore private void rewindSeq() { - Array!Waiter waiters = m_waiters.release(); + Array!ThreadWaiter waiters = m_waiters.release(); ushort min_seq; import std.algorithm : min; foreach (ref waiter; waiters[]) @@ -644,23 +644,24 @@ shared(ManualEvent) createSharedManualEvent() */ struct ManualEvent { import core.thread : Thread; - import vibe.internal.async : asyncAwait, asyncAwaitUninterruptible; + import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; private { - static struct Waiter { - Waiter* next; - immutable EventID event; - immutable EventDriver driver; - immutable Thread thread; - StackSList!ThreadWaiter tasks; - } static struct ThreadWaiter { ThreadWaiter* next; + /*immutable*/ EventID event; + /*immutable*/ EventDriver driver; + //immutable Thread thread; + StackSList!LocalWaiter tasks; + } + static struct LocalWaiter { + LocalWaiter* next; Task task; void delegate() @safe nothrow notifier; + bool cancelled = false; void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; } - void cancel() @safe nothrow { notifier = null; } + void cancel() @safe nothrow { cancelled = true; auto n = notifier; notifier = null; n(); } void wait(void delegate() @safe nothrow del) shared @safe nothrow { @@ -675,7 +676,7 @@ struct ManualEvent { } } int m_emitCount; - Waiter* m_waiters; + ThreadWaiter* m_waiters; } @@ -710,20 +711,22 @@ struct ManualEvent { final switch (mode) { case EmitMode.all: // FIXME: would be nice to have atomicSwap instead - auto w = cast(Waiter*)atomicLoad(m_waiters); - if (w !is null && !cas(&m_waiters, cast(shared(Waiter)*)w, cast(shared(Waiter)*)null)) + auto w = cast(ThreadWaiter*)atomicLoad(m_waiters); + if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) return ec; while (w !is null) { - if (w.thread is thisthr) { + if (w.driver is eventDriver) { // Note: emitForThisThread can result in w getting deallocated at any // time, so we need to copy any fields first auto tasks = w.tasks; w = w.next; emitForThisThread(w.tasks.m_first, mode); } else { + auto drv = w.driver; auto evt = w.event; w = w.next; - eventDriver.triggerEvent(evt, true); + if (evt != EventID.init) + drv.triggerEvent(evt, true); } } break; @@ -742,7 +745,7 @@ struct ManualEvent { auto w = m_waiters; m_waiters = null; if (w !is null) { - assert(w.thread is Thread.getThis(), "Unshared ManualEvent has waiters in foreign thread!"); + assert(w.driver is eventDriver, "Unshared ManualEvent has waiters in foreign thread!"); assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!"); emitForThisThread(w.tasks.m_first, EmitMode.all); } @@ -781,8 +784,8 @@ struct ManualEvent { */ int wait(Duration timeout, int emit_count) { - Waiter w; - ThreadWaiter tw; + ThreadWaiter w; + LocalWaiter tw; int ec = this.emitCount; while (ec <= emit_count) { @@ -799,12 +802,13 @@ struct ManualEvent { /// ditto int wait(Duration timeout, int emit_count) shared { - shared(Waiter) w; - ThreadWaiter tw; - acquireWaiter(w, tw); + shared(ThreadWaiter) w; + LocalWaiter tw; int ec = this.emitCount; while (ec <= emit_count) { + acquireWaiter(w, tw); + if (tw.next) { // if we are not the first waiter for this thread, // wait for getting resumed by emitForThisThread @@ -816,14 +820,21 @@ struct ManualEvent { } else { // if we are the first waiter for this thread, // wait for the thread event to get emitted - /*asyncAwait!(EventCallback, void delegate() @safe nothrow, + Waitable!( cb => eventDriver.waitForEvent(ms_threadEvent, cb), + cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb), + EventID + ) eventwaiter; + Waitable!( cb => tw.wait(cb), - cb => eventDriver.cancelWaitForEvent(ms_threadEvent) - )(timeout); - emitForThisThread(w.waiters); - ec = this.emitCount;*/ - assert(false); + cb => tw.cancel() + ) localwaiter; + asyncAwaitAny!true(timeout, eventwaiter, localwaiter); + + ec = this.emitCount; + + if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode + else if (localwaiter.cancelled) break; // timeout } } return ec; @@ -844,8 +855,8 @@ struct ManualEvent { /// ditto int waitUninterruptible(Duration timeout, int emit_count) nothrow { - Waiter w; - ThreadWaiter tw; + ThreadWaiter w; + LocalWaiter tw; acquireWaiter(w, tw); int ec = this.emitCount; @@ -861,23 +872,45 @@ struct ManualEvent { /// ditto int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { - /*Waiter w; - ThreadWaiter tw; - auto event = acquireWaiter(w, tw); + shared(ThreadWaiter) w; + LocalWaiter tw; int ec = this.emitCount; while (ec <= emit_count) { - asyncAwaitUninterruptible!(void delegate(), - cb => tw.wait(cb), - cb => tw.cancel() - )(timeout); - ec = this.emitCount; + acquireWaiter(w, tw); + + if (tw.next) { + // if we are not the first waiter for this thread, + // wait for getting resumed by emitForThisThread + asyncAwaitUninterruptible!(void delegate() @safe nothrow, + cb => tw.wait(cb), + cb => tw.cancel() + )(timeout); + ec = this.emitCount; + } else { + // if we are the first waiter for this thread, + // wait for the thread event to get emitted + Waitable!( + cb => eventDriver.waitForEvent(ms_threadEvent, cb), + cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb), + EventID + ) eventwaiter; + Waitable!( + cb => tw.wait(cb), + cb => tw.cancel() + ) localwaiter; + asyncAwaitAny!false(timeout, eventwaiter, localwaiter); + + ec = this.emitCount; + + if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode + else if (localwaiter.cancelled) break; // timeout + } } - return ec;*/ - assert(false); + return ec; } - private static bool emitForThisThread(ThreadWaiter* waiters, EmitMode mode) + private static bool emitForThisThread(LocalWaiter* waiters, EmitMode mode) nothrow { if (!waiters) return false; @@ -896,9 +929,9 @@ struct ManualEvent { return true; } - private void acquireWaiter(ref Waiter w, ref ThreadWaiter tw) + private void acquireWaiter(ref ThreadWaiter w, ref LocalWaiter tw) nothrow { - // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the Waiter will be dangling + // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling tw.task = Task.getThis(); if (m_waiters) { @@ -908,18 +941,28 @@ struct ManualEvent { } } - private void acquireWaiter(ref shared(Waiter) w, ref ThreadWaiter tw) + private void acquireWaiter(ref shared(ThreadWaiter) w, ref LocalWaiter tw) nothrow shared { tw.task = Task.getThis(); if (ms_threadEvent == EventID.init) ms_threadEvent = eventDriver.createEvent(); + auto sdriver = cast(shared)eventDriver; + if (m_waiters) { - //m_waiters.tasks.add(&tw); - assert(false); + shared(ThreadWaiter)* pw = m_waiters; + while (pw !is null) { + if (pw.driver is sdriver) { + (cast(ThreadWaiter*)pw).tasks.add(&tw); + break; + } + pw = atomicLoad(pw.next); + } } else { m_waiters = &w; + w.event = ms_threadEvent; + w.driver = sdriver; } } }