Improve assertion/test coverage of ManualEvent.

This commit is contained in:
Sönke Ludwig 2017-01-11 17:46:36 +01:00
parent 35a94412d0
commit ece639ee01

View file

@ -711,7 +711,7 @@ struct ManualEvent {
all all
} }
//@disable this(this); //@disable this(this); // FIXME: commenting this out this is not a good idea...
deprecated("ManualEvent is always non-null!") deprecated("ManualEvent is always non-null!")
bool opCast() const nothrow { return true; } bool opCast() const nothrow { return true; }
@ -850,12 +850,12 @@ struct ManualEvent {
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
ThreadWaiter w; ThreadWaiter w;
LocalWaiter tw; LocalWaiter lw;
() @trusted { acquireWaiter(&w, &tw); } (); () @trusted { acquireWaiter(&w, &lw); } ();
Waitable!( Waitable!(
cb => tw.wait(cb), cb => lw.wait(cb),
cb => tw.cancel() cb => lw.cancel()
) waitable; ) waitable;
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
ec = this.emitCount; ec = this.emitCount;
@ -883,16 +883,24 @@ struct ManualEvent {
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
shared(ThreadWaiter) w; shared(ThreadWaiter) w;
LocalWaiter tw; LocalWaiter lw;
() @trusted { acquireWaiter(&w, &tw); } (); () @trusted { acquireWaiter(&w, &lw); } ();
() @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } (); () @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } ();
if (tw.next) { scope (exit) {
shared(ThreadWaiter)* pw = atomicLoad(m_waiters);
while (pw !is null) {
assert(pw !is () @trusted { return &w; } (), "Thread waiter was not removed from queue.");
pw = pw.next;
}
}
if (lw.next) {
// if we are not the first waiter for this thread, // if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread // wait for getting resumed by emitForThisThread
Waitable!( Waitable!(
cb => tw.wait(cb), cb => lw.wait(cb),
cb => tw.cancel() cb => lw.cancel()
) waitable; ) waitable;
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
if (waitable.cancelled) break; // timeout if (waitable.cancelled) break; // timeout
@ -906,8 +914,8 @@ struct ManualEvent {
EventID EventID
) eventwaiter; ) eventwaiter;
Waitable!( Waitable!(
cb => tw.wait(cb), cb => lw.wait(cb),
cb => tw.cancel() cb => lw.cancel()
) localwaiter; ) localwaiter;
logDebugV("Wait on event %s", ms_threadEvent); logDebugV("Wait on event %s", ms_threadEvent);
asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter);
@ -961,21 +969,21 @@ struct ManualEvent {
return true; return true;
} }
private void acquireWaiter(ThreadWaiter* w, LocalWaiter* tw) private void acquireWaiter(ThreadWaiter* w, LocalWaiter* lw)
nothrow { nothrow {
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling
tw.task = Task.getThis(); lw.task = Task.getThis();
if (m_waiters) { if (m_waiters) {
m_waiters.tasks.add(tw); m_waiters.tasks.add(lw);
} else { } else {
m_waiters = w; m_waiters = w;
} }
} }
private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* tw) private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* lw)
nothrow shared { nothrow shared {
tw.task = Task.getThis(); lw.task = Task.getThis();
if (ms_threadEvent == EventID.init) if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.events.create(); ms_threadEvent = eventDriver.events.create();
@ -983,10 +991,12 @@ struct ManualEvent {
auto sdriver = () @trusted { return cast(shared)eventDriver; } (); auto sdriver = () @trusted { return cast(shared)eventDriver; } ();
shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } (); shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } ();
assert(pw !is w, "Waiter is already registered!"); size_t cnt = 0;
while (pw !is null) { while (pw !is null) {
assert(pw !is w, "Waiter is already registered!");
if (pw.driver is sdriver) if (pw.driver is sdriver)
break; break;
assert(cnt++ < 1000, "Recursive waiter?!");
pw = () @trusted { return atomicLoad(pw.next); } (); pw = () @trusted { return atomicLoad(pw.next); } ();
} }
@ -1002,10 +1012,47 @@ struct ManualEvent {
} while (!() @trusted { return cas(&m_waiters, wn, w); } ()); } while (!() @trusted { return cas(&m_waiters, wn, w); } ());
} }
() @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(tw); () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(lw);
} }
} }
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
logInfo("A");
auto e = createManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
auto w2 = runTask({ e.wait(500.msecs, e.emitCount); });
runTask({
sleep(200.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
logInfo("B");
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
logInfo("C");
auto e = createSharedManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
static void w(shared(ManualEvent) e){e.wait(500.msecs, e.emitCount);}
auto w2 = runWorkerTaskH(&w, e);
runTask({
sleep(200.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
logInfo("D");
}
private struct StackSList(T) private struct StackSList(T)
{ {