diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index bc232ae..b804bf4 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -563,7 +563,7 @@ private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, Input */ struct DirectoryWatcher { // TODO: avoid all those heap allocations! import std.array : Appender, appender; - import vibe.core.sync : LocalManualEvent; + import vibe.core.sync : LocalManualEvent, createManualEvent; @safe: @@ -594,6 +594,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations! private this(NativePath path, bool recursive) { m_context = new Context; // FIME: avoid GC allocation (use FD user data slot) + m_context.changeEvent = createManualEvent(); m_watcher = eventDriver.watchers.watchDirectory(path.toNativeString, recursive, &m_context.onChange); m_context.path = path; m_context.recursive = recursive; diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index a3b9976..aaf298f 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -23,7 +23,9 @@ import std.traits : ReturnType; */ LocalManualEvent createManualEvent() @safe { - return LocalManualEvent.init; + LocalManualEvent ret; + ret.initialize(); + return ret; } /// ditto shared(ManualEvent) createSharedManualEvent() @@ -319,7 +321,6 @@ unittest { } } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { // test deferred throwing import vibe.core.core; @@ -357,7 +358,6 @@ unittest { // test deferred throwing runEventLoop(); } -version (VibeLibevDriver) {} else // timers are not implemented for libev, yet unittest { runMutexUnitTests!TaskMutex(); } @@ -679,28 +679,46 @@ struct LocalManualEvent { @safe: private { - int m_emitCount; - ThreadLocalWaiter m_waiter; + alias Waiter = ThreadLocalWaiter!false; + + Waiter m_waiter; } // thread destructor in vibe.core.core will decrement the ref. count package static EventID ms_threadEvent; - //@disable this(this); // FIXME: commenting this out this is not a good idea... + private void initialize() + { + import vibe.internal.allocator : Mallocator, makeGCSafe; + m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); + } - deprecated("LocalManualEvent is always non-null!") - bool opCast() const nothrow { return true; } + this(this) + { + if (m_waiter) + return m_waiter.addRef(); + } + + ~this() + { + import vibe.internal.allocator : Mallocator, disposeGCSafe; + if (m_waiter) { + if (!m_waiter.releaseRef()) + () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); + } + } + + bool opCast() const nothrow { return m_waiter !is null; } /// A counter that is increased with every emit() call - int emitCount() const nothrow { return m_emitCount; } - /// ditto - int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } + int emitCount() const nothrow { return m_waiter.m_emitCount; } /// Emits the signal, waking up all owners of the signal. int emit() nothrow { + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); logTrace("unshared emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emit(); return ec; } @@ -708,8 +726,9 @@ struct LocalManualEvent { /// Emits the signal, waking up a single owners of the signal. int emitSingle() nothrow { + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); logTrace("unshared single emit"); - auto ec = m_emitCount++; + auto ec = m_waiter.m_emitCount++; m_waiter.emitSingle(); return ec; } @@ -751,6 +770,8 @@ struct LocalManualEvent { { import std.datetime : Clock, SysTime, UTC; + assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); + SysTime target_timeout, now; if (timeout != Duration.max) { try now = Clock.currTime(UTC()); @@ -758,14 +779,14 @@ struct LocalManualEvent { target_timeout = now + timeout; } - while (m_emitCount <= emit_count) { + while (m_waiter.m_emitCount <= emit_count) { m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); try now = Clock.currTime(UTC()); catch (Exception e) { assert(false, e.msg); } if (now >= target_timeout) break; } - return m_emitCount; + return m_waiter.m_emitCount; } } @@ -785,6 +806,46 @@ unittest { runEventLoop(); } +unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + + LocalManualEvent l = createManualEvent(); + + Task t2; + runTask({ + l.wait(); + t2.interrupt(); + sleep(20.msecs); + exitEventLoop(); + }); + t2 = runTask({ + try { + l.wait(); + assert(false, "Shouldn't reach this."); + } catch (InterruptException e) {} + }); + runTask({ + l.emit(); + }); + runEventLoop(); +} + +unittest { // ensure that LocalManualEvent behaves correctly after being copied + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + + LocalManualEvent l = createManualEvent(); + runTask({ + auto lc = l; + sleep(100.msecs); + lc.emit(); + }); + runTask({ + assert(l.wait(1.seconds, l.emitCount)); + exitEventLoop(); + }); + runEventLoop(); +} + /** A manually triggered multi threaded cross-task event. @@ -793,14 +854,17 @@ unittest { struct ManualEvent { import core.thread : Thread; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; + import vibe.internal.list : StackSList; @safe: private { + alias ThreadWaiter = ThreadLocalWaiter!true; + int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; // actively waiting - StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs + StackSList!ThreadWaiter active; // actively waiting + StackSList!ThreadWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -813,7 +877,7 @@ struct ManualEvent { all } - @disable this(this); // FIXME: commenting this out this is not a good idea... + @disable this(this); deprecated("ManualEvent is always non-null!") bool opCast() const shared nothrow { return true; } @@ -831,12 +895,14 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.filter((ThreadLocalWaiter* w) { + m_waiters.lock.active.filter((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.init); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -845,7 +911,10 @@ struct ManualEvent { return true; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emit(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -862,13 +931,15 @@ struct ManualEvent { auto ec = atomicOp!"+="(m_emitCount, 1); auto thisthr = Thread.getThis(); - ThreadLocalWaiter* lw; + ThreadWaiter lw; auto drv = eventDriver; - m_waiters.lock.active.iterate((ThreadLocalWaiter* w) { + m_waiters.lock.active.iterate((ThreadWaiter w) { () @trusted { logTrace("waiter %s", cast(void*)w); } (); if (w.unused) return true; - if (w.m_driver is drv) lw = w; - else { + if (w.m_driver is drv) { + lw = w; + lw.addRef(); + } else { try { assert(w.m_event != EventID.invalid); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); @@ -877,7 +948,10 @@ struct ManualEvent { return false; }); () @trusted { logTrace("lw %s", cast(void*)lw); } (); - if (lw) lw.emit(); + if (lw) { + lw.emitSingle(); + releaseWaiter(lw); + } logTrace("emit shared done"); @@ -937,7 +1011,7 @@ struct ManualEvent { int ec = this.emitCount; - acquireThreadWaiter((ref ThreadLocalWaiter w) { + acquireThreadWaiter((scope ThreadWaiter w) { while (ec <= emit_count) { w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count); ec = this.emitCount; @@ -955,10 +1029,9 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : processAllocator, make; - import core.memory : GC; + import vibe.internal.allocator : processAllocator, makeGCSafe; - ThreadLocalWaiter* w; + ThreadWaiter w; auto drv = eventDriver; with (m_waiters.lock) { @@ -984,34 +1057,37 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = processAllocator.make!ThreadLocalWaiter; + w = processAllocator.makeGCSafe!ThreadWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; - GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); } catch (Exception e) { assert(false, "Failed to allocate thread waiter."); } } (); } + assert(w.m_refCount == 1); active.add(w); } } - scope (exit) { - if (!w.releaseRef()) { - assert(w.m_driver is drv); - assert(w.unused); - with (m_waiters.lock) { - auto rmvd = active.remove(w); - assert(rmvd); - free.add(w); - // TODO: cap size of m_freeWaiters - } + scope (exit) releaseWaiter(w); + + del(w); + } + + private void releaseWaiter(ThreadWaiter w) + shared nothrow { + if (!w.releaseRef()) { + assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?"); + assert(w.unused, "Waiter still used, but not referenced!?"); + with (m_waiters.lock) { + auto rmvd = active.remove(w); + assert(rmvd, "Waiter not in active queue anymore!?"); + free.add(w); + // TODO: cap size of m_freeWaiters } } - - del(*w); } } @@ -1155,10 +1231,12 @@ package struct SpinLock { } } -private struct ThreadLocalWaiter { +private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { + import vibe.internal.list : CircularDList; + private { - static struct Waiter { - Waiter* next; + static struct TaskWaiter { + TaskWaiter* prev, next; Task task; void delegate() @safe nothrow notifier; bool cancelled; @@ -1170,39 +1248,52 @@ private struct ThreadLocalWaiter { void cancel() @safe nothrow { cancelled = true; notifier = null; } } - ThreadLocalWaiter* next; - NativeEventDriver m_driver; - EventID m_event = EventID.invalid; - Waiter* m_waiters; + static if (EVENT_TRIGGERED) { + package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread + NativeEventDriver m_driver; + EventID m_event = EventID.invalid; + } else { + int m_emitCount = 0; + } int m_refCount = 1; + TaskWaiter m_pivot; + TaskWaiter m_emitPivot; + CircularDList!(TaskWaiter*) m_waiters; } - this(this) - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.addRef(m_event); + this() + { + m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); } - ~this() - @safe nothrow { - if (m_event != EventID.invalid) - m_driver.events.releaseRef(m_event); + static if (EVENT_TRIGGERED) { + ~this() + { + if (m_event != EventID.invalid) + eventDriver.events.releaseRef(m_event); + } } - @property bool unused() const @safe nothrow { return m_waiters is null; } + @property bool unused() const @safe nothrow { return m_waiters.empty; } - void addRef() @safe nothrow { m_refCount++; } - bool releaseRef() @safe nothrow { return --m_refCount > 0; } + void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; } + bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; } bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; import vibe.internal.async : Waitable, asyncAwaitAny; - Waiter w; - Waiter* pw = () @trusted { return &w; } (); - w.next = m_waiters; - m_waiters = pw; + TaskWaiter waiter_store; + TaskWaiter* waiter = () @trusted { return &waiter_store; } (); + + m_waiters.insertBack(waiter); + assert(waiter.next !is null); + scope (exit) + if (waiter.next !is null) { + m_waiters.remove(waiter); + assert(!waiter.next); + } SysTime target_timeout, now; if (timeout != Duration.max) { @@ -1211,32 +1302,16 @@ private struct ThreadLocalWaiter { target_timeout = now + timeout; } - Waitable!(typeof(Waiter.notifier), - cb => w.wait(cb), - cb => w.cancel(), + Waitable!(typeof(TaskWaiter.notifier), + cb => waiter.wait(cb), + cb => waiter.cancel(), ) waitable; - void removeWaiter() - @safe nothrow { - Waiter* piw = m_waiters, ppiw = null; - while (piw !is null) { - if (piw is pw) { - if (ppiw) ppiw.next = piw.next; - else m_waiters = piw.next; - break; - } - ppiw = piw; - piw = piw.next; - } - } - - scope (failure) removeWaiter(); - if (evt != EventID.invalid) { Waitable!(EventCallback, (cb) { eventDriver.events.wait(evt, cb); - // check for exit codition *after* starting to wait for the event + // check for exit condition *after* starting to wait for the event // to avoid a race condition if (exit_condition()) { eventDriver.events.cancelWait(evt, cb); @@ -1251,109 +1326,57 @@ private struct ThreadLocalWaiter { } if (waitable.cancelled) { - removeWaiter(); + assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); return false; - } else debug { - Waiter* piw = m_waiters; - while (piw !is null) { - assert(piw !is pw, "Thread local waiter still in queue after it got notified!?"); - piw = piw.next; - } + } else { + assert(waiter.next is null, "Triggered waiter still in queue!?"); + return true; } - - return true; } - bool emit() + void emit() @safe nothrow { - if (!m_waiters) return false; + import std.algorithm.mutation : swap; + import vibe.core.core : yield; - Waiter* waiters = m_waiters; - m_waiters = null; - while (waiters) { - auto wnext = waiters.next; - assert(wnext !is waiters); - if (waiters.notifier !is null) { - logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr); - waiters.notifier(); - waiters.notifier = null; - } else logTrace("notify callback is null"); - waiters = wnext; + if (m_waiters.empty) return; + + TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); + + if (pivot.next) { // another emit in progress? + // shift pivot to the end, so that the other emit call will process all pending waiters + if (pivot !is m_waiters.back) { + m_waiters.remove(pivot); + m_waiters.insertBack(pivot); + } + return; } - return true; + m_waiters.insertBack(pivot); + scope (exit) m_waiters.remove(pivot); + + foreach (w; m_waiters) { + if (w is pivot) break; + emitWaiter(w); + } } bool emitSingle() @safe nothrow { - if (!m_waiters) return false; + if (m_waiters.empty) return false; + emitWaiter(m_waiters.front); + return true; + } + + private void emitWaiter(TaskWaiter* w) + @safe nothrow { + m_waiters.remove(w); - auto w = m_waiters; - m_waiters = w.next; if (w.notifier !is null) { logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); w.notifier(); w.notifier = null; } else logTrace("notify callback is null"); - - return true; - } -} - -private struct StackSList(T) -{ -@safe nothrow: - - import core.atomic : cas; - - private T* m_first; - - @property T* first() { return m_first; } - @property shared(T)* first() shared @trusted { return atomicLoad(m_first); } - - bool empty() const { return m_first is null; } - - void add(T* elem) - { - debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); - elem.next = m_first; - m_first = elem; - } - - bool remove(T* elem) - { - T* w = m_first, wp; - while (w !is elem) { - if (!w) return false; - wp = w; - w = w.next; - } - if (wp) wp.next = w.next; - else m_first = w.next; - return true; - } - - void filter(scope bool delegate(T* el) @safe nothrow del) - { - T* w = m_first, pw; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) { - if (pw) pw.next = wnext; - else m_first = wnext; - } else pw = w; - w = wnext; - } - } - - void iterate(scope bool delegate(T* el) @safe nothrow del) - { - T* w = m_first; - while (w !is null) { - auto wnext = w.next; - if (!del(w)) break; - w = wnext; - } } } @@ -1369,7 +1392,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { { } - @trusted bool tryLock() { if (cas(&m_locked, false, true)) { @@ -2007,4 +2029,4 @@ class InterruptibleTaskReadWriteMutex /** The policy with which the lock has been created. */ @property Policy policy() const { return m_state.policy; } -} \ No newline at end of file +} diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 437d50c..e146d76 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -809,7 +809,7 @@ package struct TaskScheduler { Returns `true` $(I iff) there are more tasks left to process. */ ScheduleStatus schedule() - { + nothrow { if (m_taskQueue.empty) return ScheduleStatus.idle; @@ -846,7 +846,7 @@ package struct TaskScheduler { /// Resumes execution of a yielded task. private void resumeTask(Task t) - { + nothrow { import std.encoding : sanitize; logTrace("task fiber resume"); diff --git a/source/vibe/internal/allocator.d b/source/vibe/internal/allocator.d index 596b88f..d496aef 100644 --- a/source/vibe/internal/allocator.d +++ b/source/vibe/internal/allocator.d @@ -16,3 +16,26 @@ public import std.experimental.allocator.mallocator; s_threadAllocator = () @trusted { return allocatorObject(GCAllocator.instance); } (); return s_threadAllocator; } + +auto makeGCSafe(T, Allocator, A...)(Allocator allocator, A args) +{ + import core.memory : GC; + import std.traits : hasIndirections; + + auto ret = allocator.make!T(args); + static if (is (T == class)) enum tsize = __traits(classInstanceSize, T); + else enum tsize = T.sizeof; + static if (hasIndirections!T) + GC.addRange(cast(void*)ret, tsize, typeid(T)); + return ret; +} + +void disposeGCSafe(T, Allocator)(Allocator allocator, T obj) +{ + import core.memory : GC; + import std.traits : hasIndirections; + + static if (hasIndirections!T) + GC.removeRange(cast(void*)obj); + allocator.dispose(obj); +} diff --git a/source/vibe/internal/list.d b/source/vibe/internal/list.d new file mode 100644 index 0000000..cfc4135 --- /dev/null +++ b/source/vibe/internal/list.d @@ -0,0 +1,192 @@ +module vibe.internal.list; + +import core.atomic; + +struct CircularDList(T) +{ + private { + T m_pivot; + } + + this(T pivot) + { + assert(pivot.prev is null && pivot.next is null); + pivot.next = pivot.prev = pivot; + m_pivot = pivot; + assert(this.empty); + } + + bool empty() const { return m_pivot.next is m_pivot; } + + + @property T front() { return m_pivot.next is m_pivot ? null : m_pivot.next; } + @property T back() { return m_pivot.prev is m_pivot ? null : m_pivot.prev; } + + void remove(T elem) + { + assert(elem !is m_pivot); + elem.prev.next = elem.next; + elem.next.prev = elem.prev; + elem.prev = elem.next = null; + } + + void insertBefore(T elem, T pivot) + { + assert(elem.prev is null && elem.next is null); + elem.prev = pivot.prev; + elem.next = pivot; + pivot.prev.next = elem; + pivot.prev = elem; + } + + void insertFront(T elem) { insertBefore(elem, m_pivot.next); } + + void insertBack(T elem) { insertBefore(elem, m_pivot); } + + // NOTE: allowed to remove the current element + int opApply(int delegate(T) @safe nothrow del) + @safe nothrow { + T prev = m_pivot; + debug size_t counter = 0; + while (prev.next !is m_pivot) { + auto el = prev.next; + if (auto ret = del(el)) + return ret; + if (prev.next is el) + prev = prev.next; + debug assert (++counter < 1_000_000, "Cycle in list?"); + } + return 0; + } +} + +unittest { + static final class C { + C prev, next; + int i; + this(int i) { this.i = i; } + } + + alias L = CircularDList!C; + auto l = L(new C(0)); + assert(l.empty); + + auto c = new C(1); + l.insertBack(c); + assert(!l.empty); + assert(l.front is c); + assert(l.back is c); + foreach (c; l) assert(c.i == 1); + + auto c2 = new C(2); + l.insertFront(c2); + assert(!l.empty); + assert(l.front is c2); + assert(l.back is c); + foreach (c; l) assert(c.i == 1 || c.i == 2); + + l.remove(c); + assert(!l.empty); + assert(l.front is c2); + assert(l.back is c2); + foreach (c; l) assert(c.i == 2); + + l.remove(c2); + assert(l.empty); +} + + +struct StackSList(T) +{ +@safe nothrow: + + import core.atomic : cas; + + private T m_first; + + @property T first() { return m_first; } + @property T front() { return m_first; } + + bool empty() const { return m_first is null; } + + void add(T elem) + { + debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); + elem.next = m_first; + m_first = elem; + } + + bool remove(T elem) + { + debug uint counter = 0; + T w = m_first, wp; + while (w !is elem) { + if (!w) return false; + wp = w; + w = w.next; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + if (wp) wp.next = w.next; + else m_first = w.next; + return true; + } + + void filter(scope bool delegate(T el) @safe nothrow del) + { + debug uint counter = 0; + T w = m_first, pw; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) { + if (pw) pw.next = wnext; + else m_first = wnext; + } else pw = w; + w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + } + + void iterate(scope bool delegate(T el) @safe nothrow del) + { + debug uint counter = 0; + T w = m_first; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) break; + w = wnext; + debug assert(++counter < 1_000_000, "Cycle in linked list?"); + } + } +} + +unittest { + static final class C { + C next; + int i; + this(int i) { this.i = i; } + } + + alias L = StackSList!C; + L l; + assert(l.empty); + + auto c = new C(1); + l.add(c); + assert(!l.empty); + assert(l.front is c); + l.iterate((el) { assert(el.i == 1); return true; }); + + auto c2 = new C(2); + l.add(c2); + assert(!l.empty); + assert(l.front is c2); + l.iterate((el) { assert(el.i == 1 || el.i == 2); return true; }); + + l.remove(c); + assert(!l.empty); + assert(l.front is c2); + l.iterate((el) { assert(el.i == 2); return true; }); + + l.filter((el) => el.i == 0); + assert(l.empty); +}