diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 64f70cf..190fc71 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1333,6 +1333,10 @@ static this() synchronized (st_threadsMutex) if (!st_threads.any!(c => c.thread is thisthr)) st_threads ~= ThreadContext(thisthr, false); + + + import vibe.core.sync : SpinLock; + SpinLock.setup(); } static ~this() diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index 4cd5f8d..ea0d463 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -546,7 +546,7 @@ private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, Input */ struct DirectoryWatcher { // TODO: avoid all those heap allocations! import std.array : Appender, appender; - import vibe.core.sync : ManualEvent; + import vibe.core.sync : LocalManualEvent; @safe: @@ -554,7 +554,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations! Path path; bool recursive; Appender!(DirectoryChange[]) changes; - ManualEvent changeEvent; + LocalManualEvent changeEvent; } private { diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index bb4b2cf..1f1885f 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -21,9 +21,9 @@ import std.traits : ReturnType; /** Creates a new signal that can be shared between fibers. */ -ManualEvent createManualEvent() +LocalManualEvent createManualEvent() @safe { - return ManualEvent.init; + return LocalManualEvent.init; } /// ditto shared(ManualEvent) createSharedManualEvent() @@ -147,7 +147,7 @@ class LocalTaskSemaphore private { static struct ThreadWaiter { - ManualEvent signal; + LocalManualEvent signal; ubyte priority; uint seq; } @@ -669,54 +669,28 @@ final class InterruptibleTaskCondition { } -/** A manually triggered cross-task event. +/** A manually triggered single threaded cross-task event. - Note: the ownership can be shared between multiple fibers and threads. + Note: the ownership can be shared between multiple fibers of the same thread. */ -struct ManualEvent { +struct LocalManualEvent { import core.thread : Thread; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; @safe: private { - static struct ThreadWaiter { - ThreadWaiter* next; - EventID event; - EventDriver driver; - Thread thread; - StackSList!LocalWaiter tasks; - } - static struct LocalWaiter { - LocalWaiter* next; - Task task; - void delegate() @safe nothrow notifier; - bool cancelled = false; - - void wait(void delegate() @safe nothrow del) @safe nothrow { - assert(notifier is null, "Local waiter is used twice!"); - notifier = del; - } - void cancel() @safe nothrow { cancelled = true; notifier = null; } - } int m_emitCount; - ThreadWaiter* m_waiters; + ThreadLocalWaiter m_waiter; } // thread destructor in vibe.core.core will decrement the ref. count package static EventID ms_threadEvent; - enum EmitMode { - single, - all - } - //@disable this(this); // FIXME: commenting this out this is not a good idea... - deprecated("ManualEvent is always non-null!") + deprecated("LocalManualEvent is always non-null!") bool opCast() const nothrow { return true; } - deprecated("ManualEvent is always non-null!") - bool opCast() const shared nothrow { return true; } /// A counter that is increased with every emit() call int emitCount() const nothrow { return m_emitCount; } @@ -724,70 +698,20 @@ struct ManualEvent { int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } /// Emits the signal, waking up all owners of the signal. - int emit(EmitMode mode = EmitMode.all) - shared nothrow @trusted { - import core.atomic : atomicOp, cas; - - logTrace("emit shared"); - - auto ec = atomicOp!"+="(m_emitCount, 1); - auto thisthr = Thread.getThis(); - - final switch (mode) { - case EmitMode.all: - // FIXME: would be nice to have atomicSwap instead - auto w = cast(ThreadWaiter*)atomicLoad(m_waiters); - if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) { - logTrace("Another thread emitted concurrently - returning."); - return ec; - } - while (w !is null) { - // Note: emitForThisThread can result in w getting deallocated at any - // time, so we need to copy any fields first - auto wnext = w.next; - atomicStore((cast(shared)w).next, null); - assert(wnext !is w, "Same waiter enqueued twice!?"); - if (w.driver is eventDriver) { - logTrace("Same thread emit (%s/%s)", cast(void*)w, cast(void*)w.tasks.first); - emitForThisThread(w.tasks.m_first, mode); - } else { - logTrace("Foreign thread \"%s\" notify: %s", w.thread.name, w.event); - auto drv = w.driver; - auto evt = w.event; - if (evt != EventID.init) - (cast(shared)drv.events).trigger(evt, true); - } - w = wnext; - } - break; - case EmitMode.single: - assert(false); - } - - logTrace("emit shared done"); - + int emit() + nothrow { + logTrace("unshared emit"); + auto ec = m_emitCount++; + m_waiter.emit(); return ec; } - /// ditto - int emit(EmitMode mode = EmitMode.all) + + /// Emits the signal, waking up a single owners of the signal. + int emitSingle() nothrow { + logTrace("unshared single emit"); auto ec = m_emitCount++; - - logTrace("unshared emit"); - - final switch (mode) { - case EmitMode.all: - auto w = m_waiters; - m_waiters = null; - if (w !is null) { - assert(w.driver is eventDriver, "Unshared ManualEvent has waiters in foreign thread!"); - assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!"); - emitForThisThread(w.tasks.m_first, EmitMode.all); - } - break; - case EmitMode.single: - assert(false); - } + m_waiter.emitSingle(); return ec; } @@ -801,8 +725,6 @@ struct ManualEvent { using $(D Task.interrupt()). */ int wait() { return wait(this.emitCount); } - /// ditto - int wait() shared { return wait(this.emitCount); } /** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reached. @@ -813,9 +735,144 @@ struct ManualEvent { */ int wait(int emit_count) { return doWait!true(Duration.max, emit_count); } /// ditto - int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); } - /// ditto int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); } + + /** Same as $(D wait), but defers throwing any $(D InterruptException). + + This method is annotated $(D nothrow) at the expense that it cannot be + interrupted. + */ + int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); } + /// ditto + int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); } + /// ditto + int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); } + + private int doWait(bool interruptible)(Duration timeout, int emit_count) + { + import std.datetime : Clock, SysTime, UTC; + + SysTime target_timeout, now; + if (timeout != Duration.max) { + try now = Clock.currTime(UTC()); + catch (Exception e) { assert(false, e.msg); } + target_timeout = now + timeout; + } + + while (m_emitCount <= emit_count) { + m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); + try now = Clock.currTime(UTC()); + catch (Exception e) { assert(false, e.msg); } + if (now >= target_timeout) break; + } + + return m_emitCount; + } +} + +unittest { + import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; + + auto e = createManualEvent(); + auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); + auto w2 = runTask({ e.wait(500.msecs, e.emitCount); }); + runTask({ + sleep(50.msecs); + e.emit(); + sleep(50.msecs); + assert(!w1.running && !w2.running); + exitEventLoop(); + }); + runEventLoop(); +} + + +/** A manually triggered multi threaded cross-task event. + + Note: the ownership can be shared between multiple fibers and threads. +*/ +struct ManualEvent { + import core.thread : Thread; + import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; + + @safe: + + private { + int m_emitCount; + static struct Waiters { + StackSList!ThreadLocalWaiter active; + StackSList!ThreadLocalWaiter free; + } + Monitor!(Waiters, SpinLock) m_waiters; + } + + // thread destructor in vibe.core.core will decrement the ref. count + package static EventID ms_threadEvent; + + enum EmitMode { + single, + all + } + + @disable this(this); // FIXME: commenting this out this is not a good idea... + + deprecated("ManualEvent is always non-null!") + bool opCast() const shared nothrow { return true; } + + /// A counter that is increased with every emit() call + int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } + + /// Emits the signal, waking up all owners of the signal. + int emit() + shared nothrow @trusted { + import core.atomic : atomicOp, cas; + + () @trusted { logTrace("emit shared %s", cast(void*)&this); } (); + + auto ec = atomicOp!"+="(m_emitCount, 1); + auto thisthr = Thread.getThis(); + + ThreadLocalWaiter* lw; + auto drv = eventDriver; + m_waiters.lock((ref waiters) { + waiters.active.filter((ThreadLocalWaiter* w) { + () @trusted { logTrace("waiter %s", cast(void*)w); } (); + if (w.m_driver is drv) lw = w; + else { + try { + () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event); + } catch (Exception e) assert(false, e.msg); + } + return true; + }); + }); + () @trusted { logTrace("lw %s", cast(void*)lw); } (); + if (lw) lw.emit(); + + logTrace("emit shared done"); + + return ec; + } + + /** Acquires ownership and waits until the signal is emitted. + + Note that in order not to miss any emits it is necessary to use the + overload taking an integer. + + Throws: + May throw an $(D InterruptException) if the task gets interrupted + using $(D Task.interrupt()). + */ + int wait() shared { return wait(this.emitCount); } + + /** Acquires ownership and waits until the emit count differs from the + given one or until a timeout is reached. + + Throws: + May throw an $(D InterruptException) if the task gets interrupted + using $(D Task.interrupt()). + */ + int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); } /// ditto int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); } @@ -824,55 +881,21 @@ struct ManualEvent { This method is annotated $(D nothrow) at the expense that it cannot be interrupted. */ - int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); } - /// int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } /// ditto - int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); } - /// ditto int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); } /// ditto - int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); } - /// ditto int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); } - private int doWait(bool interruptible)(Duration timeout, int emit_count) - { - import std.datetime : SysTime, Clock, UTC; - - SysTime target_timeout, now; - if (timeout != Duration.max) { - try now = Clock.currTime(UTC()); - catch (Exception e) { assert(false, e.msg); } - target_timeout = now + timeout; - } - - int ec = this.emitCount; - while (ec <= emit_count) { - ThreadWaiter w; - LocalWaiter lw; - () @trusted { acquireWaiter(&w, &lw); } (); - - Waitable!( - cb => lw.wait(cb), - cb => lw.cancel() - ) waitable; - asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); - ec = this.emitCount; - - if (timeout != Duration.max) { - try now = Clock.currTime(UTC()); - catch (Exception e) { assert(false, e.msg); } - if (now >= target_timeout) break; - } -} - return ec; - } - private int doWaitShared(bool interruptible)(Duration timeout, int emit_count) shared { import std.datetime : SysTime, Clock, UTC; + () @trusted { logTrace("wait shared %s", cast(void*)&this); } (); + + if (ms_threadEvent is EventID.invalid) + ms_threadEvent = eventDriver.events.create(); + SysTime target_timeout, now; if (timeout != Duration.max) { try now = Clock.currTime(UTC()); @@ -881,211 +904,293 @@ struct ManualEvent { } int ec = this.emitCount; - while (ec <= emit_count) { - shared(ThreadWaiter) w; - LocalWaiter lw; - () @trusted { acquireWaiter(&w, &lw); } (); - () @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } (); - scope (exit) { - shared(ThreadWaiter)* pw = atomicLoad(m_waiters); - while (pw !is null) { - assert(pw !is () @trusted { return &w; } (), "Thread waiter was not removed from queue."); - pw = pw.next; + acquireThreadWaiter((ref ThreadLocalWaiter w) { + while (ec <= emit_count) { + w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count); + ec = this.emitCount; + + if (timeout != Duration.max) { + try now = Clock.currTime(UTC()); + catch (Exception e) { assert(false, e.msg); } + if (now >= target_timeout) break; } } + }); - if (lw.next) { - // if we are not the first waiter for this thread, - // wait for getting resumed by emitForThisThread - Waitable!( - cb => lw.wait(cb), - cb => lw.cancel() - ) waitable; - asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); - if (waitable.cancelled) break; // timeout - } else { - again: - // if we are the first waiter for this thread, - // wait for the thread event to get emitted - Waitable!( - cb => eventDriver.events.wait(ms_threadEvent, cb), - cb => eventDriver.events.cancelWait(ms_threadEvent, cb), - EventID - ) eventwaiter; - Waitable!( - cb => lw.wait(cb), - cb => lw.cancel() - ) localwaiter; - logDebugV("Wait on event %s", ms_threadEvent); - asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter); - - if (!eventwaiter.cancelled) { - if (() @trusted { return atomicLoad(w.next); } () is null) - emitForThisThread(() @trusted { return cast(LocalWaiter*)w.tasks.m_first; } (), EmitMode.all); // FIXME: use proper emit mode - else goto again; - } else if (localwaiter.cancelled) break; // timeout - } - - () @trusted { - assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w, - "Waiter did not get removed from waiter queue."); - }(); - - ec = this.emitCount; - - if (timeout != Duration.max) { - try now = Clock.currTime(UTC()); - catch (Exception e) { assert(false, e.msg); } - if (now >= target_timeout) break; - } - } return ec; } - private static bool emitForThisThread(LocalWaiter* waiters, EmitMode mode) - nothrow { - if (!waiters) return false; + private void acquireThreadWaiter(DEL)(scope DEL del) + shared { + import vibe.internal.allocator : theAllocator, make; + import core.memory : GC; - logTrace("emitForThisThread"); + ThreadLocalWaiter* w; + auto drv = eventDriver; - final switch (mode) { - case EmitMode.all: - while (waiters) { - auto wnext = waiters.next; - assert(wnext !is waiters); - if (waiters.notifier !is null) { - logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr); - waiters.notifier(); - waiters.notifier = null; - } else logTrace("notify callback is null"); - waiters = wnext; + m_waiters.lock((ref waiters) { + waiters.active.filter((aw) { + if (aw.m_driver is drv) { + w = aw; } - break; - case EmitMode.single: - assert(false, "TODO!"); + return true; + }); + + if (!w) { + waiters.free.filter((fw) { + if (fw.m_driver is drv) { + w = fw; + return false; + } + return true; + }); + + if (!w) { + () @trusted { + try { + w = theAllocator.make!ThreadLocalWaiter; + w.m_driver = drv; + w.m_event = ms_threadEvent; + GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); + } catch (Exception e) { + assert(false, "Failed to allocate thread waiter."); + } + } (); + } + + waiters.active.add(w); + } + }); + + scope (exit) { + if (w.unused) { + m_waiters.lock((ref waiters) { + waiters.active.remove(w); + waiters.free.add(w); + // TODO: cap size of m_freeWaiters + }); + } + } + + del(*w); + } +} + +unittest { + import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep; + + auto e = createSharedManualEvent(); + auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); + static void w(shared(ManualEvent)* e) { e.wait(500.msecs, e.emitCount); } + auto w2 = runWorkerTaskH(&w, &e); + runTask({ + sleep(50.msecs); + e.emit(); + sleep(50.msecs); + assert(!w1.running && !w2.running); + exitEventLoop(); + }); + runEventLoop(); +} + +private shared struct Monitor(T, M) +{ + private { + shared T m_data; + shared M m_mutex; + } + + void lock(scope void delegate(ref T) @safe nothrow access) + shared { + m_mutex.lock(); + scope (exit) m_mutex.unlock(); + access(*() @trusted { return cast(T*)&m_data; } ()); + } +} + +package struct SpinLock { + private shared int locked; + static int threadID = 0; + + static void setup() + { + import core.thread : Thread; + threadID = cast(int)cast(void*)Thread.getThis(); + } + + @safe nothrow @nogc shared: + + bool tryLock() + @trusted { + assert(atomicLoad(locked) != threadID, "Recursive lock attempt."); + return cas(&locked, 0, threadID); + } + + void lock() + { + while (!tryLock()) {} + } + + void unlock() + @trusted { + assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread."); + atomicStore(locked, 0); + } +} + +private struct ThreadLocalWaiter { + private { + static struct Waiter { + Waiter* next; + Task task; + void delegate() @safe nothrow notifier; + bool cancelled; + + void wait(void delegate() @safe nothrow del) @safe nothrow { + assert(notifier is null, "Local waiter is used twice!"); + notifier = del; + } + void cancel() @safe nothrow { cancelled = true; notifier = null; } + } + + ThreadLocalWaiter* next; + NativeEventDriver m_driver; + EventID m_event; + Waiter* m_waiters; + } + + this(this) + @safe nothrow { + if (m_event != EventID.invalid) + m_driver.events.addRef(m_event); + } + + ~this() + @safe nothrow { + if (m_event != EventID.invalid) + m_driver.events.releaseRef(m_event); + } + + @property bool unused() const @safe nothrow { return m_waiters is null; } + + bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) + @safe { + import std.datetime : SysTime, Clock, UTC; + import vibe.internal.async : Waitable, asyncAwaitAny; + + Waiter w; + Waiter* pw = () @trusted { return &w; } (); + w.next = m_waiters; + m_waiters = pw; + + SysTime target_timeout, now; + if (timeout != Duration.max) { + try now = Clock.currTime(UTC()); + catch (Exception e) { assert(false, e.msg); } + target_timeout = now + timeout; + } + + Waitable!( + cb => w.wait(cb), + cb => w.cancel() + ) waitable; + + void removeWaiter() + @safe nothrow { + Waiter* piw = m_waiters, ppiw = null; + while (piw !is null) { + if (piw is pw) { + if (ppiw) ppiw.next = piw.next; + else m_waiters = piw.next; + break; + } + ppiw = piw; + piw = piw.next; + } + } + + scope (failure) removeWaiter(); + + if (evt != EventID.invalid) { + Waitable!( + (cb) { + eventDriver.events.wait(evt, cb); + // check for exit codition *after* starting to wait for the event + // to avoid a race condition + if (exit_condition()) { + eventDriver.events.cancelWait(evt, cb); + cb(evt); + } + }, + cb => eventDriver.events.cancelWait(evt, cb), + EventID + ) ewaitable; + asyncAwaitAny!interruptible(timeout, waitable, ewaitable); + } else { + asyncAwaitAny!interruptible(timeout, waitable); + } + + if (waitable.cancelled) { + removeWaiter(); + return false; + } else debug { + Waiter* piw = m_waiters; + while (piw !is null) { + assert(piw !is pw, "Thread local waiter still in queue after it got notified!?"); + piw = piw.next; + } } return true; } - private void acquireWaiter(ThreadWaiter* w, LocalWaiter* lw) - nothrow { - // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling - lw.task = Task.getThis(); + bool emit() + @safe nothrow { + if (!m_waiters) return false; - if (m_waiters) { - m_waiters.tasks.add(lw); - } else { - m_waiters = w; + Waiter* waiters = m_waiters; + m_waiters = null; + while (waiters) { + auto wnext = waiters.next; + assert(wnext !is waiters); + if (waiters.notifier !is null) { + logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr); + waiters.notifier(); + waiters.notifier = null; + } else logTrace("notify callback is null"); + waiters = wnext; } + + return true; } - private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* lw) - nothrow shared { - lw.task = Task.getThis(); + bool emitSingle() + @safe nothrow { + if (!m_waiters) return false; - if (ms_threadEvent == EventID.init) - ms_threadEvent = eventDriver.events.create(); + auto w = m_waiters; + m_waiters = w.next; + if (w.notifier !is null) { + logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); + w.notifier(); + w.notifier = null; + } else logTrace("notify callback is null"); - auto sdriver = () @trusted { return cast(shared)eventDriver; } (); - - shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } (); - size_t cnt = 0; - while (pw !is null) { - assert(pw !is w, "Waiter is already registered!"); - if (pw.driver is sdriver) - break; - assert(cnt++ < 1000, "Recursive waiter?!"); - pw = () @trusted { return atomicLoad(pw.next); } (); - } - - if (!pw) { - pw = w; - shared(ThreadWaiter)* wn; - do { - wn = () @trusted { return atomicLoad(m_waiters); } (); - w.next = wn; - w.event = ms_threadEvent; - w.driver = sdriver; - w.thread = () @trusted { return cast(shared)Thread.getThis(); } (); - } while (!() @trusted { return cas(&m_waiters, wn, w); } ()); - } - - () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(lw); + return true; } } -unittest { - import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; - logInfo("A"); - - auto e = createManualEvent(); - auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); - auto w2 = runTask({ e.wait(500.msecs, e.emitCount); }); - runTask({ - sleep(200.msecs); - e.emit(); - sleep(50.msecs); - assert(!w1.running && !w2.running); - exitEventLoop(); - }); - runEventLoop(); - logInfo("B"); -} - -unittest { - import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep; - logInfo("C"); - - auto e = createSharedManualEvent(); - auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); - static void w(shared(ManualEvent) e){e.wait(500.msecs, e.emitCount);} - auto w2 = runWorkerTaskH(&w, e); - runTask({ - sleep(200.msecs); - e.emit(); - sleep(50.msecs); - assert(!w1.running && !w2.running); - exitEventLoop(); - }); - runEventLoop(); - logInfo("D"); -} - - private struct StackSList(T) { +@safe nothrow: + import core.atomic : cas; private T* m_first; @property T* first() { return m_first; } - @property shared(T)* first() shared { return atomicLoad(m_first); } - - void add(shared(T)* elem) - shared { - do elem.next = atomicLoad(m_first); - while (cas(&m_first, elem.next, elem)); - } - - void remove(shared(T)* elem) - shared { - while (true) { - shared(T)* w = atomicLoad(m_first), wp; - while (w !is elem) { - wp = w; - w = atomicLoad(w.next); - } - if (wp !is null) { - if (cas(&wp.next, w, w.next)) - break; - } else { - if (cas(&m_first, w, w.next)) - break; - } - } - } + @property shared(T)* first() shared @trusted { return atomicLoad(m_first); } bool empty() const { return m_first is null; } @@ -1095,16 +1200,30 @@ private struct StackSList(T) m_first = elem; } - void remove(T* elem) + bool remove(T* elem) { T* w = m_first, wp; while (w !is elem) { - assert(w !is null); + if (!w) return false; wp = w; w = w.next; } if (wp) wp.next = w.next; else m_first = w.next; + return true; + } + + void filter(scope bool delegate(T* el) @safe nothrow del) + { + T* w = m_first, pw; + while (w !is null) { + auto wnext = w.next; + if (!del(w)) { + if (pw) pw.next = wnext; + else m_first = wnext; + } else pw = w; + w = wnext; + } } } @@ -1118,7 +1237,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { void setup() { - m_signal = createSharedManualEvent(); } @@ -1173,7 +1291,6 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { void setup() { - m_signal = createSharedManualEvent(); m_mutex = new core.sync.mutex.Mutex; } @@ -1249,7 +1366,6 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { void setup(LOCKABLE mtx) { m_mutex = mtx; - m_signal = createSharedManualEvent(); } @property LOCKABLE mutex() { return m_mutex; }