Implement DNS lookups and partially implement UDP.

This commit is contained in:
Sönke Ludwig 2016-10-25 00:27:51 +02:00
parent 35761dfc9a
commit 4db9b3f100
2 changed files with 111 additions and 14 deletions

View file

@ -49,7 +49,12 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr
return ret;
} else {
enforce(use_dns, "Malformed IP address string.");
assert(false, "DNS lookup not implemented."); // TODO
auto res = asyncAwait!(DNSLookupCallback,
cb => eventDriver.dns.lookupHost(host, cb),
(cb, id) => eventDriver.dns.cancelLookup(id)
);
enforce(res[1] == DNSStatus.ok && res[2].length > 0, "Failed to lookup host '"~host~"'.");
return NetworkAddress(res[2][0]);
}
}
@ -133,7 +138,9 @@ TCPConnection connectTCP(NetworkAddress addr)
*/
UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
{
assert(false);
auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false);
addr.port = port;
return UDPConnection(addr);
}
@ -147,6 +154,8 @@ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
Represents a network/socket address.
*/
struct NetworkAddress {
import std.socket : Address;
version (Windows) import std.c.windows.winsock;
else import core.sys.posix.netinet.in_;
@ -158,6 +167,25 @@ struct NetworkAddress {
sockaddr_in6 addr_ip6;
}
this(Address addr)
@trusted
{
assert(addr !is null);
switch (addr.addressFamily) {
default: throw new Exception("Unsupported address family.");
case AddressFamily.INET:
this.family = AddressFamily.INET;
assert(addr.nameLen >= sockaddr_in.sizeof);
*this.sockAddrInet4 = *cast(sockaddr_in*)addr.name;
break;
case AddressFamily.INET6:
this.family = AddressFamily.INET6;
assert(addr.nameLen >= sockaddr_in6.sizeof);
*this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name;
break;
}
}
/** Family of the socket address.
*/
@property ushort family() const pure nothrow { return addr.sa_family; }
@ -274,14 +302,14 @@ struct NetworkAddress {
}
UnknownAddress toUnknownAddress()
const {
const nothrow {
auto ret = new UnknownAddress;
toUnknownAddress(ret);
return ret;
}
void toUnknownAddress(scope UnknownAddress addr)
const {
const nothrow {
*addr.name = *this.sockAddr;
}
@ -506,6 +534,29 @@ struct TCPListener {
Represents a bound and possibly 'connected' UDP socket.
*/
struct UDPConnection {
private {
DatagramSocketFD m_socket;
}
private this(ref NetworkAddress bind_address)
{
m_socket = eventDriver.sockets.createDatagramSocket(bind_address.toUnknownAddress(), null);
}
this(this)
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.addRef(m_socket);
}
~this()
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket);
}
/** Returns the address to which the UDP socket is bound.
*/
@property string bindAddress() const { assert(false); }
@ -537,7 +588,14 @@ struct UDPConnection {
If peer_address is given, the packet is send to that address. Otherwise the packet
will be sent to the address specified by a call to connect().
*/
void send(in ubyte[] data, in NetworkAddress* peer_address = null) { assert(false); }
void send(in ubyte[] data, in NetworkAddress* peer_address = null) {
auto ret = asyncAwait!(DatagramIOCallback,
cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? peer_address.toUnknownAddress : null, cb),
cb => eventDriver.sockets.cancelSend(m_socket)
);
enforce(ret[1] == IOStatus.ok, "Failed to send packet.");
enforce(ret[2] == data.length, "Packet was only sent partially.");
}
/** Receives a single packet.
@ -546,9 +604,24 @@ struct UDPConnection {
The timeout overload will throw an Exception if no data arrives before the
specified duration has elapsed.
*/
ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) { assert(false); }
ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
{
return recv(Duration.max, buf, peer_address);
}
/// ditto
ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) { assert(false); }
ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
{
import std.socket : Address;
if (buf.length == 0) buf = new ubyte[65536];
auto res = asyncAwait!(DatagramIOCallback,
cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
cb => eventDriver.sockets.cancelReceive(m_socket)
)(timeout);
enforce(res.completed, "Receive timeout.");
enforce(res.results[1] == IOStatus.ok, "Failed to receive packet.");
if (peer_address) *peer_address = NetworkAddress(res.results[3]);
return buf[0 .. res.results[2]];
}
}

View file

@ -19,12 +19,18 @@ auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__
{
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
asyncAwaitAny!(true, func)(timeout, waitable);
return tuple(waitable.results);
static struct R {
bool completed;
typeof(waitable.results) results;
}
return R(!waitable.cancelled, waitable.results);
}
auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)()
nothrow {
Waitable!(action, (cb) { assert(false, "Action cannot be cancelled."); }, ParameterTypeTuple!Callback) waitable;
static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); }
else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); }
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
asyncAwaitAny!(false, func)(waitable);
return tuple(waitable.results);
}
@ -37,11 +43,17 @@ nothrow {
}
struct Waitable(alias wait, alias cancel, Results...) {
import std.traits : ReturnType;
alias Callback = void delegate(Results) @safe nothrow;
Results results;
bool cancelled;
void waitCallback(Callback cb) { wait(cb); }
void cancelCallback(Callback cb) { cancel(cb); }
auto waitCallback(Callback cb) nothrow { return wait(cb); }
static if (is(ReturnType!waitCallback == void))
void cancelCallback(Callback cb) nothrow { cancel(cb); }
else
void cancelCallback(Callback cb, ReturnType!waitCallback r) nothrow { cancel(cb, r); }
}
void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables)
@ -67,6 +79,7 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
{
import std.meta : staticMap;
import std.algorithm.searching : any;
import std.traits : ReturnType;
/*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates
@ -80,10 +93,13 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", waitables.length, func);
() @trusted { logDebugV("si %x", &still_inside); } ();
foreach (i, W; Waitables) {
/*scope*/auto cb = (typeof(Waitables[i].results) results) @safe nothrow {
() @trusted { logDebugV("siw %x", &still_inside); } ();
debug(VibeAsyncLog) logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init);
assert(still_inside, "Notification fired after asyncAwait had already returned!");
logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init);
fired[i] = true;
any_fired = true;
waitables[i].results = results;
@ -92,12 +108,18 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
callbacks[i] = cb;
debug(VibeAsyncLog) logDebugV("Starting operation %s", i);
waitables[i].waitCallback(callbacks[i]);
static if (is(ReturnType!(W.waitCallback) == void))
waitables[i].waitCallback(callbacks[i]);
else
auto wr = waitables[i].waitCallback(callbacks[i]);
scope ccb = () @safe nothrow {
if (!fired[i]) {
debug(VibeAsyncLog) logDebugV("Cancelling operation %s", i);
waitables[i].cancelCallback(callbacks[i]);
static if (is(ReturnType!(W.waitCallback) == void))
waitables[i].cancelCallback(callbacks[i]);
else
waitables[i].cancelCallback(callbacks[i], wr);
waitables[i].cancelled = true;
any_fired = true;
fired[i] = true;
@ -115,6 +137,8 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
t = Task.getThis();
debug (VibeAsyncLog) scope (failure) logDebugV("Aborting wait due to exception");
do {
static if (interruptible) {
bool interrupted = false;