From 4db9b3f1007b508b8867c797ba772f85adef9762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 25 Oct 2016 00:27:51 +0200 Subject: [PATCH] Implement DNS lookups and partially implement UDP. --- source/vibe/core/net.d | 87 +++++++++++++++++++++++++++++++++--- source/vibe/internal/async.d | 38 +++++++++++++--- 2 files changed, 111 insertions(+), 14 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 0bb4cf0..2e7943d 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -49,7 +49,12 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr return ret; } else { enforce(use_dns, "Malformed IP address string."); - assert(false, "DNS lookup not implemented."); // TODO + auto res = asyncAwait!(DNSLookupCallback, + cb => eventDriver.dns.lookupHost(host, cb), + (cb, id) => eventDriver.dns.cancelLookup(id) + ); + enforce(res[1] == DNSStatus.ok && res[2].length > 0, "Failed to lookup host '"~host~"'."); + return NetworkAddress(res[2][0]); } } @@ -133,7 +138,9 @@ TCPConnection connectTCP(NetworkAddress addr) */ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") { - assert(false); + auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false); + addr.port = port; + return UDPConnection(addr); } @@ -147,6 +154,8 @@ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") Represents a network/socket address. */ struct NetworkAddress { + import std.socket : Address; + version (Windows) import std.c.windows.winsock; else import core.sys.posix.netinet.in_; @@ -158,6 +167,25 @@ struct NetworkAddress { sockaddr_in6 addr_ip6; } + this(Address addr) + @trusted + { + assert(addr !is null); + switch (addr.addressFamily) { + default: throw new Exception("Unsupported address family."); + case AddressFamily.INET: + this.family = AddressFamily.INET; + assert(addr.nameLen >= sockaddr_in.sizeof); + *this.sockAddrInet4 = *cast(sockaddr_in*)addr.name; + break; + case AddressFamily.INET6: + this.family = AddressFamily.INET6; + assert(addr.nameLen >= sockaddr_in6.sizeof); + *this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name; + break; + } + } + /** Family of the socket address. */ @property ushort family() const pure nothrow { return addr.sa_family; } @@ -274,14 +302,14 @@ struct NetworkAddress { } UnknownAddress toUnknownAddress() - const { + const nothrow { auto ret = new UnknownAddress; toUnknownAddress(ret); return ret; } void toUnknownAddress(scope UnknownAddress addr) - const { + const nothrow { *addr.name = *this.sockAddr; } @@ -506,6 +534,29 @@ struct TCPListener { Represents a bound and possibly 'connected' UDP socket. */ struct UDPConnection { + private { + DatagramSocketFD m_socket; + } + + private this(ref NetworkAddress bind_address) + { + m_socket = eventDriver.sockets.createDatagramSocket(bind_address.toUnknownAddress(), null); + } + + + this(this) + nothrow { + if (m_socket != StreamSocketFD.invalid) + eventDriver.sockets.addRef(m_socket); + } + + ~this() + nothrow { + if (m_socket != StreamSocketFD.invalid) + eventDriver.sockets.releaseRef(m_socket); + } + + /** Returns the address to which the UDP socket is bound. */ @property string bindAddress() const { assert(false); } @@ -537,7 +588,14 @@ struct UDPConnection { If peer_address is given, the packet is send to that address. Otherwise the packet will be sent to the address specified by a call to connect(). */ - void send(in ubyte[] data, in NetworkAddress* peer_address = null) { assert(false); } + void send(in ubyte[] data, in NetworkAddress* peer_address = null) { + auto ret = asyncAwait!(DatagramIOCallback, + cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? peer_address.toUnknownAddress : null, cb), + cb => eventDriver.sockets.cancelSend(m_socket) + ); + enforce(ret[1] == IOStatus.ok, "Failed to send packet."); + enforce(ret[2] == data.length, "Packet was only sent partially."); + } /** Receives a single packet. @@ -546,9 +604,24 @@ struct UDPConnection { The timeout overload will throw an Exception if no data arrives before the specified duration has elapsed. */ - ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) { assert(false); } + ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) + { + return recv(Duration.max, buf, peer_address); + } /// ditto - ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) { assert(false); } + ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) + { + import std.socket : Address; + if (buf.length == 0) buf = new ubyte[65536]; + auto res = asyncAwait!(DatagramIOCallback, + cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb), + cb => eventDriver.sockets.cancelReceive(m_socket) + )(timeout); + enforce(res.completed, "Receive timeout."); + enforce(res.results[1] == IOStatus.ok, "Failed to receive packet."); + if (peer_address) *peer_address = NetworkAddress(res.results[3]); + return buf[0 .. res.results[2]]; + } } diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index 03a4b05..9bd2c03 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -19,12 +19,18 @@ auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__ { Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable; asyncAwaitAny!(true, func)(timeout, waitable); - return tuple(waitable.results); + static struct R { + bool completed; + typeof(waitable.results) results; + } + return R(!waitable.cancelled, waitable.results); } auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)() nothrow { - Waitable!(action, (cb) { assert(false, "Action cannot be cancelled."); }, ParameterTypeTuple!Callback) waitable; + static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); } + else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); } + Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable; asyncAwaitAny!(false, func)(waitable); return tuple(waitable.results); } @@ -37,11 +43,17 @@ nothrow { } struct Waitable(alias wait, alias cancel, Results...) { + import std.traits : ReturnType; + alias Callback = void delegate(Results) @safe nothrow; Results results; bool cancelled; - void waitCallback(Callback cb) { wait(cb); } - void cancelCallback(Callback cb) { cancel(cb); } + auto waitCallback(Callback cb) nothrow { return wait(cb); } + + static if (is(ReturnType!waitCallback == void)) + void cancelCallback(Callback cb) nothrow { cancel(cb); } + else + void cancelCallback(Callback cb, ReturnType!waitCallback r) nothrow { cancel(cb, r); } } void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables) @@ -67,6 +79,7 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) { import std.meta : staticMap; import std.algorithm.searching : any; + import std.traits : ReturnType; /*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates @@ -80,10 +93,13 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", waitables.length, func); + () @trusted { logDebugV("si %x", &still_inside); } (); + foreach (i, W; Waitables) { /*scope*/auto cb = (typeof(Waitables[i].results) results) @safe nothrow { + () @trusted { logDebugV("siw %x", &still_inside); } (); + debug(VibeAsyncLog) logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init); assert(still_inside, "Notification fired after asyncAwait had already returned!"); - logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init); fired[i] = true; any_fired = true; waitables[i].results = results; @@ -92,12 +108,18 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) callbacks[i] = cb; debug(VibeAsyncLog) logDebugV("Starting operation %s", i); - waitables[i].waitCallback(callbacks[i]); + static if (is(ReturnType!(W.waitCallback) == void)) + waitables[i].waitCallback(callbacks[i]); + else + auto wr = waitables[i].waitCallback(callbacks[i]); scope ccb = () @safe nothrow { if (!fired[i]) { debug(VibeAsyncLog) logDebugV("Cancelling operation %s", i); - waitables[i].cancelCallback(callbacks[i]); + static if (is(ReturnType!(W.waitCallback) == void)) + waitables[i].cancelCallback(callbacks[i]); + else + waitables[i].cancelCallback(callbacks[i], wr); waitables[i].cancelled = true; any_fired = true; fired[i] = true; @@ -115,6 +137,8 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) t = Task.getThis(); + debug (VibeAsyncLog) scope (failure) logDebugV("Aborting wait due to exception"); + do { static if (interruptible) { bool interrupted = false;