Fix handling of scoped callback parameters in eventcore callbacks.
This commit is contained in:
parent
32d360baac
commit
964d72f3b5
|
@ -51,12 +51,25 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr
|
||||||
return ret;
|
return ret;
|
||||||
} else {
|
} else {
|
||||||
enforce(use_dns, "Malformed IP address string.");
|
enforce(use_dns, "Malformed IP address string.");
|
||||||
auto res = asyncAwait!(DNSLookupCallback,
|
NetworkAddress res;
|
||||||
|
bool success = false;
|
||||||
|
Waitable!(
|
||||||
cb => eventDriver.dns.lookupHost(host, cb),
|
cb => eventDriver.dns.lookupHost(host, cb),
|
||||||
(cb, id) => eventDriver.dns.cancelLookup(id)
|
(cb, id) => eventDriver.dns.cancelLookup(id),
|
||||||
);
|
DNSLookupCallback,
|
||||||
enforce(res[1] == DNSStatus.ok && res[2].length > 0, "Failed to lookup host '"~host~"'.");
|
(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
|
||||||
return NetworkAddress(res[2][0]);
|
if (status == DNSStatus.ok && addrs.length > 0) {
|
||||||
|
try res = NetworkAddress(addrs[0]);
|
||||||
|
catch (Exception e) { logDiagnostic("Failed to store address from DNS lookup: %s", e.msg); }
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
) waitable;
|
||||||
|
|
||||||
|
asyncAwaitAny!true(waitable);
|
||||||
|
|
||||||
|
enforce(success, "Failed to lookup host '"~host~"'.");
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -702,12 +715,24 @@ struct UDPConnection {
|
||||||
auto addr = () @trusted { return peer_address ? peer_address : &m_context.remoteAddress; } ();
|
auto addr = () @trusted { return peer_address ? peer_address : &m_context.remoteAddress; } ();
|
||||||
scope addrc = new RefAddress(() @trusted { return (cast(NetworkAddress*)addr).sockAddr; } (), addr.sockAddrLen);
|
scope addrc = new RefAddress(() @trusted { return (cast(NetworkAddress*)addr).sockAddr; } (), addr.sockAddrLen);
|
||||||
|
|
||||||
auto ret = asyncAwait!(DatagramIOCallback,
|
IOStatus status;
|
||||||
|
size_t nbytes;
|
||||||
|
|
||||||
|
Waitable!(
|
||||||
cb => eventDriver.sockets.send(m_socket, data, IOMode.once, addrc, cb),
|
cb => eventDriver.sockets.send(m_socket, data, IOMode.once, addrc, cb),
|
||||||
cb => eventDriver.sockets.cancelSend(m_socket)
|
cb => eventDriver.sockets.cancelSend(m_socket),
|
||||||
);
|
DatagramIOCallback,
|
||||||
enforce(ret[1] == IOStatus.ok, "Failed to send packet.");
|
(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
|
||||||
enforce(ret[2] == data.length, "Packet was only sent partially.");
|
{
|
||||||
|
status = status_;
|
||||||
|
nbytes = nbytes_;
|
||||||
|
}
|
||||||
|
) waitable;
|
||||||
|
|
||||||
|
asyncAwaitAny!true(waitable);
|
||||||
|
|
||||||
|
enforce(!waitable.cancelled && status == IOStatus.ok, "Failed to send packet.");
|
||||||
|
enforce(nbytes == data.length, "Packet was only sent partially.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Receives a single packet.
|
/** Receives a single packet.
|
||||||
|
@ -726,14 +751,29 @@ struct UDPConnection {
|
||||||
{
|
{
|
||||||
import std.socket : Address;
|
import std.socket : Address;
|
||||||
if (buf.length == 0) buf = new ubyte[65536];
|
if (buf.length == 0) buf = new ubyte[65536];
|
||||||
auto res = asyncAwait!(DatagramIOCallback,
|
|
||||||
|
IOStatus status;
|
||||||
|
size_t nbytes;
|
||||||
|
|
||||||
|
Waitable!(
|
||||||
cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
|
cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
|
||||||
cb => eventDriver.sockets.cancelReceive(m_socket)
|
cb => eventDriver.sockets.cancelReceive(m_socket),
|
||||||
)(timeout);
|
DatagramIOCallback,
|
||||||
enforce(res.completed, "Receive timeout.");
|
(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
|
||||||
enforce(res.results[1] == IOStatus.ok, "Failed to receive packet.");
|
{
|
||||||
if (peer_address) *peer_address = NetworkAddress(res.results[3]);
|
status = status_;
|
||||||
return buf[0 .. res.results[2]];
|
nbytes = nbytes_;
|
||||||
|
if (status_ == IOStatus.ok && peer_address) {
|
||||||
|
try *peer_address = NetworkAddress(addr);
|
||||||
|
catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
) waitable;
|
||||||
|
|
||||||
|
asyncAwaitAny!true(timeout, waitable);
|
||||||
|
enforce(!waitable.cancelled, "Receive timeout.");
|
||||||
|
enforce(status == IOStatus.ok, "Failed to receive packet.");
|
||||||
|
return buf[0 .. nbytes];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ struct Waitable(alias wait, alias cancel, CB, on_result...)
|
||||||
alias Callback = CB;
|
alias Callback = CB;
|
||||||
|
|
||||||
static if (on_result.length == 0) {
|
static if (on_result.length == 0) {
|
||||||
|
static assert(!hasAnyScopeParameter!Callback, "Need to retrieve results with a callback because of scoped parameter");
|
||||||
ParameterTypeTuple!Callback results;
|
ParameterTypeTuple!Callback results;
|
||||||
void setResult(ref ParameterTypeTuple!Callback r) { this.results = r; }
|
void setResult(ref ParameterTypeTuple!Callback r) { this.results = r; }
|
||||||
} else {
|
} else {
|
||||||
|
@ -245,3 +246,11 @@ private string generateParamNames(Fun)()
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private template hasAnyScopeParameter(Callback) {
|
||||||
|
import std.algorithm.searching : any;
|
||||||
|
import std.traits : ParameterStorageClass, ParameterStorageClassTuple;
|
||||||
|
alias SC = ParameterStorageClassTuple!Callback;
|
||||||
|
static if (SC.length == 0) enum hasAnyScopeParameter = false;
|
||||||
|
else enum hasAnyScopeParameter = any!(c => c & ParameterStorageClass.scope_)([SC]);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue