From 4bccf6fcb596245c8dd8254d12db5517a10e7dfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 19 Jul 2017 14:54:33 +0200 Subject: [PATCH 1/9] Use StackSList for ThreadLocalWaiter and add simple loop detection. This will have to be adjusted to use a circular list with the possibility to insert a pivot element, so that consumption of waiters is safe in all cases (see the comment at 1265). --- source/vibe/core/sync.d | 85 ++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 44 deletions(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index a3b9976..866dec4 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -1173,7 +1173,7 @@ private struct ThreadLocalWaiter { ThreadLocalWaiter* next; NativeEventDriver m_driver; EventID m_event = EventID.invalid; - Waiter* m_waiters; + StackSList!Waiter m_waiters; int m_refCount = 1; } @@ -1189,7 +1189,7 @@ private struct ThreadLocalWaiter { m_driver.events.releaseRef(m_event); } - @property bool unused() const @safe nothrow { return m_waiters is null; } + @property bool unused() const @safe nothrow { return m_waiters.empty; } void addRef() @safe nothrow { m_refCount++; } bool releaseRef() @safe nothrow { return --m_refCount > 0; } @@ -1201,8 +1201,19 @@ private struct ThreadLocalWaiter { Waiter w; Waiter* pw = () @trusted { return &w; } (); - w.next = m_waiters; - m_waiters = pw; + + + debug void assertWaiterNotInQueue() + { + m_waiters.iterate((w) { + assert(w !is pw, "Thread local waiter already in queue!?"); + return true; + }); + } + + m_waiters.add(pw); + + scope (failure) m_waiters.remove(pw); SysTime target_timeout, now; if (timeout != Duration.max) { @@ -1216,22 +1227,6 @@ private struct ThreadLocalWaiter { cb => w.cancel(), ) waitable; - void removeWaiter() - @safe nothrow { - Waiter* piw = m_waiters, ppiw = null; - while (piw !is null) { - if (piw is pw) { - if (ppiw) ppiw.next = piw.next; - else m_waiters = piw.next; - break; - } - ppiw = piw; - piw = piw.next; - } - } - - scope (failure) removeWaiter(); - if (evt != EventID.invalid) { Waitable!(EventCallback, (cb) { @@ -1251,45 +1246,41 @@ private struct ThreadLocalWaiter { } if (waitable.cancelled) { - removeWaiter(); + m_waiters.remove(pw); return false; - } else debug { - Waiter* piw = m_waiters; - while (piw !is null) { - assert(piw !is pw, "Thread local waiter still in queue after it got notified!?"); - piw = piw.next; - } - } + } else debug assertWaiterNotInQueue(); return true; } bool emit() @safe nothrow { - if (!m_waiters) return false; + import std.algorithm.mutation : swap; - Waiter* waiters = m_waiters; - m_waiters = null; - while (waiters) { - auto wnext = waiters.next; - assert(wnext !is waiters); - if (waiters.notifier !is null) { - logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr); - waiters.notifier(); - waiters.notifier = null; + if (m_waiters.empty) return false; + + StackSList!Waiter waiters; + swap(m_waiters, waiters); + + // FIXME: during iteration, waiters could remove themselves, but the head element will always stay in the list! + waiters.iterate((w) { + if (w.notifier !is null) { + logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); + w.notifier(); + w.notifier = null; } else logTrace("notify callback is null"); - waiters = wnext; - } + return true; + }); return true; } bool emitSingle() @safe nothrow { - if (!m_waiters) return false; + if (m_waiters.empty) return false; - auto w = m_waiters; - m_waiters = w.next; + auto w = m_waiters.first; + m_waiters.remove(w); if (w.notifier !is null) { logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); w.notifier(); @@ -1322,11 +1313,13 @@ private struct StackSList(T) bool remove(T* elem) { + debug uint counter = 0; T* w = m_first, wp; while (w !is elem) { if (!w) return false; wp = w; w = w.next; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); } if (wp) wp.next = w.next; else m_first = w.next; @@ -1335,6 +1328,7 @@ private struct StackSList(T) void filter(scope bool delegate(T* el) @safe nothrow del) { + debug uint counter = 0; T* w = m_first, pw; while (w !is null) { auto wnext = w.next; @@ -1343,16 +1337,19 @@ private struct StackSList(T) else m_first = wnext; } else pw = w; w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); } } void iterate(scope bool delegate(T* el) @safe nothrow del) { + debug uint counter = 0; T* w = m_first; while (w !is null) { auto wnext = w.next; if (!del(w)) break; w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); } } } @@ -2007,4 +2004,4 @@ class InterruptibleTaskReadWriteMutex /** The policy with which the lock has been created. */ @property Policy policy() const { return m_state.policy; } -} \ No newline at end of file +} From cfb4f831135c54b2f0711d9ef1a193f57918e9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 13:34:12 +0200 Subject: [PATCH 2/9] Add makeGCSafe/disposeGCSafe as GC safe variants of make/dispose. If necessary, these will call GC.addRange/GC.removeRange to avoid dangling GC references. --- source/vibe/internal/allocator.d | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/source/vibe/internal/allocator.d b/source/vibe/internal/allocator.d index 596b88f..d496aef 100644 --- a/source/vibe/internal/allocator.d +++ b/source/vibe/internal/allocator.d @@ -16,3 +16,26 @@ public import std.experimental.allocator.mallocator; s_threadAllocator = () @trusted { return allocatorObject(GCAllocator.instance); } (); return s_threadAllocator; } + +auto makeGCSafe(T, Allocator, A...)(Allocator allocator, A args) +{ + import core.memory : GC; + import std.traits : hasIndirections; + + auto ret = allocator.make!T(args); + static if (is (T == class)) enum tsize = __traits(classInstanceSize, T); + else enum tsize = T.sizeof; + static if (hasIndirections!T) + GC.addRange(cast(void*)ret, tsize, typeid(T)); + return ret; +} + +void disposeGCSafe(T, Allocator)(Allocator allocator, T obj) +{ + import core.memory : GC; + import std.traits : hasIndirections; + + static if (hasIndirections!T) + GC.removeRange(cast(void*)obj); + allocator.dispose(obj); +} From 536fa0978010b5ee518ade1981f58deed367e678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 13:34:39 +0200 Subject: [PATCH 3/9] Add helper module with intrusive singly and doubly linked lists. --- source/vibe/internal/list.d | 192 ++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 source/vibe/internal/list.d diff --git a/source/vibe/internal/list.d b/source/vibe/internal/list.d new file mode 100644 index 0000000..cfc4135 --- /dev/null +++ b/source/vibe/internal/list.d @@ -0,0 +1,192 @@ +module vibe.internal.list; + +import core.atomic; + +struct CircularDList(T) +{ + private { + T m_pivot; + } + + this(T pivot) + { + assert(pivot.prev is null && pivot.next is null); + pivot.next = pivot.prev = pivot; + m_pivot = pivot; + assert(this.empty); + } + + bool empty() const { return m_pivot.next is m_pivot; } + + + @property T front() { return m_pivot.next is m_pivot ? null : m_pivot.next; } + @property T back() { return m_pivot.prev is m_pivot ? null : m_pivot.prev; } + + void remove(T elem) + { + assert(elem !is m_pivot); + elem.prev.next = elem.next; + elem.next.prev = elem.prev; + elem.prev = elem.next = null; + } + + void insertBefore(T elem, T pivot) + { + assert(elem.prev is null && elem.next is null); + elem.prev = pivot.prev; + elem.next = pivot; + pivot.prev.next = elem; + pivot.prev = elem; + } + + void insertFront(T elem) { insertBefore(elem, m_pivot.next); } + + void insertBack(T elem) { insertBefore(elem, m_pivot); } + + // NOTE: allowed to remove the current element + int opApply(int delegate(T) @safe nothrow del) + @safe nothrow { + T prev = m_pivot; + debug size_t counter = 0; + while (prev.next !is m_pivot) { + auto el = prev.next; + if (auto ret = del(el)) + return ret; + if (prev.next is el) + prev = prev.next; + debug assert (++counter < 1_000_000, "Cycle in list?"); + } + return 0; + } +} + +unittest { + static final class C { + C prev, next; + int i; + this(int i) { this.i = i; } + } + + alias L = CircularDList!C; + auto l = L(new C(0)); + assert(l.empty); + + auto c = new C(1); + l.insertBack(c); + assert(!l.empty); + assert(l.front is c); + assert(l.back is c); + foreach (c; l) assert(c.i == 1); + + auto c2 = new C(2); + l.insertFront(c2); + assert(!l.empty); + assert(l.front is c2); + assert(l.back is c); + foreach (c; l) assert(c.i == 1 || c.i == 2); + + l.remove(c); + assert(!l.empty); + assert(l.front is c2); + assert(l.back is c2); + foreach (c; l) assert(c.i == 2); + + l.remove(c2); + assert(l.empty); +} + + +struct StackSList(T) +{ +@safe nothrow: + + import core.atomic : cas; + + private T m_first; + + @property T first() { return m_first; } + @property T front() { return m_first; } + + bool empty() const { return m_first is null; } + + void add(T elem) + { + debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); + elem.next = m_first; + m_first = elem; + } + + bool remove(T elem) + { + debug uint counter = 0; + T w = m_first, wp; + while (w !is elem) { + if (!w) return false; + wp = w; + w = w.next; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + if (wp) wp.next = w.next; + else m_first = w.next; + return true; + } + + void filter(scope bool delegate(T el) @safe nothrow del) + { + debug uint counter = 0; + T w = m_first, pw; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) { + if (pw) pw.next = wnext; + else m_first = wnext; + } else pw = w; + w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + } + + void iterate(scope bool delegate(T el) @safe nothrow del) + { + debug uint counter = 0; + T w = m_first; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) break; + w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + } +} + +unittest { + static final class C { + C next; + int i; + this(int i) { this.i = i; } + } + + alias L = StackSList!C; + L l; + assert(l.empty); + + auto c = new C(1); + l.add(c); + assert(!l.empty); + assert(l.front is c); + l.iterate((el) { assert(el.i == 1); return true; }); + + auto c2 = new C(2); + l.add(c2); + assert(!l.empty); + assert(l.front is c2); + l.iterate((el) { assert(el.i == 1 || el.i == 2); return true; }); + + l.remove(c); + assert(!l.empty); + assert(l.front is c2); + l.iterate((el) { assert(el.i == 2); return true; }); + + l.filter((el) => el.i == 0); + assert(l.empty); +} From 19db7732e6ef2b99a5744b33914a74676868eff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 13:36:27 +0200 Subject: [PATCH 4/9] Add nothrow annotations. --- source/vibe/core/task.d | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 437d50c..e146d76 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -809,7 +809,7 @@ package struct TaskScheduler { Returns `true` $(I iff) there are more tasks left to process. */ ScheduleStatus schedule() - { + nothrow { if (m_taskQueue.empty) return ScheduleStatus.idle; @@ -846,7 +846,7 @@ package struct TaskScheduler { /// Resumes execution of a yielded task. private void resumeTask(Task t) - { + nothrow { import std.encoding : sanitize; logTrace("task fiber resume"); From a4b36f08d352cc5e9205e426129f8c02fd53bfa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 15:57:44 +0200 Subject: [PATCH 5/9] Fix multiple issues in (Local)ManualEvent. - Copying LocalManualEvent now works correctly, using reference counting - ManualEvent correctly pins the reference to the thread-local waiter until it has finished emitting it - ThreadLocalWaiter uses a doubly-linked list to manage task waiters (more efficient deletion, FIFO trigger order) - Fixed a bug in ThreadLocalWaiter.emit() where the head element of the iterated list might already have stopped waiting, resulting in an invocation of a dangling TaskWaiter pointer --- source/vibe/core/sync.d | 313 +++++++++++++++++++--------------------- 1 file changed, 147 insertions(+), 166 deletions(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 866dec4..d7c15c8 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -23,7 +23,9 @@ import std.traits : ReturnType; */ LocalManualEvent createManualEvent() @safe { - return LocalManualEvent.init; + LocalManualEvent ret; + ret.initialize(); + return ret; } /// ditto shared(ManualEvent) createSharedManualEvent() @@ -319,7 +321,6 @@ unittest { } } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { // test deferred throwing import vibe.core.core; @@ -357,7 +358,6 @@ unittest { // test deferred throwing runEventLoop(); } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { runMutexUnitTests!TaskMutex(); } @@ -679,28 +679,45 @@ struct LocalManualEvent { @safe: private { - int m_emitCount; - ThreadLocalWaiter m_waiter; + alias Waiter = ThreadLocalWaiter!false; + + Waiter m_waiter; } // thread destructor in vibe.core.core will decrement the ref. count package static EventID ms_threadEvent; - //@disable this(this); // FIXME: commenting this out this is not a good idea... + private void initialize() + { + import vibe.internal.allocator : Mallocator, makeGCSafe; + m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); + } - deprecated("LocalManualEvent is always non-null!") - bool opCast() const nothrow { return true; } + this(this) + { + if (m_waiter) + return m_waiter.addRef(); + } + + ~this() + { + import vibe.internal.allocator : Mallocator, disposeGCSafe; + if (m_waiter) { + if (!m_waiter.releaseRef()) + () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); + } + } + + bool opCast() const nothrow { return m_waiter !is null; } /// A counter that is increased with every emit() call - int emitCount() const nothrow { return m_emitCount; } - /// ditto - int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } + int emitCount() const nothrow { return m_waiter.m_emitCount; } /// Emits the signal, waking up all owners of the signal. int emit() nothrow { logTrace("unshared emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emit(); return ec; } @@ -709,7 +726,7 @@ struct LocalManualEvent { int emitSingle() nothrow { logTrace("unshared single emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emitSingle(); return ec; } @@ -758,14 +775,14 @@ struct LocalManualEvent { target_timeout = now + timeout; } - while (m_emitCount <= emit_count) { + while (m_waiter.m_emitCount <= emit_count) { m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); try now = Clock.currTime(UTC()); catch (Exception e) { assert(false, e.msg); } if (now >= target_timeout) break; } - return m_emitCount; + return m_waiter.m_emitCount; } } @@ -793,14 +810,17 @@ unittest { struct ManualEvent { import core.thread : Thread; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; + import vibe.internal.list : StackSList; @safe: private { + alias ThreadWaiter = ThreadLocalWaiter!true; + int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; // actively waiting - StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs + StackSList!ThreadWaiter active; // actively waiting + StackSList!ThreadWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -813,7 +833,7 @@ struct ManualEvent { all } - @disable this(this); // FIXME: commenting this out this is not a good idea... + @disable this(this); deprecated("ManualEvent is always non-null!") bool opCast() const shared nothrow { return true; } @@ -831,12 +851,14 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.filter((ThreadLocalWaiter* w) { + m_waiters.lock.active.filter((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.init); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -845,7 +867,10 @@ struct ManualEvent { return true; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emit(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -862,13 +887,15 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.iterate((ThreadLocalWaiter* w) { + m_waiters.lock.active.iterate((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); if (w.unused) return true; - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.invalid); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -877,7 +904,10 @@ struct ManualEvent { return false; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emitSingle(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -937,7 +967,7 @@ struct ManualEvent { int ec = this.emitCount; - acquireThreadWaiter((ref ThreadLocalWaiter w) { + acquireThreadWaiter((scope ThreadWaiter w) { while (ec <= emit_count) { w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count); ec = this.emitCount; @@ -955,10 +985,9 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : processAllocator, make; - import core.memory : GC; + import vibe.internal.allocator : processAllocator, makeGCSafe; - ThreadLocalWaiter* w; + ThreadWaiter w; auto drv = eventDriver; with (m_waiters.lock) { @@ -984,34 +1013,37 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = processAllocator.make!ThreadLocalWaiter; + w = processAllocator.makeGCSafe!ThreadWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; - GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); } catch (Exception e) { assert(false, "Failed to allocate thread waiter."); } } (); } + assert(w.m_refCount == 1); active.add(w); } } - scope (exit) { - if (!w.releaseRef()) { - assert(w.m_driver is drv); - assert(w.unused); - with (m_waiters.lock) { - auto rmvd = active.remove(w); - assert(rmvd); - free.add(w); - // TODO: cap size of m_freeWaiters - } + scope (exit) releaseWaiter(w); + + del(w); + } + + private void releaseWaiter(ThreadWaiter w) + shared nothrow { + if (!w.releaseRef()) { + assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?"); + assert(w.unused, "Waiter still used, but not referenced!?"); + with (m_waiters.lock) { + auto rmvd = active.remove(w); + assert(rmvd, "Waiter not in active queue anymore!?"); + free.add(w); + // TODO: cap size of m_freeWaiters } } - - del(*w); } } @@ -1155,10 +1187,12 @@ package struct SpinLock { } } -private struct ThreadLocalWaiter { +private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { + import vibe.internal.list : CircularDList; + private { - static struct Waiter { - Waiter* next; + static struct TaskWaiter { + TaskWaiter* prev, next; Task task; void delegate() @safe nothrow notifier; bool cancelled; @@ -1170,50 +1204,52 @@ private struct ThreadLocalWaiter { void cancel() @safe nothrow { cancelled = true; notifier = null; } } - ThreadLocalWaiter* next; - NativeEventDriver m_driver; - EventID m_event = EventID.invalid; - StackSList!Waiter m_waiters; + static if (EVENT_TRIGGERED) { + package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread + NativeEventDriver m_driver; + EventID m_event = EventID.invalid; + } else { + int m_emitCount = 0; + } int m_refCount = 1; + TaskWaiter m_pivot; + TaskWaiter m_emitPivot; + CircularDList!(TaskWaiter*) m_waiters; } - this(this) - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.addRef(m_event); + this() + { + m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); } - ~this() - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.releaseRef(m_event); + static if (EVENT_TRIGGERED) { + ~this() + { + if (m_event != EventID.invalid) + eventDriver.events.releaseRef(m_event); + } } @property bool unused() const @safe nothrow { return m_waiters.empty; } - void addRef() @safe nothrow { m_refCount++; } - bool releaseRef() @safe nothrow { return --m_refCount > 0; } + void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; } + bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; } bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; import vibe.internal.async : Waitable, asyncAwaitAny; - Waiter w; - Waiter* pw = () @trusted { return &w; } (); + TaskWaiter waiter_store; + TaskWaiter* waiter = () @trusted { return &waiter_store; } (); - - debug void assertWaiterNotInQueue() - { - m_waiters.iterate((w) { - assert(w !is pw, "Thread local waiter already in queue!?"); - return true; - }); - } - - m_waiters.add(pw); - - scope (failure) m_waiters.remove(pw); + m_waiters.insertBack(waiter); + assert(waiter.next !is null); + scope (exit) + if (waiter.next !is null) { + m_waiters.remove(waiter); + assert(!waiter.next); + } SysTime target_timeout, now; if (timeout != Duration.max) { @@ -1222,16 +1258,16 @@ private struct ThreadLocalWaiter { target_timeout = now + timeout; } - Waitable!(typeof(Waiter.notifier), - cb => w.wait(cb), - cb => w.cancel(), + Waitable!(typeof(TaskWaiter.notifier), + cb => waiter.wait(cb), + cb => waiter.cancel(), ) waitable; if (evt != EventID.invalid) { Waitable!(EventCallback, (cb) { eventDriver.events.wait(evt, cb); - // check for exit codition *after* starting to wait for the event + // check for exit condition *after* starting to wait for the event // to avoid a race condition if (exit_condition()) { eventDriver.events.cancelWait(evt, cb); @@ -1246,111 +1282,57 @@ private struct ThreadLocalWaiter { } if (waitable.cancelled) { - m_waiters.remove(pw); + assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); return false; - } else debug assertWaiterNotInQueue(); - - return true; + } else { + assert(waiter.next is null, "Triggered waiter still in queue!?"); + return true; + } } - bool emit() + void emit() @safe nothrow { import std.algorithm.mutation : swap; + import vibe.core.core : yield; - if (m_waiters.empty) return false; + if (m_waiters.empty) return; - StackSList!Waiter waiters; - swap(m_waiters, waiters); + TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); - // FIXME: during iteration, waiters could remove themselves, but the head element will always stay in the list! - waiters.iterate((w) { - if (w.notifier !is null) { - logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); - w.notifier(); - w.notifier = null; - } else logTrace("notify callback is null"); - return true; - }); + if (pivot.next) { // another emit in progress? + // shift pivot to the end, so that the other emit call will process all pending waiters + if (pivot !is m_waiters.back) { + m_waiters.remove(pivot); + m_waiters.insertBack(pivot); + } + return; + } - return true; + m_waiters.insertBack(pivot); + scope (exit) m_waiters.remove(pivot); + + foreach (w; m_waiters) { + if (w is pivot) break; + emitWaiter(w); + } } bool emitSingle() @safe nothrow { if (m_waiters.empty) return false; + emitWaiter(m_waiters.front); + return true; + } - auto w = m_waiters.first; + private void emitWaiter(TaskWaiter* w) + @safe nothrow { m_waiters.remove(w); + if (w.notifier !is null) { logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); w.notifier(); w.notifier = null; } else logTrace("notify callback is null"); - - return true; - } -} - -private struct StackSList(T) -{ -@safe nothrow: - - import core.atomic : cas; - - private T* m_first; - - @property T* first() { return m_first; } - @property shared(T)* first() shared @trusted { return atomicLoad(m_first); } - - bool empty() const { return m_first is null; } - - void add(T* elem) - { - debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); - elem.next = m_first; - m_first = elem; - } - - bool remove(T* elem) - { - debug uint counter = 0; - T* w = m_first, wp; - while (w !is elem) { - if (!w) return false; - wp = w; - w = w.next; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } - if (wp) wp.next = w.next; - else m_first = w.next; - return true; - } - - void filter(scope bool delegate(T* el) @safe nothrow del) - { - debug uint counter = 0; - T* w = m_first, pw; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) { - if (pw) pw.next = wnext; - else m_first = wnext; - } else pw = w; - w = wnext; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } - } - - void iterate(scope bool delegate(T* el) @safe nothrow del) - { - debug uint counter = 0; - T* w = m_first; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) break; - w = wnext; - debug assert(++counter < 1_000_000, "Cycle in linked list?"); - } } } @@ -1366,7 +1348,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { { } - @trusted bool tryLock() { if (cas(&m_locked, false, true)) { From 100dfc30eec09a07f9ec2def2f8265fb6ac3e15e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 16:10:18 +0200 Subject: [PATCH 6/9] Add test that triggers the former bug in ThreadLocalWaiter.emit() and ensures FIFO emit order. --- source/vibe/core/sync.d | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index d7c15c8..3220e5f 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -802,6 +802,30 @@ unittest { runEventLoop(); } +unittest { + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + + LocalManualEvent l = createManualEvent(); + + Task t2; + runTask({ + l.wait(); + t2.interrupt(); + sleep(20.msecs); + exitEventLoop(); + }); + t2 = runTask({ + try { + l.wait(); + assert(false, "Shouldn't reach this."); + } catch (InterruptException e) {} + }); + runTask({ + l.emit(); + }); + runEventLoop(); +} + /** A manually triggered multi threaded cross-task event. From f6736d13ab4c21a37f4869d3aff5404e3b262067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 16:14:47 +0200 Subject: [PATCH 7/9] Add test that reproduces the LocalManualEvent copy bug. --- source/vibe/core/sync.d | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 3220e5f..7f5edcc 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -802,7 +802,7 @@ unittest { runEventLoop(); } -unittest { +unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; LocalManualEvent l = createManualEvent(); @@ -826,6 +826,22 @@ unittest { runEventLoop(); } +unittest { // ensure that LocalManualEvent behaves correctly after being copied + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + + LocalManualEvent l = createManualEvent(); + runTask({ + auto lc = l; + sleep(100.msecs); + lc.emit(); + }); + runTask({ + assert(l.wait(1.seconds, l.emitCount)); + exitEventLoop(); + }); + runEventLoop(); +} + /** A manually triggered multi threaded cross-task event. From a6eeae97f39a35937fef286528ad7eb2fc19e652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 16:42:58 +0200 Subject: [PATCH 8/9] Fix DirectoryWatcher's creation of its LocalManualEvent. --- source/vibe/core/file.d | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index bc232ae..b804bf4 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -563,7 +563,7 @@ private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, Input */ struct DirectoryWatcher { // TODO: avoid all those heap allocations! import std.array : Appender, appender; - import vibe.core.sync : LocalManualEvent; + import vibe.core.sync : LocalManualEvent, createManualEvent; @safe: @@ -594,6 +594,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations! private this(NativePath path, bool recursive) { m_context = new Context; // FIME: avoid GC allocation (use FD user data slot) + m_context.changeEvent = createManualEvent(); m_watcher = eventDriver.watchers.watchDirectory(path.toNativeString, recursive, &m_context.onChange); m_context.path = path; m_context.recursive = recursive; From be0de0a733c500c1d03e21d9123d9713bccf442e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 20 Jul 2017 16:48:24 +0200 Subject: [PATCH 9/9] Add assertions for uninitialized LocalManualEvents. --- source/vibe/core/sync.d | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 7f5edcc..aaf298f 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -716,6 +716,7 @@ struct LocalManualEvent { /// Emits the signal, waking up all owners of the signal. int emit() nothrow { + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); logTrace("unshared emit"); auto ec = m_waiter.m_emitCount++; m_waiter.emit(); @@ -725,6 +726,7 @@ struct LocalManualEvent { /// Emits the signal, waking up a single owners of the signal. int emitSingle() nothrow { + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); logTrace("unshared single emit"); auto ec = m_waiter.m_emitCount++; m_waiter.emitSingle(); @@ -768,6 +770,8 @@ struct LocalManualEvent { { import std.datetime : Clock, SysTime, UTC; + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); + SysTime target_timeout, now; if (timeout != Duration.max) { try now = Clock.currTime(UTC());