diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index f6fa8ac..1815bc4 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -44,7 +44,7 @@ struct Waitable(alias wait, alias cancel, Results...) { void cancelCallback(Callback cb) { cancel(cb); } } -void asyncAwaitAny(bool interruptible, string func, Waitables...)(Duration timeout, ref Waitables waitables) +void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables) { if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables); else { @@ -62,7 +62,7 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(Duration timeo } } -void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables waitables) +void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(ref Waitables waitables) if (Waitables.length >= 1 && !is(Waitables[0] == Duration)) { import std.meta : staticMap; @@ -71,24 +71,35 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables /*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates bool[Waitables.length] fired; + ScopeGuard[Waitables.length] scope_guards; + bool any_fired = false; Task t; + logTrace("Performing %s async operations in %s", waitables.length, func); + foreach (i, W; Waitables) { callbacks[i] = (typeof(Waitables[i].results) results) @safe nothrow { - logTrace("Waitable %s fired.", i); + logTrace("Waitable %s in %s fired.", i, func); fired[i] = true; + any_fired = true; waitables[i].results = results; if (t != Task.init) switchToTask(t); }; + logTrace("Starting operation %s", i); waitables[i].waitCallback(callbacks[i]); - scope (exit) { + scope_guards[i] = ScopeGuard({ if (!fired[i]) { + logTrace("Cancelling operation %s", i); waitables[i].cancelCallback(callbacks[i]); - assert(fired[i], "The cancellation callback didn't invoke the result callback!"); + any_fired = true; + fired[i] = true; } + }); + if (any_fired) { + logTrace("Returning without waiting."); + return; } - if (fired[i]) return; } logTrace("Need to wait..."); @@ -97,15 +108,24 @@ void asyncAwaitAny(bool interruptible, string func, Waitables...)(ref Waitables static if (interruptible) { bool interrupted = false; hibernate(() @safe nothrow { - logTrace("Got interrupted."); + logTrace("Got interrupted in %s.", func); interrupted = true; }); if (interrupted) throw new InterruptException; } else hibernate(); - } while (!fired[].any()); + } while (!any_fired); - logTrace("Return result."); + logTrace("Return result for %s.", func); } private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow; + +private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op !is null) op(); } } + +@safe nothrow /*@nogc*/ unittest { + int cnt = 0; + auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); }); + assert(ret[0] == 42); + assert(cnt == 1); +} \ No newline at end of file