Re-implement ManualEvent.

This simplifies the logic by separating thread local notifications from cross-thread notifications, as well as replacing lockless operations by a spin lock. The thread local variant of ManualEvent is now also separated into a LocalManualEvent type.
This commit is contained in:
Sönke Ludwig 2017-01-13 22:24:18 +01:00
parent e28c6950d7
commit fdfbb55aa8
3 changed files with 438 additions and 318 deletions

View file

@ -1333,6 +1333,10 @@ static this()
synchronized (st_threadsMutex) synchronized (st_threadsMutex)
if (!st_threads.any!(c => c.thread is thisthr)) if (!st_threads.any!(c => c.thread is thisthr))
st_threads ~= ThreadContext(thisthr, false); st_threads ~= ThreadContext(thisthr, false);
import vibe.core.sync : SpinLock;
SpinLock.setup();
} }
static ~this() static ~this()

View file

@ -546,7 +546,7 @@ private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, Input
*/ */
struct DirectoryWatcher { // TODO: avoid all those heap allocations! struct DirectoryWatcher { // TODO: avoid all those heap allocations!
import std.array : Appender, appender; import std.array : Appender, appender;
import vibe.core.sync : ManualEvent; import vibe.core.sync : LocalManualEvent;
@safe: @safe:
@ -554,7 +554,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
Path path; Path path;
bool recursive; bool recursive;
Appender!(DirectoryChange[]) changes; Appender!(DirectoryChange[]) changes;
ManualEvent changeEvent; LocalManualEvent changeEvent;
} }
private { private {

View file

@ -21,9 +21,9 @@ import std.traits : ReturnType;
/** Creates a new signal that can be shared between fibers. /** Creates a new signal that can be shared between fibers.
*/ */
ManualEvent createManualEvent() LocalManualEvent createManualEvent()
@safe { @safe {
return ManualEvent.init; return LocalManualEvent.init;
} }
/// ditto /// ditto
shared(ManualEvent) createSharedManualEvent() shared(ManualEvent) createSharedManualEvent()
@ -147,7 +147,7 @@ class LocalTaskSemaphore
private { private {
static struct ThreadWaiter { static struct ThreadWaiter {
ManualEvent signal; LocalManualEvent signal;
ubyte priority; ubyte priority;
uint seq; uint seq;
} }
@ -669,54 +669,28 @@ final class InterruptibleTaskCondition {
} }
/** A manually triggered cross-task event. /** A manually triggered single threaded cross-task event.
Note: the ownership can be shared between multiple fibers and threads. Note: the ownership can be shared between multiple fibers of the same thread.
*/ */
struct ManualEvent { struct LocalManualEvent {
import core.thread : Thread; import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
@safe: @safe:
private { private {
static struct ThreadWaiter {
ThreadWaiter* next;
EventID event;
EventDriver driver;
Thread thread;
StackSList!LocalWaiter tasks;
}
static struct LocalWaiter {
LocalWaiter* next;
Task task;
void delegate() @safe nothrow notifier;
bool cancelled = false;
void wait(void delegate() @safe nothrow del) @safe nothrow {
assert(notifier is null, "Local waiter is used twice!");
notifier = del;
}
void cancel() @safe nothrow { cancelled = true; notifier = null; }
}
int m_emitCount; int m_emitCount;
ThreadWaiter* m_waiters; ThreadLocalWaiter m_waiter;
} }
// thread destructor in vibe.core.core will decrement the ref. count // thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent; package static EventID ms_threadEvent;
enum EmitMode {
single,
all
}
//@disable this(this); // FIXME: commenting this out this is not a good idea... //@disable this(this); // FIXME: commenting this out this is not a good idea...
deprecated("ManualEvent is always non-null!") deprecated("LocalManualEvent is always non-null!")
bool opCast() const nothrow { return true; } bool opCast() const nothrow { return true; }
deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; }
/// A counter that is increased with every emit() call /// A counter that is increased with every emit() call
int emitCount() const nothrow { return m_emitCount; } int emitCount() const nothrow { return m_emitCount; }
@ -724,70 +698,20 @@ struct ManualEvent {
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal. /// Emits the signal, waking up all owners of the signal.
int emit(EmitMode mode = EmitMode.all) int emit()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
logTrace("emit shared");
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
final switch (mode) {
case EmitMode.all:
// FIXME: would be nice to have atomicSwap instead
auto w = cast(ThreadWaiter*)atomicLoad(m_waiters);
if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) {
logTrace("Another thread emitted concurrently - returning.");
return ec;
}
while (w !is null) {
// Note: emitForThisThread can result in w getting deallocated at any
// time, so we need to copy any fields first
auto wnext = w.next;
atomicStore((cast(shared)w).next, null);
assert(wnext !is w, "Same waiter enqueued twice!?");
if (w.driver is eventDriver) {
logTrace("Same thread emit (%s/%s)", cast(void*)w, cast(void*)w.tasks.first);
emitForThisThread(w.tasks.m_first, mode);
} else {
logTrace("Foreign thread \"%s\" notify: %s", w.thread.name, w.event);
auto drv = w.driver;
auto evt = w.event;
if (evt != EventID.init)
(cast(shared)drv.events).trigger(evt, true);
}
w = wnext;
}
break;
case EmitMode.single:
assert(false);
}
logTrace("emit shared done");
return ec;
}
/// ditto
int emit(EmitMode mode = EmitMode.all)
nothrow { nothrow {
auto ec = m_emitCount++;
logTrace("unshared emit"); logTrace("unshared emit");
auto ec = m_emitCount++;
m_waiter.emit();
return ec;
}
final switch (mode) { /// Emits the signal, waking up a single owners of the signal.
case EmitMode.all: int emitSingle()
auto w = m_waiters; nothrow {
m_waiters = null; logTrace("unshared single emit");
if (w !is null) { auto ec = m_emitCount++;
assert(w.driver is eventDriver, "Unshared ManualEvent has waiters in foreign thread!"); m_waiter.emitSingle();
assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!");
emitForThisThread(w.tasks.m_first, EmitMode.all);
}
break;
case EmitMode.single:
assert(false);
}
return ec; return ec;
} }
@ -801,8 +725,6 @@ struct ManualEvent {
using $(D Task.interrupt()). using $(D Task.interrupt()).
*/ */
int wait() { return wait(this.emitCount); } int wait() { return wait(this.emitCount); }
/// ditto
int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the /** Acquires ownership and waits until the emit count differs from the
given one or until a timeout is reached. given one or until a timeout is reached.
@ -813,9 +735,144 @@ struct ManualEvent {
*/ */
int wait(int emit_count) { return doWait!true(Duration.max, emit_count); } int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
/// ditto /// ditto
int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
/// ditto
int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); } int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
/** Same as $(D wait), but defers throwing any $(D InterruptException).
This method is annotated $(D nothrow) at the expense that it cannot be
interrupted.
*/
int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
/// ditto
int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
private int doWait(bool interruptible)(Duration timeout, int emit_count)
{
import std.datetime : Clock, SysTime, UTC;
SysTime target_timeout, now;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
while (m_emitCount <= emit_count) {
m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
return m_emitCount;
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
auto e = createManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
auto w2 = runTask({ e.wait(500.msecs, e.emitCount); });
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
/** A manually triggered multi threaded cross-task event.
Note: the ownership can be shared between multiple fibers and threads.
*/
struct ManualEvent {
import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
@safe:
private {
int m_emitCount;
static struct Waiters {
StackSList!ThreadLocalWaiter active;
StackSList!ThreadLocalWaiter free;
}
Monitor!(Waiters, SpinLock) m_waiters;
}
// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;
enum EmitMode {
single,
all
}
@disable this(this); // FIXME: commenting this out this is not a good idea...
deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; }
/// A counter that is increased with every emit() call
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal.
int emit()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
ThreadLocalWaiter* lw;
auto drv = eventDriver;
m_waiters.lock((ref waiters) {
waiters.active.filter((ThreadLocalWaiter* w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.m_driver is drv) lw = w;
else {
try {
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event);
} catch (Exception e) assert(false, e.msg);
}
return true;
});
});
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) lw.emit();
logTrace("emit shared done");
return ec;
}
/** Acquires ownership and waits until the signal is emitted.
Note that in order not to miss any emits it is necessary to use the
overload taking an integer.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the
given one or until a timeout is reached.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
/// ditto /// ditto
int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); } int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
@ -824,55 +881,21 @@ struct ManualEvent {
This method is annotated $(D nothrow) at the expense that it cannot be This method is annotated $(D nothrow) at the expense that it cannot be
interrupted. interrupted.
*/ */
int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
///
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
/// ditto /// ditto
int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); } int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
/// ditto /// ditto
int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); } int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
private int doWait(bool interruptible)(Duration timeout, int emit_count)
{
import std.datetime : SysTime, Clock, UTC;
SysTime target_timeout, now;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
int ec = this.emitCount;
while (ec <= emit_count) {
ThreadWaiter w;
LocalWaiter lw;
() @trusted { acquireWaiter(&w, &lw); } ();
Waitable!(
cb => lw.wait(cb),
cb => lw.cancel()
) waitable;
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
ec = this.emitCount;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
}
return ec;
}
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count) private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared { shared {
import std.datetime : SysTime, Clock, UTC; import std.datetime : SysTime, Clock, UTC;
() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();
if (ms_threadEvent is EventID.invalid)
ms_threadEvent = eventDriver.events.create();
SysTime target_timeout, now; SysTime target_timeout, now;
if (timeout != Duration.max) { if (timeout != Duration.max) {
try now = Clock.currTime(UTC()); try now = Clock.currTime(UTC());
@ -881,57 +904,10 @@ struct ManualEvent {
} }
int ec = this.emitCount; int ec = this.emitCount;
acquireThreadWaiter((ref ThreadLocalWaiter w) {
while (ec <= emit_count) { while (ec <= emit_count) {
shared(ThreadWaiter) w; w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count);
LocalWaiter lw;
() @trusted { acquireWaiter(&w, &lw); } ();
() @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } ();
scope (exit) {
shared(ThreadWaiter)* pw = atomicLoad(m_waiters);
while (pw !is null) {
assert(pw !is () @trusted { return &w; } (), "Thread waiter was not removed from queue.");
pw = pw.next;
}
}
if (lw.next) {
// if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread
Waitable!(
cb => lw.wait(cb),
cb => lw.cancel()
) waitable;
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
if (waitable.cancelled) break; // timeout
} else {
again:
// if we are the first waiter for this thread,
// wait for the thread event to get emitted
Waitable!(
cb => eventDriver.events.wait(ms_threadEvent, cb),
cb => eventDriver.events.cancelWait(ms_threadEvent, cb),
EventID
) eventwaiter;
Waitable!(
cb => lw.wait(cb),
cb => lw.cancel()
) localwaiter;
logDebugV("Wait on event %s", ms_threadEvent);
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter);
if (!eventwaiter.cancelled) {
if (() @trusted { return atomicLoad(w.next); } () is null)
emitForThisThread(() @trusted { return cast(LocalWaiter*)w.tasks.m_first; } (), EmitMode.all); // FIXME: use proper emit mode
else goto again;
} else if (localwaiter.cancelled) break; // timeout
}
() @trusted {
assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w,
"Waiter did not get removed from waiter queue.");
}();
ec = this.emitCount; ec = this.emitCount;
if (timeout != Duration.max) { if (timeout != Duration.max) {
@ -940,17 +916,241 @@ struct ManualEvent {
if (now >= target_timeout) break; if (now >= target_timeout) break;
} }
} }
});
return ec; return ec;
} }
private static bool emitForThisThread(LocalWaiter* waiters, EmitMode mode) private void acquireThreadWaiter(DEL)(scope DEL del)
nothrow { shared {
if (!waiters) return false; import vibe.internal.allocator : theAllocator, make;
import core.memory : GC;
logTrace("emitForThisThread"); ThreadLocalWaiter* w;
auto drv = eventDriver;
final switch (mode) { m_waiters.lock((ref waiters) {
case EmitMode.all: waiters.active.filter((aw) {
if (aw.m_driver is drv) {
w = aw;
}
return true;
});
if (!w) {
waiters.free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
return false;
}
return true;
});
if (!w) {
() @trusted {
try {
w = theAllocator.make!ThreadLocalWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
} catch (Exception e) {
assert(false, "Failed to allocate thread waiter.");
}
} ();
}
waiters.active.add(w);
}
});
scope (exit) {
if (w.unused) {
m_waiters.lock((ref waiters) {
waiters.active.remove(w);
waiters.free.add(w);
// TODO: cap size of m_freeWaiters
});
}
}
del(*w);
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
auto e = createSharedManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
static void w(shared(ManualEvent)* e) { e.wait(500.msecs, e.emitCount); }
auto w2 = runWorkerTaskH(&w, &e);
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
private shared struct Monitor(T, M)
{
private {
shared T m_data;
shared M m_mutex;
}
void lock(scope void delegate(ref T) @safe nothrow access)
shared {
m_mutex.lock();
scope (exit) m_mutex.unlock();
access(*() @trusted { return cast(T*)&m_data; } ());
}
}
package struct SpinLock {
private shared int locked;
static int threadID = 0;
static void setup()
{
import core.thread : Thread;
threadID = cast(int)cast(void*)Thread.getThis();
}
@safe nothrow @nogc shared:
bool tryLock()
@trusted {
assert(atomicLoad(locked) != threadID, "Recursive lock attempt.");
return cas(&locked, 0, threadID);
}
void lock()
{
while (!tryLock()) {}
}
void unlock()
@trusted {
assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread.");
atomicStore(locked, 0);
}
}
private struct ThreadLocalWaiter {
private {
static struct Waiter {
Waiter* next;
Task task;
void delegate() @safe nothrow notifier;
bool cancelled;
void wait(void delegate() @safe nothrow del) @safe nothrow {
assert(notifier is null, "Local waiter is used twice!");
notifier = del;
}
void cancel() @safe nothrow { cancelled = true; notifier = null; }
}
ThreadLocalWaiter* next;
NativeEventDriver m_driver;
EventID m_event;
Waiter* m_waiters;
}
this(this)
@safe nothrow {
if (m_event != EventID.invalid)
m_driver.events.addRef(m_event);
}
~this()
@safe nothrow {
if (m_event != EventID.invalid)
m_driver.events.releaseRef(m_event);
}
@property bool unused() const @safe nothrow { return m_waiters is null; }
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
@safe {
import std.datetime : SysTime, Clock, UTC;
import vibe.internal.async : Waitable, asyncAwaitAny;
Waiter w;
Waiter* pw = () @trusted { return &w; } ();
w.next = m_waiters;
m_waiters = pw;
SysTime target_timeout, now;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
Waitable!(
cb => w.wait(cb),
cb => w.cancel()
) waitable;
void removeWaiter()
@safe nothrow {
Waiter* piw = m_waiters, ppiw = null;
while (piw !is null) {
if (piw is pw) {
if (ppiw) ppiw.next = piw.next;
else m_waiters = piw.next;
break;
}
ppiw = piw;
piw = piw.next;
}
}
scope (failure) removeWaiter();
if (evt != EventID.invalid) {
Waitable!(
(cb) {
eventDriver.events.wait(evt, cb);
// check for exit codition *after* starting to wait for the event
// to avoid a race condition
if (exit_condition()) {
eventDriver.events.cancelWait(evt, cb);
cb(evt);
}
},
cb => eventDriver.events.cancelWait(evt, cb),
EventID
) ewaitable;
asyncAwaitAny!interruptible(timeout, waitable, ewaitable);
} else {
asyncAwaitAny!interruptible(timeout, waitable);
}
if (waitable.cancelled) {
removeWaiter();
return false;
} else debug {
Waiter* piw = m_waiters;
while (piw !is null) {
assert(piw !is pw, "Thread local waiter still in queue after it got notified!?");
piw = piw.next;
}
}
return true;
}
bool emit()
@safe nothrow {
if (!m_waiters) return false;
Waiter* waiters = m_waiters;
m_waiters = null;
while (waiters) { while (waiters) {
auto wnext = waiters.next; auto wnext = waiters.next;
assert(wnext !is waiters); assert(wnext !is waiters);
@ -961,131 +1161,36 @@ struct ManualEvent {
} else logTrace("notify callback is null"); } else logTrace("notify callback is null");
waiters = wnext; waiters = wnext;
} }
break;
case EmitMode.single:
assert(false, "TODO!");
}
return true; return true;
} }
private void acquireWaiter(ThreadWaiter* w, LocalWaiter* lw) bool emitSingle()
nothrow { @safe nothrow {
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling if (!m_waiters) return false;
lw.task = Task.getThis();
if (m_waiters) { auto w = m_waiters;
m_waiters.tasks.add(lw); m_waiters = w.next;
} else { if (w.notifier !is null) {
m_waiters = w; logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
w.notifier();
w.notifier = null;
} else logTrace("notify callback is null");
return true;
} }
} }
private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* lw)
nothrow shared {
lw.task = Task.getThis();
if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.events.create();
auto sdriver = () @trusted { return cast(shared)eventDriver; } ();
shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } ();
size_t cnt = 0;
while (pw !is null) {
assert(pw !is w, "Waiter is already registered!");
if (pw.driver is sdriver)
break;
assert(cnt++ < 1000, "Recursive waiter?!");
pw = () @trusted { return atomicLoad(pw.next); } ();
}
if (!pw) {
pw = w;
shared(ThreadWaiter)* wn;
do {
wn = () @trusted { return atomicLoad(m_waiters); } ();
w.next = wn;
w.event = ms_threadEvent;
w.driver = sdriver;
w.thread = () @trusted { return cast(shared)Thread.getThis(); } ();
} while (!() @trusted { return cas(&m_waiters, wn, w); } ());
}
() @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(lw);
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
logInfo("A");
auto e = createManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
auto w2 = runTask({ e.wait(500.msecs, e.emitCount); });
runTask({
sleep(200.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
logInfo("B");
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
logInfo("C");
auto e = createSharedManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
static void w(shared(ManualEvent) e){e.wait(500.msecs, e.emitCount);}
auto w2 = runWorkerTaskH(&w, e);
runTask({
sleep(200.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
logInfo("D");
}
private struct StackSList(T) private struct StackSList(T)
{ {
@safe nothrow:
import core.atomic : cas; import core.atomic : cas;
private T* m_first; private T* m_first;
@property T* first() { return m_first; } @property T* first() { return m_first; }
@property shared(T)* first() shared { return atomicLoad(m_first); } @property shared(T)* first() shared @trusted { return atomicLoad(m_first); }
void add(shared(T)* elem)
shared {
do elem.next = atomicLoad(m_first);
while (cas(&m_first, elem.next, elem));
}
void remove(shared(T)* elem)
shared {
while (true) {
shared(T)* w = atomicLoad(m_first), wp;
while (w !is elem) {
wp = w;
w = atomicLoad(w.next);
}
if (wp !is null) {
if (cas(&wp.next, w, w.next))
break;
} else {
if (cas(&m_first, w, w.next))
break;
}
}
}
bool empty() const { return m_first is null; } bool empty() const { return m_first is null; }
@ -1095,16 +1200,30 @@ private struct StackSList(T)
m_first = elem; m_first = elem;
} }
void remove(T* elem) bool remove(T* elem)
{ {
T* w = m_first, wp; T* w = m_first, wp;
while (w !is elem) { while (w !is elem) {
assert(w !is null); if (!w) return false;
wp = w; wp = w;
w = w.next; w = w.next;
} }
if (wp) wp.next = w.next; if (wp) wp.next = w.next;
else m_first = w.next; else m_first = w.next;
return true;
}
void filter(scope bool delegate(T* el) @safe nothrow del)
{
T* w = m_first, pw;
while (w !is null) {
auto wnext = w.next;
if (!del(w)) {
if (pw) pw.next = wnext;
else m_first = wnext;
} else pw = w;
w = wnext;
}
} }
} }
@ -1118,7 +1237,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_signal = createSharedManualEvent();
} }
@ -1173,7 +1291,6 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_signal = createSharedManualEvent();
m_mutex = new core.sync.mutex.Mutex; m_mutex = new core.sync.mutex.Mutex;
} }
@ -1249,7 +1366,6 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
void setup(LOCKABLE mtx) void setup(LOCKABLE mtx)
{ {
m_mutex = mtx; m_mutex = mtx;
m_signal = createSharedManualEvent();
} }
@property LOCKABLE mutex() { return m_mutex; } @property LOCKABLE mutex() { return m_mutex; }