diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index a9d2684..b2c273f 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1010,14 +1010,17 @@ struct FileDescriptorEvent { assert((which & m_trigger) == Trigger.read, "Waiting for write event not yet supported."); - Waitable!(IOCallback, + bool got_data; + + alias readwaiter = Waitable!(IOCallback, cb => eventDriver.sockets.waitForData(m_socket, cb), - cb => eventDriver.sockets.cancelRead(m_socket) - ) readwaiter; + cb => eventDriver.sockets.cancelRead(m_socket), + (StreamSocketFD fd, IOStatus st, size_t nb) { got_data = st == IOStatus.ok; } + ); - asyncAwaitAny!true(timeout, readwaiter); + asyncAwaitAny!(true, readwaiter)(timeout); - return !readwaiter.cancelled; + return got_data; } } diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 14e0b39..837ff6d 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -53,7 +53,7 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr enforce(use_dns, "Malformed IP address string."); NetworkAddress res; bool success = false; - Waitable!(DNSLookupCallback, + alias waitable = Waitable!(DNSLookupCallback, cb => eventDriver.dns.lookupHost(host, cb), (cb, id) => eventDriver.dns.cancelLookup(id), (DNSLookupID, DNSStatus status, scope RefAddress[] addrs) { @@ -63,9 +63,9 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr success = true; } } - ) waitable; + ); - asyncAwaitAny!true(waitable); + asyncAwaitAny!(true, waitable); enforce(success, "Failed to lookup host '"~host~"'."); return res; @@ -542,22 +542,27 @@ mixin(tracer); if (m_context.readBuffer.length > 0) return true; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; - Waitable!(IOCallback, + bool cancelled; + IOStatus status; + size_t nbytes; + + alias waiter = Waitable!(IOCallback, cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb), - cb => eventDriver.sockets.cancelRead(m_socket) - ) waiter; + (cb) { cancelled = true; eventDriver.sockets.cancelRead(m_socket); }, + (sock, st, nb) { assert(sock == m_socket); status = st; nbytes = nb; } + ); - asyncAwaitAny!true(timeout, waiter); + asyncAwaitAny!(true, waiter)(timeout); - if (waiter.cancelled) return false; + if (cancelled) return false; - logTrace("Socket %s, read %s bytes: %s", waiter.results[0], waiter.results[2], waiter.results[1]); + logTrace("Socket %s, read %s bytes: %s", m_socket, nbytes, status); assert(m_context.readBuffer.length == 0); - m_context.readBuffer.putN(waiter.results[2]); - switch (waiter.results[1]) { + m_context.readBuffer.putN(nbytes); + switch (status) { default: - logDebug("Error status when waiting for data: %s", waiter.results[1]); + logDebug("Error status when waiting for data: %s", status); break; case IOStatus.ok: break; case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break; @@ -837,20 +842,21 @@ struct UDPConnection { IOStatus status; size_t nbytes; + bool cancelled; - Waitable!(DatagramIOCallback, + alias waitable = Waitable!(DatagramIOCallback, cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? addrc : null, cb), - cb => eventDriver.sockets.cancelSend(m_socket), + (cb) { cancelled = true; eventDriver.sockets.cancelSend(m_socket); }, (DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr) { status = status_; nbytes = nbytes_; } - ) waitable; + ); - asyncAwaitAny!true(waitable); + asyncAwaitAny!(true, waitable); - enforce(!waitable.cancelled && status == IOStatus.ok, "Failed to send packet."); + enforce(!cancelled && status == IOStatus.ok, "Failed to send packet."); enforce(nbytes == data.length, "Packet was only sent partially."); } @@ -873,10 +879,11 @@ struct UDPConnection { IOStatus status; size_t nbytes; + bool cancelled; - Waitable!(DatagramIOCallback, + alias waitable = Waitable!(DatagramIOCallback, cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb), - cb => eventDriver.sockets.cancelReceive(m_socket), + (cb) { cancelled = true; eventDriver.sockets.cancelReceive(m_socket); }, (DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr) { status = status_; @@ -886,10 +893,10 @@ struct UDPConnection { catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg); } } - ) waitable; + ); - asyncAwaitAny!true(timeout, waitable); - enforce(!waitable.cancelled, "Receive timeout."); + asyncAwaitAny!(true, waitable)(timeout); + enforce(!cancelled, "Receive timeout."); enforce(status == IOStatus.ok, "Failed to receive packet."); return buf[0 .. nbytes]; } diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 7e93b61..6881cd0 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -1236,15 +1236,14 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { private { static struct TaskWaiter { TaskWaiter* prev, next; - Task task; void delegate() @safe nothrow notifier; - bool cancelled; void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null, "Local waiter is used twice!"); notifier = del; } - void cancel() @safe nothrow { cancelled = true; notifier = null; } + void cancel() @safe nothrow { notifier = null; } + void emit() @safe nothrow { auto n = notifier; notifier = null; n(); } } static if (EVENT_TRIGGERED) { @@ -1301,30 +1300,35 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { target_timeout = now + timeout; } - Waitable!(typeof(TaskWaiter.notifier), - cb => waiter.wait(cb), - cb => waiter.cancel(), - ) waitable; + bool cancelled; + + alias waitable = Waitable!(typeof(TaskWaiter.notifier), + (cb) { waiter.wait(cb); }, + (cb) { cancelled = true; waiter.cancel(); }, + () {} + ); + + alias ewaitable = Waitable!(EventCallback, + (cb) { + eventDriver.events.wait(evt, cb); + // check for exit condition *after* starting to wait for the event + // to avoid a race condition + if (exit_condition()) { + eventDriver.events.cancelWait(evt, cb); + cb(evt); + } + }, + (cb) { eventDriver.events.cancelWait(evt, cb); }, + (EventID) {} + ); if (evt != EventID.invalid) { - Waitable!(EventCallback, - (cb) { - eventDriver.events.wait(evt, cb); - // check for exit condition *after* starting to wait for the event - // to avoid a race condition - if (exit_condition()) { - eventDriver.events.cancelWait(evt, cb); - cb(evt); - } - }, - cb => eventDriver.events.cancelWait(evt, cb) - ) ewaitable; - asyncAwaitAny!interruptible(timeout, waitable, ewaitable); + asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout); } else { - asyncAwaitAny!interruptible(timeout, waitable); + asyncAwaitAny!(interruptible, waitable)(timeout); } - if (waitable.cancelled) { + if (cancelled) { assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); return false; } else { @@ -1363,6 +1367,19 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { bool emitSingle() @safe nothrow { if (m_waiters.empty) return false; + + TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); + + if (pivot.next) { // another emit in progress? + // shift pivot to the right, so that the other emit call will process another waiter + if (pivot !is m_waiters.back) { + auto n = pivot.next; + m_waiters.remove(pivot); + m_waiters.insertAfter(pivot, n); + } + return true; + } + emitWaiter(m_waiters.front); return true; } @@ -1373,8 +1390,7 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { if (w.notifier !is null) { logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); - w.notifier(); - w.notifier = null; + w.emit(); } else logTrace("notify callback is null"); } } diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index 6c19b5f..a2369b8 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -8,85 +8,88 @@ import vibe.core.log; import core.time : Duration, seconds; -auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)() +auto asyncAwait(Callback, alias action, alias cancel)(string func = __FUNCTION__) if (!is(Object == Duration)) { - Waitable!(Callback, action, cancel) waitable; - asyncAwaitAny!(true, func)(waitable); - return tuple(waitable.results); + ParameterTypeTuple!Callback results; + alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; }); + asyncAwaitAny!(true, waitable)(func); + return tuple(results); } -auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) +auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__) { - Waitable!(Callback, action, cancel) waitable; - asyncAwaitAny!(true, func)(timeout, waitable); static struct R { - bool completed; + bool completed = true; typeof(waitable.results) results; } - return R(!waitable.cancelled, waitable.results); + R ret; + alias waitable = Waitable!(Callback, + action, + (cb) { ret.completed = false; cancel(cb); }, + (ParameterTypeTuple!Callback r) { ret.results = r; } + ); + asyncAwaitAny!(true, waitable)(timeout, func); + return ret; } -auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)() +auto asyncAwaitUninterruptible(Callback, alias action)(string func = __FUNCTION__) nothrow { static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); } - else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); } - Waitable!(Callback, action, cancel) waitable; - asyncAwaitAny!(false, func)(waitable); - return tuple(waitable.results); + else void cancel(Callback, typeof(action(Callback.init))) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); } + ParameterTypeTuple!Callback results; + alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; }); + asyncAwaitAny!(false, waitable)(func); + return tuple(results); } -auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) +auto asyncAwaitUninterruptible(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__) nothrow { - Waitable!(Callback, action, cancel) waitable; - asyncAwaitAny!(false, func)(timeout, waitable); - return tuple(waitable.results); + ParameterTypeTuple!Callback results; + alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; }); + asyncAwaitAny!(false, waitable)(timeout, func); + return tuple(results); } -struct Waitable(CB, alias wait, alias cancel, on_result...) - if (on_result.length <= 1) +template Waitable(CB, alias WAIT, alias CANCEL, alias DONE) { import std.traits : ReturnType; - alias Callback = CB; - - static if (on_result.length == 0) { - static assert(!hasAnyScopeParameter!Callback, "Need to retrieve results with a callback because of scoped parameter"); - ParameterTypeTuple!Callback results; - void setResult(ref ParameterTypeTuple!Callback r) { this.results = r; } - } else { - import std.format : format; - alias PTypes = ParameterTypeTuple!Callback; - mixin(q{void setResult(%s) { on_result[0](%s); }}.format(generateParamDecls!Callback, generateParamNames!Callback)); - } - - bool cancelled; - auto waitCallback(Callback cb) nothrow { return wait(cb); } - - static if (is(ReturnType!waitCallback == void)) - void cancelCallback(Callback cb) nothrow { cancel(cb); } + static assert(is(typeof(WAIT(CB.init))), "WAIT must be callable with a parameter of type "~CB.stringof); + static if (is(typeof(WAIT(CB.init)) == void)) + static assert(is(typeof(CANCEL(CB.init))), + "CANCEL must be callable with a parameter of type "~CB.stringof); else - void cancelCallback(Callback cb, ReturnType!waitCallback r) nothrow { cancel(cb, r); } + static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))), + "CANCEL must be callable with parameters ("~CB.stringof~", "~typeof(WAIT(CB.init)).stringof~")"); + static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))), + "DONE must be callable with types "~ParameterTypeTuple!CB.stringof); + + alias Callback = CB; + alias wait = WAIT; + alias cancel = CANCEL; + alias done = DONE; } -void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables) +void asyncAwaitAny(bool interruptible, Waitables...)(Duration timeout, string func = __FUNCTION__) { - if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables); + if (timeout == Duration.max) asyncAwaitAny!(interruptible, Waitables)(func); else { import eventcore.core; auto tm = eventDriver.timers.create(); eventDriver.timers.set(tm, timeout, 0.seconds); scope (exit) eventDriver.timers.releaseRef(tm); - Waitable!(TimerCallback, + alias timerwaitable = Waitable!(TimerCallback, cb => eventDriver.timers.wait(tm, cb), - cb => eventDriver.timers.cancelWait(tm) - ) timerwaitable; - asyncAwaitAny!(interruptible, func)(timerwaitable, waitables); + cb => eventDriver.timers.cancelWait(tm), + (tid) {} + ); + asyncAwaitAny!(interruptible, timerwaitable, Waitables)(func); } } -void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(ref Waitables waitables) - if (Waitables.length >= 1 && !is(Waitables[0] == Duration)) +void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__) + if (Waitables.length >= 1) { import std.meta : staticMap; import std.algorithm.searching : any; @@ -94,60 +97,60 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) import std.meta : AliasSeq; import std.traits : ReturnType; - /*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates - bool[Waitables.length] fired; - ScopeGuard[Waitables.length] scope_guards; bool any_fired = false; Task t; bool still_inside = true; scope (exit) still_inside = false; - debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", waitables.length, func); + debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", Waitables.length, func); () @trusted { logDebugV("si %x", &still_inside); } (); - foreach (i, W; Waitables) { - alias PTypes = ParameterTypeTuple!(CBDel!W); - /*scope*/auto cb = mixin(q{(%s) @safe nothrow { - () @trusted { logDebugV("siw %%x", &still_inside); } (); - debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", i, func, t != Task.init); - assert(still_inside, "Notification fired after asyncAwait had already returned!"); - fired[i] = true; - any_fired = true; - static if (PTypes.length) - waitables[i].setResult(%s); - if (t != Task.init) switchToTask(t); - }}.format(generateParamDecls!(CBDel!W), generateParamNames!(CBDel!W))); - callbacks[i] = cb; + static string waitableCode() + { + string ret; + foreach (i, W; Waitables) { + alias PTypes = ParameterTypeTuple!(CBDel!W); + ret ~= q{ + alias PT%1$s = ParameterTypeTuple!(Waitables[%1$s].Callback); + scope callback_%1$s = (%2$s) @safe nothrow { + () @trusted { logDebugV("siw %%x", &still_inside); } (); + debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", %1$s, func, t != Task.init); + assert(still_inside, "Notification fired after asyncAwait had already returned!"); + fired[%1$s] = true; + any_fired = true; + Waitables[%1$s].done(%3$s); + if (t != Task.init) switchToTask(t); + }; - debug(VibeAsyncLog) logDebugV("Starting operation %s", i); - static if (is(ReturnType!(W.waitCallback) == void)) - waitables[i].waitCallback(callbacks[i]); - else - auto wr = waitables[i].waitCallback(callbacks[i]); + debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s); + alias WR%1$s = typeof(Waitables[%1$s].wait(callback_%1$s)); + static if (is(WR%1$s == void)) Waitables[%1$s].wait(callback_%1$s); + else auto wr%1$s = Waitables[%1$s].wait(callback_%1$s); - scope ccb = () @safe nothrow { - if (!fired[i]) { - debug(VibeAsyncLog) logDebugV("Cancelling operation %s", i); - static if (is(ReturnType!(W.waitCallback) == void)) - waitables[i].cancelCallback(callbacks[i]); - else - waitables[i].cancelCallback(callbacks[i], wr); - waitables[i].cancelled = true; - any_fired = true; - fired[i] = true; - } - }; - scope_guards[i] = ScopeGuard(ccb); + scope (exit) { + if (!fired[%1$s]) { + debug(VibeAsyncLog) logDebugV("Cancelling operation %%s", %1$s); + static if (is(WR%1$s == void)) Waitables[%1$s].cancel(callback_%1$s); + else Waitables[%1$s].cancel(callback_%1$s, wr%1$s); + any_fired = true; + fired[%1$s] = true; + } + } - if (any_fired) { - debug(VibeAsyncLog) logDebugV("Returning to %s without waiting.", func); - return; + if (any_fired) { + debug(VibeAsyncLog) logDebugV("Returning to %%s without waiting.", func); + return; + } + }.format(i, generateParamDecls!(CBDel!W)(format("PT%s", i)), generateParamNames!(CBDel!W)); } + return ret; } + mixin(waitableCode()); + debug(VibeAsyncLog) logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible"); t = Task.getThis(); @@ -173,9 +176,7 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) debug(VibeAsyncLog) logDebugV("Return result for %s.", func); } -private alias CBDel(Waitable) = Waitable.Callback; - -private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op !is null) op(); } } +private alias CBDel(alias Waitable) = Waitable.Callback; @safe nothrow /*@nogc*/ unittest { int cnt = 0; @@ -186,40 +187,45 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op @safe nothrow /*@nogc*/ unittest { int a, b, c; - Waitable!( + int w1r, w2r; + alias w1 = Waitable!( void delegate(int) @safe nothrow, (cb) { a++; cb(42); }, - (cb) { assert(false); } - ) w1; - Waitable!( + (cb) { assert(false); }, + (i) { w1r = i; } + ); + alias w2 = Waitable!( void delegate(int) @safe nothrow, (cb) { b++; }, - (cb) { c++; } - ) w2; - Waitable!( + (cb) { c++; }, + (i) { w2r = i; } + ); + alias w3 = Waitable!( void delegate(int) @safe nothrow, (cb) { c++; cb(42); }, (cb) { assert(false); }, (int n) { assert(n == 42); } - ) w3; + ); - asyncAwaitAny!false(w1, w2); - assert(w1.results[0] == 42 && w2.results[0] == 0); + asyncAwaitAny!(false, w1, w2); + assert(w1r == 42 && w2r == 0); assert(a == 1 && b == 0 && c == 0); - asyncAwaitAny!false(w2, w1); - assert(w1.results[0] == 42 && w2.results[0] == 0); + asyncAwaitAny!(false, w2, w1); + assert(w1r == 42 && w2r == 0); assert(a == 2 && b == 1 && c == 1); - asyncAwaitAny!false(w3); + asyncAwaitAny!(false, w3); assert(c == 2); } -private string generateParamDecls(Fun)() +private string generateParamDecls(Fun)(string ptypes_name = "PTypes") { import std.format : format; import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple; + if (!__ctfe) assert(false); + alias Types = ParameterTypeTuple!Fun; alias SClasses = ParameterStorageClassTuple!Fun; string ret; @@ -229,7 +235,7 @@ private string generateParamDecls(Fun)() static if (SClasses[i] & ParameterStorageClass.scope_) ret ~= "scope "; static if (SClasses[i] & ParameterStorageClass.out_) ret ~= "out "; static if (SClasses[i] & ParameterStorageClass.ref_) ret ~= "ref "; - ret ~= format("PTypes[%s] param_%s", i, i); + ret ~= format("%s[%s] param_%s", ptypes_name, i, i); } return ret; } @@ -237,6 +243,7 @@ private string generateParamDecls(Fun)() private string generateParamNames(Fun)() { import std.format : format; + if (!__ctfe) assert(false); string ret; foreach (i, T; ParameterTypeTuple!Fun) {