Fix scope issues in asyncAwait.

This commit is contained in:
Sönke Ludwig 2016-06-15 18:21:04 +02:00
parent 48131ce7b8
commit a74a89cab7

View file

@ -44,7 +44,7 @@ struct Waitable(alias wait, alias cancel, Results...) {
void cancelCallback(Callback cb) { cancel(cb); } void cancelCallback(Callback cb) { cancel(cb); }
} }
void asyncAwaitAny(bool interruptible, string func, Waitables...)(Duration timeout, ref Waitables waitables) void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables)
{ {
if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables); if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables);
else { else {
@ -62,7 +62,7 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(Duration timeo
} }
} }
void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables waitables) void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(ref Waitables waitables)
if (Waitables.length >= 1 && !is(Waitables[0] == Duration)) if (Waitables.length >= 1 && !is(Waitables[0] == Duration))
{ {
import std.meta : staticMap; import std.meta : staticMap;
@ -71,24 +71,35 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables
/*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates /*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates
bool[Waitables.length] fired; bool[Waitables.length] fired;
ScopeGuard[Waitables.length] scope_guards;
bool any_fired = false;
Task t; Task t;
logTrace("Performing %s async operations in %s", waitables.length, func);
foreach (i, W; Waitables) { foreach (i, W; Waitables) {
callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow { callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow {
logTrace("Waitable %s fired.", i); logTrace("Waitable %s in %s fired.", i, func);
fired[i] = true; fired[i] = true;
any_fired = true;
waitables[i].results = results; waitables[i].results = results;
if (t != Task.init) switchToTask(t); if (t != Task.init) switchToTask(t);
}; };
logTrace("Starting operation %s", i);
waitables[i].waitCallback(callbacks[i]); waitables[i].waitCallback(callbacks[i]);
scope (exit) { scope_guards[i] = ScopeGuard({
if (!fired[i]) { if (!fired[i]) {
logTrace("Cancelling operation %s", i);
waitables[i].cancelCallback(callbacks[i]); waitables[i].cancelCallback(callbacks[i]);
assert(fired[i], "The cancellation callback didn't invoke the result callback!"); any_fired = true;
fired[i] = true;
} }
});
if (any_fired) {
logTrace("Returning without waiting.");
return;
} }
if (fired[i]) return;
} }
logTrace("Need to wait..."); logTrace("Need to wait...");
@ -97,15 +108,24 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables
static if (interruptible) { static if (interruptible) {
bool interrupted = false; bool interrupted = false;
hibernate(() @safe nothrow { hibernate(() @safe nothrow {
logTrace("Got interrupted."); logTrace("Got interrupted in %s.", func);
interrupted = true; interrupted = true;
}); });
if (interrupted) if (interrupted)
throw new InterruptException; throw new InterruptException;
} else hibernate(); } else hibernate();
} while (!fired[].any()); } while (!any_fired);
logTrace("Return result."); logTrace("Return result for %s.", func);
} }
private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow; private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow;
private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op !is null) op(); } }
@safe nothrow /*@nogc*/ unittest {
int cnt = 0;
auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); });
assert(ret[0] == 42);
assert(cnt == 1);
}