diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 866dec4..d7c15c8 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -23,7 +23,9 @@ import std.traits : ReturnType; */ LocalManualEvent createManualEvent() @safe { - return LocalManualEvent.init; + LocalManualEvent ret; + ret.initialize(); + return ret; } /// ditto shared(ManualEvent) createSharedManualEvent() @@ -319,7 +321,6 @@ unittest { } } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { // test deferred throwing import vibe.core.core; @@ -357,7 +358,6 @@ unittest { // test deferred throwing runEventLoop(); } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { runMutexUnitTests!TaskMutex(); } @@ -679,28 +679,45 @@ struct LocalManualEvent { @safe: private { - int m_emitCount; - ThreadLocalWaiter m_waiter; + alias Waiter = ThreadLocalWaiter!false; + + Waiter m_waiter; } // thread destructor in vibe.core.core will decrement the ref. count package static EventID ms_threadEvent; - //@disable this(this); // FIXME: commenting this out this is not a good idea... + private void initialize() + { + import vibe.internal.allocator : Mallocator, makeGCSafe; + m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); + } - deprecated("LocalManualEvent is always non-null!") - bool opCast() const nothrow { return true; } + this(this) + { + if (m_waiter) + return m_waiter.addRef(); + } + + ~this() + { + import vibe.internal.allocator : Mallocator, disposeGCSafe; + if (m_waiter) { + if (!m_waiter.releaseRef()) + () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); + } + } + + bool opCast() const nothrow { return m_waiter !is null; } /// A counter that is increased with every emit() call - int emitCount() const nothrow { return m_emitCount; } - /// ditto - int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } + int emitCount() const nothrow { return m_waiter.m_emitCount; } /// Emits the signal, waking up all owners of the signal. int emit() nothrow { logTrace("unshared emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emit(); return ec; } @@ -709,7 +726,7 @@ struct LocalManualEvent { int emitSingle() nothrow { logTrace("unshared single emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emitSingle(); return ec; } @@ -758,14 +775,14 @@ struct LocalManualEvent { target_timeout = now + timeout; } - while (m_emitCount <= emit_count) { + while (m_waiter.m_emitCount <= emit_count) { m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); try now = Clock.currTime(UTC()); catch (Exception e) { assert(false, e.msg); } if (now >= target_timeout) break; } - return m_emitCount; + return m_waiter.m_emitCount; } } @@ -793,14 +810,17 @@ unittest { struct ManualEvent { import core.thread : Thread; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; + import vibe.internal.list : StackSList; @safe: private { + alias ThreadWaiter = ThreadLocalWaiter!true; + int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; // actively waiting - StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs + StackSList!ThreadWaiter active; // actively waiting + StackSList!ThreadWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -813,7 +833,7 @@ struct ManualEvent { all } - @disable this(this); // FIXME: commenting this out this is not a good idea... + @disable this(this); deprecated("ManualEvent is always non-null!") bool opCast() const shared nothrow { return true; } @@ -831,12 +851,14 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.filter((ThreadLocalWaiter* w) { + m_waiters.lock.active.filter((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.init); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -845,7 +867,10 @@ struct ManualEvent { return true; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emit(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -862,13 +887,15 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.iterate((ThreadLocalWaiter* w) { + m_waiters.lock.active.iterate((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); if (w.unused) return true; - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.invalid); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -877,7 +904,10 @@ struct ManualEvent { return false; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emitSingle(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -937,7 +967,7 @@ struct ManualEvent { int ec = this.emitCount; - acquireThreadWaiter((ref ThreadLocalWaiter w) { + acquireThreadWaiter((scope ThreadWaiter w) { while (ec <= emit_count) { w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count); ec = this.emitCount; @@ -955,10 +985,9 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : processAllocator, make; - import core.memory : GC; + import vibe.internal.allocator : processAllocator, makeGCSafe; - ThreadLocalWaiter* w; + ThreadWaiter w; auto drv = eventDriver; with (m_waiters.lock) { @@ -984,34 +1013,37 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = processAllocator.make!ThreadLocalWaiter; + w = processAllocator.makeGCSafe!ThreadWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; - GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); } catch (Exception e) { assert(false, "Failed to allocate thread waiter."); } } (); } + assert(w.m_refCount == 1); active.add(w); } } - scope (exit) { - if (!w.releaseRef()) { - assert(w.m_driver is drv); - assert(w.unused); - with (m_waiters.lock) { - auto rmvd = active.remove(w); - assert(rmvd); - free.add(w); - // TODO: cap size of m_freeWaiters - } + scope (exit) releaseWaiter(w); + + del(w); + } + + private void releaseWaiter(ThreadWaiter w) + shared nothrow { + if (!w.releaseRef()) { + assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?"); + assert(w.unused, "Waiter still used, but not referenced!?"); + with (m_waiters.lock) { + auto rmvd = active.remove(w); + assert(rmvd, "Waiter not in active queue anymore!?"); + free.add(w); + // TODO: cap size of m_freeWaiters } } - - del(*w); } } @@ -1155,10 +1187,12 @@ package struct SpinLock { } } -private struct ThreadLocalWaiter { +private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { + import vibe.internal.list : CircularDList; + private { - static struct Waiter { - Waiter* next; + static struct TaskWaiter { + TaskWaiter* prev, next; Task task; void delegate() @safe nothrow notifier; bool cancelled; @@ -1170,50 +1204,52 @@ private struct ThreadLocalWaiter { void cancel() @safe nothrow { cancelled = true; notifier = null; } } - ThreadLocalWaiter* next; - NativeEventDriver m_driver; - EventID m_event = EventID.invalid; - StackSList!Waiter m_waiters; + static if (EVENT_TRIGGERED) { + package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread + NativeEventDriver m_driver; + EventID m_event = EventID.invalid; + } else { + int m_emitCount = 0; + } int m_refCount = 1; + TaskWaiter m_pivot; + TaskWaiter m_emitPivot; + CircularDList!(TaskWaiter*) m_waiters; } - this(this) - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.addRef(m_event); + this() + { + m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); } - ~this() - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.releaseRef(m_event); + static if (EVENT_TRIGGERED) { + ~this() + { + if (m_event != EventID.invalid) + eventDriver.events.releaseRef(m_event); + } } @property bool unused() const @safe nothrow { return m_waiters.empty; } - void addRef() @safe nothrow { m_refCount++; } - bool releaseRef() @safe nothrow { return --m_refCount > 0; } + void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; } + bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; } bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; import vibe.internal.async : Waitable, asyncAwaitAny; - Waiter w; - Waiter* pw = () @trusted { return &w; } (); + TaskWaiter waiter_store; + TaskWaiter* waiter = () @trusted { return &waiter_store; } (); - - debug void assertWaiterNotInQueue() - { - m_waiters.iterate((w) { - assert(w !is pw, "Thread local waiter already in queue!?"); - return true; - }); - } - - m_waiters.add(pw); - - scope (failure) m_waiters.remove(pw); + m_waiters.insertBack(waiter); + assert(waiter.next !is null); + scope (exit) + if (waiter.next !is null) { + m_waiters.remove(waiter); + assert(!waiter.next); + } SysTime target_timeout, now; if (timeout != Duration.max) { @@ -1222,16 +1258,16 @@ private struct ThreadLocalWaiter { target_timeout = now + timeout; } - Waitable!(typeof(Waiter.notifier), - cb => w.wait(cb), - cb => w.cancel(), + Waitable!(typeof(TaskWaiter.notifier), + cb => waiter.wait(cb), + cb => waiter.cancel(), ) waitable; if (evt != EventID.invalid) { Waitable!(EventCallback, (cb) { eventDriver.events.wait(evt, cb); - // check for exit codition *after* starting to wait for the event + // check for exit condition *after* starting to wait for the event // to avoid a race condition if (exit_condition()) { eventDriver.events.cancelWait(evt, cb); @@ -1246,111 +1282,57 @@ private struct ThreadLocalWaiter { } if (waitable.cancelled) { - m_waiters.remove(pw); + assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); return false; - } else debug assertWaiterNotInQueue(); - - return true; + } else { + assert(waiter.next is null, "Triggered waiter still in queue!?"); + return true; + } } - bool emit() + void emit() @safe nothrow { import std.algorithm.mutation : swap; + import vibe.core.core : yield; - if (m_waiters.empty) return false; + if (m_waiters.empty) return; - StackSList!Waiter waiters; - swap(m_waiters, waiters); + TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); - // FIXME: during iteration, waiters could remove themselves, but the head element will always stay in the list! - waiters.iterate((w) { - if (w.notifier !is null) { - logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); - w.notifier(); - w.notifier = null; - } else logTrace("notify callback is null"); - return true; - }); + if (pivot.next) { // another emit in progress? + // shift pivot to the end, so that the other emit call will process all pending waiters + if (pivot !is m_waiters.back) { + m_waiters.remove(pivot); + m_waiters.insertBack(pivot); + } + return; + } - return true; + m_waiters.insertBack(pivot); + scope (exit) m_waiters.remove(pivot); + + foreach (w; m_waiters) { + if (w is pivot) break; + emitWaiter(w); + } } bool emitSingle() @safe nothrow { if (m_waiters.empty) return false; + emitWaiter(m_waiters.front); + return true; + } - auto w = m_waiters.first; + private void emitWaiter(TaskWaiter* w) + @safe nothrow { m_waiters.remove(w); + if (w.notifier !is null) { logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); w.notifier(); w.notifier = null; } else logTrace("notify callback is null"); - - return true; - } -} - -private struct StackSList(T) -{ -@safe nothrow: - - import core.atomic : cas; - - private T* m_first; - - @property T* first() { return m_first; } - @property shared(T)* first() shared @trusted { return atomicLoad(m_first); } - - bool empty() const { return m_first is null; } - - void add(T* elem) - { - debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); - elem.next = m_first; - m_first = elem; - } - - bool remove(T* elem) - { - debug uint counter = 0; - T* w = m_first, wp; - while (w !is elem) { - if (!w) return false; - wp = w; - w = w.next; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } - if (wp) wp.next = w.next; - else m_first = w.next; - return true; - } - - void filter(scope bool delegate(T* el) @safe nothrow del) - { - debug uint counter = 0; - T* w = m_first, pw; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) { - if (pw) pw.next = wnext; - else m_first = wnext; - } else pw = w; - w = wnext; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } - } - - void iterate(scope bool delegate(T* el) @safe nothrow del) - { - debug uint counter = 0; - T* w = m_first; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) break; - w = wnext; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } } } @@ -1366,7 +1348,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { { } - @trusted bool tryLock() { if (cas(&m_locked, false, true)) {