Implement generic asyncAwaitAny.

This enables waiting for multiple events at the same time. Generic timeout functionality is now also implemented.
This commit is contained in:
Sönke Ludwig 2016-06-14 09:26:12 +02:00
parent 8264070f19
commit 48131ce7b8

View file

@ -10,63 +10,102 @@ import core.time : Duration, seconds;
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)() auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)()
if (!is(Object == Duration)) { if (!is(Object == Duration)) {
return asyncAwaitImpl!(true, Callback, action, cancel, func)(Duration.max); Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
asyncAwaitAny!(true, func)(waitable);
return tuple(waitable.results);
} }
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
{ {
return asyncAwaitImpl!(true, Callback, action, cancel, func)(timeout); Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
asyncAwaitAny!(true, func)(timeout, waitable);
return tuple(waitable.results);
} }
auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)() auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)()
nothrow { nothrow {
return asyncAwaitImpl!(false, Callback, action, (cb) { assert(false); }, func)(Duration.max); Waitable!(action, (cb) { assert(false, "Action cannot be cancelled."); }, ParameterTypeTuple!Callback) waitable;
asyncAwaitAny!(false, func)(waitable);
return tuple(waitable.results);
} }
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
nothrow { nothrow {
assert(timeout >= 0.seconds); Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
asyncAwaitImpl!(false, Callback, action, cancel, func)(timeout); asyncAwaitAny!(false, func)(timeout, waitable);
return tuple(waitable.results);
} }
private auto asyncAwaitImpl(bool interruptible, Callback, alias action, alias cancel, string func)(Duration timeout) struct Waitable(alias wait, alias cancel, Results...) {
@safe if (!is(Object == Duration)) { alias Callback = void delegate(Results) @safe nothrow;
alias CBTypes = ParameterTypeTuple!Callback; Results results;
bool cancelled;
void waitCallback(Callback cb) { wait(cb); }
void cancelCallback(Callback cb) { cancel(cb); }
}
assert(timeout >= 0.seconds); void asyncAwaitAny(bool interruptible, string func, Waitables...)(Duration timeout, ref Waitables waitables)
assert(timeout == Duration.max, "TODO!"); {
if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables);
else {
import eventcore.core;
bool fired = false; auto tm = eventDriver.createTimer();
CBTypes ret; eventDriver.setTimer(tm, timeout);
scope (exit) eventDriver.releaseRef(tm);
Waitable!(
cb => eventDriver.waitTimer(tm, cb),
cb => eventDriver.cancelTimerWait(tm, cb),
TimerID
) timerwaitable;
asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);
}
}
void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables waitables)
if (Waitables.length >= 1 && !is(Waitables[0] == Duration))
{
import std.meta : staticMap;
import std.algorithm.searching : any;
/*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates
bool[Waitables.length] fired;
Task t; Task t;
void callback(CBTypes params) foreach (i, W; Waitables) {
@safe nothrow { callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow {
logTrace("Got result."); logTrace("Waitable %s fired.", i);
fired = true; fired[i] = true;
ret = params; waitables[i].results = results;
if (t != Task.init) switchToTask(t); if (t != Task.init) switchToTask(t);
};
waitables[i].waitCallback(callbacks[i]);
scope (exit) {
if (!fired[i]) {
waitables[i].cancelCallback(callbacks[i]);
assert(fired[i], "The cancellation callback didn't invoke the result callback!");
}
}
if (fired[i]) return;
} }
scope cbdel = &callback;
logTrace("Calling async function in "~func);
action(cbdel);
if (!fired) {
logTrace("Need to wait..."); logTrace("Need to wait...");
t = Task.getThis(); t = Task.getThis();
do { do {
static if (interruptible) { static if (interruptible) {
bool interrupted = false; bool interrupted = false;
hibernate(() @safe nothrow { hibernate(() @safe nothrow {
cancel(cbdel); logTrace("Got interrupted.");
interrupted = true; interrupted = true;
}); });
if (interrupted) if (interrupted)
throw new InterruptException; // FIXME: the original operation needs to be stopped! or the callback will still be called" throw new InterruptException;
} else hibernate(); } else hibernate();
} while (!fired); } while (!fired[].any());
}
logTrace("Return result."); logTrace("Return result.");
return tuple(ret);
} }
private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow;