diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 3ae3219..78a356b 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -500,7 +500,7 @@ private void runMutexUnitTests(M)() // basic contention test runContendedTasks(false, false); - runTask({ + auto t3 = runTask({ assert(t1.running && t2.running); assert(m.m_impl.m_locked); t1.join(); @@ -513,11 +513,12 @@ private void runMutexUnitTests(M)() exitEventLoop(); }); runEventLoop(); + assert(!t3.running); assert(!m.m_impl.m_locked); // interruption test #1 runContendedTasks(true, false); - runTask({ + t3 = runTask({ assert(t1.running && t2.running); assert(m.m_impl.m_locked); t1.interrupt(); @@ -531,11 +532,12 @@ private void runMutexUnitTests(M)() exitEventLoop(); }); runEventLoop(); + assert(!t3.running); assert(!m.m_impl.m_locked); // interruption test #2 runContendedTasks(false, true); - runTask({ + t3 = runTask({ assert(t1.running && t2.running); assert(m.m_impl.m_locked); t2.interrupt(); @@ -549,6 +551,7 @@ private void runMutexUnitTests(M)() exitEventLoop(); }); runEventLoop(); + assert(!t3.running); assert(!m.m_impl.m_locked); } @@ -799,7 +802,7 @@ struct ManualEvent { StackSList!ThreadLocalWaiter active; StackSList!ThreadLocalWaiter free; } - Monitor!(Waiters, SpinLock) m_waiters; + Monitor!(Waiters, shared(SpinLock)) m_waiters; } // thread destructor in vibe.core.core will decrement the ref. count @@ -830,17 +833,48 @@ struct ManualEvent { ThreadLocalWaiter* lw; auto drv = eventDriver; - m_waiters.lock((ref waiters) { - waiters.active.filter((ThreadLocalWaiter* w) { - () @trusted { logTrace("waiter %s", cast(void*)w); } (); - if (w.m_driver is drv) lw = w; - else { - try { - () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); - } catch (Exception e) assert(false, e.msg); - } - return true; - }); + m_waiters.lock.active.filter((ThreadLocalWaiter* w) { + () @trusted { logTrace("waiter %s", cast(void*)w); } (); + if (w.m_driver is drv) lw = w; + else { + try { + assert(w.m_event != EventID.init); + () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); + } catch (Exception e) assert(false, e.msg); + } + return true; + }); + () @trusted { logTrace("lw %s", cast(void*)lw); } (); + if (lw) lw.emit(); + + logTrace("emit shared done"); + + return ec; + } + + /// Emits the signal, waking up at least one waiting task + int emitSingle() + shared nothrow @trusted { + import core.atomic : atomicOp, cas; + + () @trusted { logTrace("emit shared single %s", cast(void*)&this); } (); + + auto ec = atomicOp!"+="(m_emitCount, 1); + auto thisthr = Thread.getThis(); + + ThreadLocalWaiter* lw; + auto drv = eventDriver; + m_waiters.lock.active.iterate((ThreadLocalWaiter* w) { + () @trusted { logTrace("waiter %s", cast(void*)w); } (); + if (w.unused) return true; + if (w.m_driver is drv) lw = w; + else { + try { + assert(w.m_event != EventID.invalid); + () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); + } catch (Exception e) assert(false, e.msg); + } + return false; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); if (lw) lw.emit(); @@ -925,8 +959,8 @@ struct ManualEvent { ThreadLocalWaiter* w; auto drv = eventDriver; - m_waiters.lock((ref waiters) { - waiters.active.filter((aw) { + with (m_waiters.lock) { + active.iterate((aw) { if (aw.m_driver is drv) { w = aw; } @@ -934,7 +968,7 @@ struct ManualEvent { }); if (!w) { - waiters.free.filter((fw) { + free.filter((fw) { if (fw.m_driver is drv) { w = fw; return false; @@ -955,17 +989,17 @@ struct ManualEvent { } (); } - waiters.active.add(w); + active.add(w); } - }); + } scope (exit) { if (w.unused) { - m_waiters.lock((ref waiters) { - waiters.active.remove(w); - waiters.free.add(w); + with (m_waiters.lock) { + active.remove(w); + free.add(w); // TODO: cap size of m_freeWaiters - }); + } } } @@ -990,19 +1024,40 @@ unittest { runEventLoop(); } -private shared struct Monitor(T, M) +package shared struct Monitor(T, M) { + alias Mutex = M; + alias Data = T; private { - shared T m_data; - shared M m_mutex; + Mutex mutex; + Data data; } - void lock(scope void delegate(ref T) @safe nothrow access) - shared { - m_mutex.lock(); - scope (exit) m_mutex.unlock(); - access(*() @trusted { return cast(T*)&m_data; } ()); + static struct Locked { + shared(Monitor)* m; + @disable this(this); + ~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); } + ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } + alias get this; } + + Locked lock() { + () @trusted { (cast(Mutex)mutex).lock(); } (); + return Locked(() @trusted { return &this; } ()); + } + + const(Locked) lock() const { + () @trusted { (cast(Mutex)mutex).lock(); } (); + return const(Locked)(() @trusted { return &this; } ()); + } +} + + +package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex) +@trusted { + shared(Monitor!(T, M)) ret; + ret.mutex = cast(shared)mutex; + return ret; } package struct SpinLock { @@ -1053,7 +1108,7 @@ private struct ThreadLocalWaiter { ThreadLocalWaiter* next; NativeEventDriver m_driver; - EventID m_event; + EventID m_event = EventID.invalid; Waiter* m_waiters; } @@ -1221,6 +1276,16 @@ private struct StackSList(T) w = wnext; } } + + void iterate(scope bool delegate(T* el) @safe nothrow del) + { + T* w = m_first; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) break; + w = wnext; + } + } } private struct TaskMutexImpl(bool INTERRUPTIBLE) {