Make the callback type the first argument to Waitable!().

This commit is contained in:
Sönke Ludwig 2017-01-16 00:23:37 +01:00
parent 964d72f3b5
commit 9d4e8086ff
3 changed files with 20 additions and 27 deletions

View file

@ -53,10 +53,9 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr
enforce(use_dns, "Malformed IP address string."); enforce(use_dns, "Malformed IP address string.");
NetworkAddress res; NetworkAddress res;
bool success = false; bool success = false;
Waitable!( Waitable!(DNSLookupCallback,
cb => eventDriver.dns.lookupHost(host, cb), cb => eventDriver.dns.lookupHost(host, cb),
(cb, id) => eventDriver.dns.cancelLookup(id), (cb, id) => eventDriver.dns.cancelLookup(id),
DNSLookupCallback,
(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) { (DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
if (status == DNSStatus.ok && addrs.length > 0) { if (status == DNSStatus.ok && addrs.length > 0) {
try res = NetworkAddress(addrs[0]); try res = NetworkAddress(addrs[0]);
@ -473,10 +472,9 @@ mixin(tracer);
if (m_context.readBuffer.length > 0) return true; if (m_context.readBuffer.length > 0) return true;
auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;
Waitable!( Waitable!(IOCallback,
cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb), cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
cb => eventDriver.sockets.cancelRead(m_socket), cb => eventDriver.sockets.cancelRead(m_socket)
IOCallback
) waiter; ) waiter;
asyncAwaitAny!true(timeout, waiter); asyncAwaitAny!true(timeout, waiter);
@ -718,10 +716,9 @@ struct UDPConnection {
IOStatus status; IOStatus status;
size_t nbytes; size_t nbytes;
Waitable!( Waitable!(DatagramIOCallback,
cb => eventDriver.sockets.send(m_socket, data, IOMode.once, addrc, cb), cb => eventDriver.sockets.send(m_socket, data, IOMode.once, addrc, cb),
cb => eventDriver.sockets.cancelSend(m_socket), cb => eventDriver.sockets.cancelSend(m_socket),
DatagramIOCallback,
(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr) (DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
{ {
status = status_; status = status_;
@ -755,10 +752,9 @@ struct UDPConnection {
IOStatus status; IOStatus status;
size_t nbytes; size_t nbytes;
Waitable!( Waitable!(DatagramIOCallback,
cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb), cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
cb => eventDriver.sockets.cancelReceive(m_socket), cb => eventDriver.sockets.cancelReceive(m_socket),
DatagramIOCallback,
(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr) (DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
{ {
status = status_; status = status_;

View file

@ -1091,10 +1091,9 @@ private struct ThreadLocalWaiter {
target_timeout = now + timeout; target_timeout = now + timeout;
} }
Waitable!( Waitable!(typeof(Waiter.notifier),
cb => w.wait(cb), cb => w.wait(cb),
cb => w.cancel(), cb => w.cancel(),
typeof(Waiter.notifier)
) waitable; ) waitable;
void removeWaiter() void removeWaiter()
@ -1114,7 +1113,7 @@ private struct ThreadLocalWaiter {
scope (failure) removeWaiter(); scope (failure) removeWaiter();
if (evt != EventID.invalid) { if (evt != EventID.invalid) {
Waitable!( Waitable!(EventCallback,
(cb) { (cb) {
eventDriver.events.wait(evt, cb); eventDriver.events.wait(evt, cb);
// check for exit codition *after* starting to wait for the event // check for exit codition *after* starting to wait for the event
@ -1124,8 +1123,7 @@ private struct ThreadLocalWaiter {
cb(evt); cb(evt);
} }
}, },
cb => eventDriver.events.cancelWait(evt, cb), cb => eventDriver.events.cancelWait(evt, cb)
EventCallback
) ewaitable; ) ewaitable;
asyncAwaitAny!interruptible(timeout, waitable, ewaitable); asyncAwaitAny!interruptible(timeout, waitable, ewaitable);
} else { } else {

View file

@ -10,14 +10,14 @@ import core.time : Duration, seconds;
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)() auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)()
if (!is(Object == Duration)) { if (!is(Object == Duration)) {
Waitable!(action, cancel, Callback) waitable; Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(true, func)(waitable); asyncAwaitAny!(true, func)(waitable);
return tuple(waitable.results); return tuple(waitable.results);
} }
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
{ {
Waitable!(action, cancel, Callback) waitable; Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(true, func)(timeout, waitable); asyncAwaitAny!(true, func)(timeout, waitable);
static struct R { static struct R {
bool completed; bool completed;
@ -30,19 +30,19 @@ auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION_
nothrow { nothrow {
static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); } static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); }
else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); } else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); }
Waitable!(action, cancel, Callback) waitable; Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(false, func)(waitable); asyncAwaitAny!(false, func)(waitable);
return tuple(waitable.results); return tuple(waitable.results);
} }
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
nothrow { nothrow {
Waitable!(action, cancel, Callback) waitable; Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(false, func)(timeout, waitable); asyncAwaitAny!(false, func)(timeout, waitable);
return tuple(waitable.results); return tuple(waitable.results);
} }
struct Waitable(alias wait, alias cancel, CB, on_result...) struct Waitable(CB, alias wait, alias cancel, on_result...)
if (on_result.length <= 1) if (on_result.length <= 1)
{ {
import std.traits : ReturnType; import std.traits : ReturnType;
@ -77,10 +77,9 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
auto tm = eventDriver.timers.create(); auto tm = eventDriver.timers.create();
eventDriver.timers.set(tm, timeout, 0.seconds); eventDriver.timers.set(tm, timeout, 0.seconds);
scope (exit) eventDriver.timers.releaseRef(tm); scope (exit) eventDriver.timers.releaseRef(tm);
Waitable!( Waitable!(TimerCallback,
cb => eventDriver.timers.wait(tm, cb), cb => eventDriver.timers.wait(tm, cb),
cb => eventDriver.timers.cancelWait(tm), cb => eventDriver.timers.cancelWait(tm)
TimerCallback
) timerwaitable; ) timerwaitable;
asyncAwaitAny!(interruptible, func)(timerwaitable, waitables); asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);
} }
@ -188,19 +187,19 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
@safe nothrow /*@nogc*/ unittest { @safe nothrow /*@nogc*/ unittest {
int a, b, c; int a, b, c;
Waitable!( Waitable!(
void delegate(int) @safe nothrow,
(cb) { a++; cb(42); }, (cb) { a++; cb(42); },
(cb) { assert(false); }, (cb) { assert(false); }
void delegate(int) @safe nothrow
) w1; ) w1;
Waitable!( Waitable!(
void delegate(int) @safe nothrow,
(cb) { b++; }, (cb) { b++; },
(cb) { c++; }, (cb) { c++; }
void delegate(int) @safe nothrow
) w2; ) w2;
Waitable!( Waitable!(
void delegate(int) @safe nothrow,
(cb) { c++; cb(42); }, (cb) { c++; cb(42); },
(cb) { assert(false); }, (cb) { assert(false); },
void delegate(int) @safe nothrow,
(int n) { assert(n == 42); } (int n) { assert(n == 42); }
) w3; ) w3;