Fix all ManualEvent related tests.

This commit is contained in:
Sönke Ludwig 2016-06-17 22:33:04 +02:00
parent f9579a5dd2
commit 1005f5c674
4 changed files with 313 additions and 190 deletions

View file

@ -102,12 +102,12 @@ int runEventLoop()
runTask(toDelegate(&watchExitFlag)); runTask(toDelegate(&watchExitFlag));
} (); } ();
while ((s_scheduler.scheduledTaskCount || eventDriver.waiterCount) && !s_exitEventLoop) { while (true) {
logTrace("process events"); auto er = s_scheduler.waitAndProcess();
if (eventDriver.processEvents() == ExitReason.exited) { if (er != ExitReason.idle || s_exitEventLoop) {
logDebug("Event loop exit reason (exit flag=%s): %s", s_exitEventLoop, er);
break; break;
} }
logTrace("idle processing");
performIdleProcessing(); performIdleProcessing();
} }
@ -156,18 +156,18 @@ void exitEventLoop(bool shutdown_all_threads = false)
*/ */
bool processEvents() bool processEvents()
@safe nothrow { @safe nothrow {
if (!eventDriver.processEvents(0.seconds)) return false; return !s_scheduler.process().among(ExitReason.exited, ExitReason.outOfWaiters);
performIdleProcessing();
return true;
} }
/** /**
Wait once for events and process them. Wait once for events and process them.
*/ */
void runEventLoopOnce() ExitReason runEventLoopOnce()
@safe nothrow { @safe nothrow {
eventDriver.processEvents(Duration.max); auto ret = s_scheduler.waitAndProcess();
performIdleProcessing(); if (ret == ExitReason.idle)
performIdleProcessing();
return ret;
} }
/** /**
@ -560,7 +560,7 @@ public void setupWorkerThreads(uint num = logicalProcessorCount())
foreach (i; 0 .. num) { foreach (i; 0 .. num) {
auto thr = new Thread(&workerThreadFunc); auto thr = new Thread(&workerThreadFunc);
thr.name = format("Vibe Task Worker #%s", i); thr.name = format("vibe-%s", i);
st_threads ~= ThreadContext(thr, true); st_threads ~= ThreadContext(thr, true);
thr.start(); thr.start();
} }
@ -747,13 +747,11 @@ unittest {
*/ */
Timer createTimer(void delegate() nothrow @safe callback) Timer createTimer(void delegate() nothrow @safe callback)
@safe nothrow { @safe nothrow {
void cb(TimerID tm)
nothrow @safe {
if (callback !is null)
callback();
}
auto ret = Timer(eventDriver.createTimer()); auto ret = Timer(eventDriver.createTimer());
eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure! if (callback !is null) {
void cb(TimerID tm) nothrow @safe { callback(); }
eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure!
}
return ret; return ret;
} }
@ -945,6 +943,7 @@ struct Timer {
private this(TimerID id) private this(TimerID id)
nothrow { nothrow {
assert(id != TimerID.init, "Invalid timer ID.");
m_driver = eventDriver; m_driver = eventDriver;
m_id = id; m_id = id;
} }
@ -1022,7 +1021,7 @@ package(vibe) void performIdleProcessing()
if (again) { if (again) {
auto er = eventDriver.processEvents(0.seconds); auto er = eventDriver.processEvents(0.seconds);
if (er.among!(ExitReason.exited, ExitReason.idle)) { if (er.among!(ExitReason.exited, ExitReason.outOfWaiters)) {
logDebug("Setting exit flag due to driver signalling exit"); logDebug("Setting exit flag due to driver signalling exit");
s_exitEventLoop = true; s_exitEventLoop = true;
return; return;
@ -1158,7 +1157,7 @@ shared static this()
st_threadShutdownCondition = new Condition(st_threadsMutex); st_threadShutdownCondition = new Condition(st_threadsMutex);
auto thisthr = Thread.getThis(); auto thisthr = Thread.getThis();
thisthr.name = "Main"; thisthr.name = "main";
assert(st_threads.length == 0, "Main thread not the first thread!?"); assert(st_threads.length == 0, "Main thread not the first thread!?");
st_threads ~= ThreadContext(thisthr, false); st_threads ~= ThreadContext(thisthr, false);

View file

@ -7,6 +7,7 @@
*/ */
module vibe.core.sync; module vibe.core.sync;
import vibe.core.log : logDebugV, logTrace, logInfo;
import vibe.core.task; import vibe.core.task;
import core.atomic; import core.atomic;
@ -314,7 +315,6 @@ unittest { // test deferred throwing
}); });
auto t2 = runTask({ auto t2 = runTask({
scope (failure) assert(false, "Only InterruptException supposed to be thrown!");
mutex.lock(); mutex.lock();
scope (exit) mutex.unlock(); scope (exit) mutex.unlock();
try { try {
@ -322,6 +322,8 @@ unittest { // test deferred throwing
assert(false, "Yield is supposed to have thrown an InterruptException."); assert(false, "Yield is supposed to have thrown an InterruptException.");
} catch (InterruptException) { } catch (InterruptException) {
// as expected! // as expected!
} catch (Exception) {
assert(false, "Only InterruptException supposed to be thrown!");
} }
}); });
@ -567,6 +569,7 @@ class TaskCondition : core.sync.condition.Condition {
*/ */
unittest { unittest {
import vibe.core.core; import vibe.core.core;
import vibe.core.log;
__gshared Mutex mutex; __gshared Mutex mutex;
__gshared TaskCondition condition; __gshared TaskCondition condition;
@ -576,6 +579,8 @@ unittest {
mutex = new Mutex; mutex = new Mutex;
condition = new TaskCondition(mutex); condition = new TaskCondition(mutex);
logDebug("SETTING UP TASKS");
// start up the workers and count how many are running // start up the workers and count how many are running
foreach (i; 0 .. 4) { foreach (i; 0 .. 4) {
workers_still_running++; workers_still_running++;
@ -584,16 +589,23 @@ unittest {
sleep(100.msecs); sleep(100.msecs);
// notify the waiter that we're finished // notify the waiter that we're finished
synchronized (mutex) synchronized (mutex) {
workers_still_running--; workers_still_running--;
logDebug("DECREMENT %s", workers_still_running);
}
logDebug("NOTIFY");
condition.notify(); condition.notify();
}); });
} }
logDebug("STARTING WAIT LOOP");
// wait until all tasks have decremented the counter back to zero // wait until all tasks have decremented the counter back to zero
synchronized (mutex) { synchronized (mutex) {
while (workers_still_running > 0) while (workers_still_running > 0) {
logDebug("STILL running %s", workers_still_running);
condition.wait(); condition.wait();
}
} }
} }
@ -649,9 +661,9 @@ struct ManualEvent {
private { private {
static struct ThreadWaiter { static struct ThreadWaiter {
ThreadWaiter* next; ThreadWaiter* next;
/*immutable*/ EventID event; EventID event;
/*immutable*/ EventDriver driver; EventDriver driver;
//immutable Thread thread; Thread thread;
StackSList!LocalWaiter tasks; StackSList!LocalWaiter tasks;
} }
static struct LocalWaiter { static struct LocalWaiter {
@ -660,24 +672,14 @@ struct ManualEvent {
void delegate() @safe nothrow notifier; void delegate() @safe nothrow notifier;
bool cancelled = false; bool cancelled = false;
void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; } void wait(void delegate() @safe nothrow del) @safe nothrow {
void cancel() @safe nothrow { cancelled = true; auto n = notifier; notifier = null; n(); } assert(notifier is null, "Local waiter is used twice!");
void wait(void delegate() @safe nothrow del)
shared @safe nothrow {
notifier = del; notifier = del;
if (!next) eventDriver.waitForEvent(ms_threadEvent, &onEvent);
}
private void onEvent(EventID event)
@safe nothrow {
assert(event == ms_threadEvent);
notifier();
} }
void cancel() @safe nothrow { cancelled = true; notifier = null; }
} }
int m_emitCount; int m_emitCount;
ThreadWaiter* m_waiters; ThreadWaiter* m_waiters;
} }
// thread destructor in vibe.core.core will decrement the ref. count // thread destructor in vibe.core.core will decrement the ref. count
@ -705,6 +707,8 @@ struct ManualEvent {
shared nothrow { shared nothrow {
import core.atomic : atomicOp, cas; import core.atomic : atomicOp, cas;
logTrace("emit shared");
auto ec = atomicOp!"+="(m_emitCount, 1); auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis(); auto thisthr = Thread.getThis();
@ -712,27 +716,35 @@ struct ManualEvent {
case EmitMode.all: case EmitMode.all:
// FIXME: would be nice to have atomicSwap instead // FIXME: would be nice to have atomicSwap instead
auto w = cast(ThreadWaiter*)atomicLoad(m_waiters); auto w = cast(ThreadWaiter*)atomicLoad(m_waiters);
if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) {
logTrace("Another thread emitted concurrently - returning.");
return ec; return ec;
}
while (w !is null) { while (w !is null) {
// Note: emitForThisThread can result in w getting deallocated at any
// time, so we need to copy any fields first
auto wnext = w.next;
atomicStore((cast(shared)w).next, null);
assert(wnext !is w, "Same waiter enqueued twice!?");
if (w.driver is eventDriver) { if (w.driver is eventDriver) {
// Note: emitForThisThread can result in w getting deallocated at any logTrace("Same thread emit (%s/%s)", cast(void*)w, cast(void*)w.tasks.first);
// time, so we need to copy any fields first
auto tasks = w.tasks;
w = w.next;
emitForThisThread(w.tasks.m_first, mode); emitForThisThread(w.tasks.m_first, mode);
} else { } else {
logTrace("Foreign thread \"%s\" notify: %s", w.thread.name, w.event);
auto drv = w.driver; auto drv = w.driver;
auto evt = w.event; auto evt = w.event;
w = w.next;
if (evt != EventID.init) if (evt != EventID.init)
drv.triggerEvent(evt, true); (cast(shared)drv).triggerEvent(evt, true);
} }
w = wnext;
} }
break; break;
case EmitMode.single: case EmitMode.single:
assert(false); assert(false);
} }
logTrace("emit shared done");
return ec; return ec;
} }
/// ditto /// ditto
@ -740,6 +752,8 @@ struct ManualEvent {
nothrow { nothrow {
auto ec = m_emitCount++; auto ec = m_emitCount++;
logTrace("unshared emit");
final switch (mode) { final switch (mode) {
case EmitMode.all: case EmitMode.all:
auto w = m_waiters; auto w = m_waiters;
@ -758,6 +772,9 @@ struct ManualEvent {
/** Acquires ownership and waits until the signal is emitted. /** Acquires ownership and waits until the signal is emitted.
Note that in order not to miss any emits it is necessary to use the
overload taking an integer.
Throws: Throws:
May throw an $(D InterruptException) if the task gets interrupted May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()). using $(D Task.interrupt()).
@ -766,79 +783,20 @@ struct ManualEvent {
/// ditto /// ditto
int wait() shared { return wait(this.emitCount); } int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the given one. /** Acquires ownership and waits until the emit count differs from the
given one or until a timeout is reached.
Throws: Throws:
May throw an $(D InterruptException) if the task gets interrupted May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()). using $(D Task.interrupt()).
*/ */
int wait(int emit_count) { return wait(Duration.max, emit_count); } int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
/// ditto /// ditto
int wait(int emit_count) shared { return wait(Duration.max, emit_count); } int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
/** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reaced.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(Duration timeout, int emit_count)
{
ThreadWaiter w;
LocalWaiter tw;
int ec = this.emitCount;
while (ec <= emit_count) {
// wait for getting resumed directly by emit/emitForThisThread
acquireWaiter(w, tw);
asyncAwait!(void delegate() @safe nothrow,
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
}
return ec;
}
/// ditto /// ditto
int wait(Duration timeout, int emit_count) int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
shared { /// ditto
shared(ThreadWaiter) w; int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
LocalWaiter tw;
int ec = this.emitCount;
while (ec <= emit_count) {
acquireWaiter(w, tw);
if (tw.next) {
// if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread
asyncAwait!(void delegate() @safe nothrow,
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
} else {
// if we are the first waiter for this thread,
// wait for the thread event to get emitted
Waitable!(
cb => eventDriver.waitForEvent(ms_threadEvent, cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb),
EventID
) eventwaiter;
Waitable!(
cb => tw.wait(cb),
cb => tw.cancel()
) localwaiter;
asyncAwaitAny!true(timeout, eventwaiter, localwaiter);
ec = this.emitCount;
if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
else if (localwaiter.cancelled) break; // timeout
}
}
return ec;
}
/** Same as $(D wait), but defers throwing any $(D InterruptException). /** Same as $(D wait), but defers throwing any $(D InterruptException).
@ -849,45 +807,51 @@ struct ManualEvent {
/// ///
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
/// ditto /// ditto
int waitUninterruptible(int emit_count) nothrow { return waitUninterruptible(Duration.max, emit_count); } int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
/// ditto /// ditto
int waitUninterruptible(int emit_count) shared nothrow { return waitUninterruptible(Duration.max, emit_count); } int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
/// ditto /// ditto
int waitUninterruptible(Duration timeout, int emit_count) int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
nothrow { /// ditto
ThreadWaiter w; int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
LocalWaiter tw;
acquireWaiter(w, tw);
private int doWait(bool interruptible)(Duration timeout, int emit_count)
{
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
asyncAwaitUninterruptible!(void delegate(), ThreadWaiter w;
LocalWaiter tw;
acquireWaiter(w, tw);
Waitable!(
cb => tw.wait(cb), cb => tw.wait(cb),
cb => tw.cancel() cb => tw.cancel()
)(timeout); ) waitable;
asyncAwaitAny!interruptible(timeout, waitable);
ec = this.emitCount; ec = this.emitCount;
} }
return ec; return ec;
} }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count)
shared nothrow {
shared(ThreadWaiter) w;
LocalWaiter tw;
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared {
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
shared(ThreadWaiter) w;
LocalWaiter tw;
acquireWaiter(w, tw); acquireWaiter(w, tw);
logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next);
if (tw.next) { if (tw.next) {
// if we are not the first waiter for this thread, // if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread // wait for getting resumed by emitForThisThread
asyncAwaitUninterruptible!(void delegate() @safe nothrow, Waitable!(
cb => tw.wait(cb), cb => tw.wait(cb),
cb => tw.cancel() cb => tw.cancel()
)(timeout); ) waitable;
ec = this.emitCount; asyncAwaitAny!interruptible(timeout, waitable);
} else { } else {
again:
// if we are the first waiter for this thread, // if we are the first waiter for this thread,
// wait for the thread event to get emitted // wait for the thread event to get emitted
Waitable!( Waitable!(
@ -899,13 +863,20 @@ struct ManualEvent {
cb => tw.wait(cb), cb => tw.wait(cb),
cb => tw.cancel() cb => tw.cancel()
) localwaiter; ) localwaiter;
asyncAwaitAny!false(timeout, eventwaiter, localwaiter); logDebugV("Wait on event %s", ms_threadEvent);
asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter);
ec = this.emitCount; if (!eventwaiter.cancelled) {
if (atomicLoad(w.next) == null)
if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
else if (localwaiter.cancelled) break; // timeout else goto again;
} else if (localwaiter.cancelled) break; // timeout
} }
assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w,
"Waiter did not get removed from waiter queue.");
ec = this.emitCount;
} }
return ec; return ec;
} }
@ -914,12 +885,19 @@ struct ManualEvent {
nothrow { nothrow {
if (!waiters) return false; if (!waiters) return false;
logTrace("emitForThisThread");
final switch (mode) { final switch (mode) {
case EmitMode.all: case EmitMode.all:
while (waiters) { while (waiters) {
if (waiters.notifier !is null) auto wnext = waiters.next;
assert(wnext !is waiters);
if (waiters.notifier !is null) {
logTrace("notify task %s %s %s", cast(void*)waiters, cast(void*)waiters.notifier.funcptr, waiters.notifier.ptr);
waiters.notifier(); waiters.notifier();
waiters = waiters.next; waiters.notifier = null;
} else logTrace("notify callback is null");
waiters = wnext;
} }
break; break;
case EmitMode.single: case EmitMode.single:
@ -950,20 +928,27 @@ struct ManualEvent {
auto sdriver = cast(shared)eventDriver; auto sdriver = cast(shared)eventDriver;
if (m_waiters) { shared(ThreadWaiter)* pw = atomicLoad(m_waiters);
shared(ThreadWaiter)* pw = m_waiters; assert(pw !is &w, "Waiter is already registered!");
while (pw !is null) { while (pw !is null) {
if (pw.driver is sdriver) { if (pw.driver is sdriver)
(cast(ThreadWaiter*)pw).tasks.add(&tw); break;
break; pw = atomicLoad(pw.next);
}
pw = atomicLoad(pw.next);
}
} else {
m_waiters = &w;
w.event = ms_threadEvent;
w.driver = sdriver;
} }
if (!pw) {
pw = &w;
shared(ThreadWaiter)* wn;
do {
wn = atomicLoad(m_waiters);
w.next = wn;
w.event = ms_threadEvent;
w.driver = sdriver;
w.thread = cast(shared)Thread.getThis();
} while (!cas(&m_waiters, wn, &w));
}
(cast(ThreadWaiter*)pw).tasks.add(&tw);
} }
} }
@ -1023,7 +1008,6 @@ private struct StackSList(T)
} }
private struct TaskMutexImpl(bool INTERRUPTIBLE) { private struct TaskMutexImpl(bool INTERRUPTIBLE) {
import std.stdio;
private { private {
shared(bool) m_locked = false; shared(bool) m_locked = false;
shared(uint) m_waiters = 0; shared(uint) m_waiters = 0;
@ -1041,7 +1025,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
{ {
if (cas(&m_locked, false, true)) { if (cas(&m_locked, false, true)) {
debug m_owner = Task.getThis(); debug m_owner = Task.getThis();
version(MutexPrint) writefln("mutex %s lock %s", cast(void*)this, atomicLoad(m_waiters)); debug(VibeMutexPrint) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
return true; return true;
} }
return false; return false;
@ -1052,7 +1036,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
if (tryLock()) return; if (tryLock()) return;
debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock."); debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
atomicOp!"+="(m_waiters, 1); atomicOp!"+="(m_waiters, 1);
version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters)); debug(VibeMutexPrint) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
scope(exit) atomicOp!"-="(m_waiters, 1); scope(exit) atomicOp!"-="(m_waiters, 1);
auto ecnt = m_signal.emitCount(); auto ecnt = m_signal.emitCount();
while (!tryLock()) { while (!tryLock()) {
@ -1069,7 +1053,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
m_owner = Task(); m_owner = Task();
} }
atomicStore!(MemoryOrder.rel)(m_locked, false); atomicStore!(MemoryOrder.rel)(m_locked, false);
version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters)); debug(VibeMutexPrint) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
if (atomicLoad(m_waiters) > 0) if (atomicLoad(m_waiters) > 0)
m_signal.emit(); m_signal.emit();
} }
@ -1113,7 +1097,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
{ {
if (tryLock()) return; if (tryLock()) return;
atomicOp!"+="(m_waiters, 1); atomicOp!"+="(m_waiters, 1);
version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters)); debug(VibeMutexPrint) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
scope(exit) atomicOp!"-="(m_waiters, 1); scope(exit) atomicOp!"-="(m_waiters, 1);
auto ecnt = m_signal.emitCount(); auto ecnt = m_signal.emitCount();
while (!tryLock()) { while (!tryLock()) {
@ -1133,7 +1117,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
m_owner = Task.init; m_owner = Task.init;
} }
}); });
version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters)); debug(VibeMutexPrint) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
if (atomicLoad(m_waiters) > 0) if (atomicLoad(m_waiters) > 0)
m_signal.emit(); m_signal.emit();
} }

View file

@ -79,11 +79,29 @@ struct Task {
T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; } T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }
void join() { if (running) taskFiber.join(); } void join() { if (running) taskFiber.join(m_taskCounter); }
void interrupt() { if (running) taskFiber.interrupt(); } void interrupt() { if (running) taskFiber.interrupt(m_taskCounter); }
string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); } string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); }
void getDebugID(R)(ref R dst)
{
import std.digest.md : MD5;
import std.bitmanip : nativeToLittleEndian;
import std.base64 : Base64;
if (!m_fiber) {
dst.put("----");
return;
}
MD5 md;
md.start();
md.put(nativeToLittleEndian(cast(size_t)cast(void*)m_fiber));
md.put(nativeToLittleEndian(cast(size_t)cast(void*)m_taskCounter));
Base64.encode(md.finish()[0 .. 3], dst);
}
bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
} }
@ -261,7 +279,7 @@ final package class TaskFiber : Fiber {
shared size_t m_taskCounter; shared size_t m_taskCounter;
shared bool m_running; shared bool m_running;
Task[] m_joiners; shared(ManualEvent) m_onExit;
// task local storage // task local storage
BitArray m_flsInit; BitArray m_flsInit;
@ -352,12 +370,8 @@ final package class TaskFiber : Fiber {
this.tidInfo.ident = Tid.init; // clear message box this.tidInfo.ident = Tid.init; // clear message box
foreach (t; m_joiners) { logTrace("Notifying joining tasks.");
logTrace("Resuming joining task."); m_onExit.emit();
taskScheduler.switchTo(t);
}
m_joiners.length = 0;
m_joiners.assumeSafeAppend();
// make sure that the task does not get left behind in the yielder queue if terminated during yield() // make sure that the task does not get left behind in the yielder queue if terminated during yield()
if (m_queue) m_queue.remove(this); if (m_queue) m_queue.remove(this);
@ -395,28 +409,21 @@ final package class TaskFiber : Fiber {
/** Blocks until the task has ended. /** Blocks until the task has ended.
*/ */
void join() void join(size_t task_counter)
{ {
import vibe.core.core : hibernate, yield; while (m_running && m_taskCounter == task_counter)
m_onExit.wait();
auto caller = Task.getThis();
if (!m_running) return;
if (caller != Task.init) {
assert(caller.fiber !is this, "A task cannot join itself.");
assert(caller.thread is this.thread, "Joining tasks in foreign threads is currently not supported.");
m_joiners ~= caller;
} else assert(Thread.getThis() is this.thread, "Joining tasks in different threads is not yet supported.");
auto run_count = m_taskCounter;
if (caller == Task.init) vibe.core.core.yield(); // let the task continue (it must be yielded currently)
while (m_running && run_count == m_taskCounter) hibernate();
} }
/** Throws an InterruptExeption within the task as soon as it calls an interruptible function. /** Throws an InterruptExeption within the task as soon as it calls an interruptible function.
*/ */
void interrupt() void interrupt(size_t task_counter)
{ {
import vibe.core.core : taskScheduler; import vibe.core.core : taskScheduler;
if (m_taskCounter != task_counter)
return;
auto caller = Task.getThis(); auto caller = Task.getThis();
if (caller != Task.init) { if (caller != Task.init) {
assert(caller != this.task, "A task cannot interrupt itself."); assert(caller != this.task, "A task cannot interrupt itself.");
@ -481,6 +488,9 @@ package struct TaskFuncInfo {
} }
package struct TaskScheduler { package struct TaskScheduler {
import eventcore.driver : ExitReason;
import eventcore.core : eventDriver;
private { private {
TaskFiberQueue m_taskQueue; TaskFiberQueue m_taskQueue;
TaskFiber m_markerTask; TaskFiber m_markerTask;
@ -512,6 +522,99 @@ package struct TaskScheduler {
nothrow: nothrow:
/** Performs a single round of scheduling without blocking.
This will execute scheduled tasks and process events from the
event queue, as long as possible without having to wait.
Returns:
A reason is returned:
$(UL
$(LI `ExitReason.exit`: The event loop was exited due to a manual request)
$(LI `ExitReason.outOfWaiters`: There are no more scheduled
tasks or events, so the application would do nothing from
now on)
$(LI `ExitReason.idle`: All scheduled tasks and pending events
have been processed normally)
$(LI `ExitReason.timeout`: Scheduled tasks have been processed,
but there were no pending events present.)
)
*/
ExitReason process()
{
bool any_events = false;
while (true) {
// process pending tasks
schedule();
logTrace("Processing pending events...");
ExitReason er = eventDriver.processEvents(0.seconds);
logTrace("Done.");
final switch (er) {
case ExitReason.exited: return ExitReason.exited;
case ExitReason.outOfWaiters:
if (!scheduledTaskCount)
return ExitReason.outOfWaiters;
break;
case ExitReason.timeout:
if (!scheduledTaskCount)
return any_events ? ExitReason.idle : ExitReason.timeout;
break;
case ExitReason.idle:
any_events = true;
if (!scheduledTaskCount)
return ExitReason.idle;
break;
}
}
}
/** Performs a single round of scheduling, blocking if necessary.
Returns:
A reason is returned:
$(UL
$(LI `ExitReason.exit`: The event loop was exited due to a manual request)
$(LI `ExitReason.outOfWaiters`: There are no more scheduled
tasks or events, so the application would do nothing from
now on)
$(LI `ExitReason.idle`: All scheduled tasks and pending events
have been processed normally)
)
*/
ExitReason waitAndProcess()
{
// first, process tasks without blocking
auto er = process();
final switch (er) {
case ExitReason.exited, ExitReason.outOfWaiters: return er;
case ExitReason.idle: return ExitReason.idle;
case ExitReason.timeout: break;
}
// if the first run didn't process any events, block and
// process one chunk
logTrace("Wait for new events to process...");
er = eventDriver.processEvents();
logTrace("Done.");
final switch (er) {
case ExitReason.exited: return ExitReason.exited;
case ExitReason.outOfWaiters:
if (!scheduledTaskCount)
return ExitReason.outOfWaiters;
break;
case ExitReason.timeout: assert(false, "Unexpected return code");
case ExitReason.idle: break;
}
// finally, make sure that all scheduled tasks are run
er = process();
if (er == ExitReason.timeout) return ExitReason.idle;
else return er;
}
void yieldUninterruptible() void yieldUninterruptible()
{ {
auto t = Task.getThis(); auto t = Task.getThis();

View file

@ -75,48 +75,63 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
bool any_fired = false; bool any_fired = false;
Task t; Task t;
logTrace("Performing %s async operations in %s", waitables.length, func); bool still_inside = true;
scope (exit) still_inside = false;
logDebugV("Performing %s async operations in %s", waitables.length, func);
foreach (i, W; Waitables) { foreach (i, W; Waitables) {
callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow { /*scope*/auto cb = (typeof(Waitables[i].results) results) @safe nothrow {
logTrace("Waitable %s in %s fired.", i, func); assert(still_inside, "Notification fired after asyncAwait had already returned!");
logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init);
fired[i] = true; fired[i] = true;
any_fired = true; any_fired = true;
waitables[i].results = results; waitables[i].results = results;
if (t != Task.init) switchToTask(t); if (t != Task.init) switchToTask(t);
}; };
callbacks[i] = cb;
logTrace("Starting operation %s", i); logDebugV("Starting operation %s", i);
waitables[i].waitCallback(callbacks[i]); waitables[i].waitCallback(callbacks[i]);
scope_guards[i] = ScopeGuard({
scope ccb = {
if (!fired[i]) { if (!fired[i]) {
logTrace("Cancelling operation %s", i); logDebugV("Cancelling operation %s", i);
waitables[i].cancelCallback(callbacks[i]); waitables[i].cancelCallback(callbacks[i]);
waitables[i].cancelled = true;
any_fired = true; any_fired = true;
fired[i] = true; fired[i] = true;
} }
}); };
scope_guards[i] = ScopeGuard(ccb);
if (any_fired) { if (any_fired) {
logTrace("Returning without waiting."); logDebugV("Returning to %s without waiting.", func);
return; return;
} }
} }
logTrace("Need to wait..."); logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");
t = Task.getThis(); t = Task.getThis();
do { do {
static if (interruptible) { static if (interruptible) {
bool interrupted = false; bool interrupted = false;
hibernate(() @safe nothrow { hibernate(() @safe nothrow {
logTrace("Got interrupted in %s.", func); logDebugV("Got interrupted in %s.", func);
interrupted = true; interrupted = true;
}); });
logDebugV("Task resumed (fired=%s, interrupted=%s)", fired, interrupted);
if (interrupted) if (interrupted)
throw new InterruptException; throw new InterruptException;
} else hibernate(); } else {
hibernate();
logDebugV("Task resumed (fired=%s)", fired);
}
} while (!any_fired); } while (!any_fired);
logTrace("Return result for %s.", func); logDebugV("Return result for %s.", func);
} }
private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow; private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow;
@ -128,4 +143,26 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); }); auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); });
assert(ret[0] == 42); assert(ret[0] == 42);
assert(cnt == 1); assert(cnt == 1);
} }
@safe nothrow /*@nogc*/ unittest {
int a, b, c;
Waitable!(
(cb) { a++; cb(42); },
(cb) { assert(false); },
int
) w1;
Waitable!(
(cb) { b++; },
(cb) { c++; },
int
) w2;
asyncAwaitAny!false(w1, w2);
assert(w1.results[0] == 42 && w2.results[0] == 0);
assert(a == 1 && b == 0 && c == 0);
asyncAwaitAny!false(w2, w1);
assert(w1.results[0] == 42 && w2.results[0] == 0);
assert(a == 2 && b == 1 && c == 1);
}