From 2d37e550bdb997749d35835b6b255da2c476a2ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 2 Nov 2016 20:58:00 +0100 Subject: [PATCH] Safe-ify net module, and extend functionality. - Custom bind address for outgoing stream connections - reusePort flag - Full OutputStream interface for TCPConnection. --- source/vibe/core/net.d | 80 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 15 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index e2c1a00..8577894 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -16,7 +16,9 @@ import vibe.core.log; import vibe.core.stream; import vibe.internal.async; import core.time : Duration; - + +@safe: + /** Resolves the given host name/IP address string. @@ -43,8 +45,8 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr ret.family = addr.addressFamily; switch (addr.addressFamily) with(AddressFamily) { default: throw new Exception("Unsupported address family"); - case INET: *ret.sockAddrInet4 = *cast(sockaddr_in*)addr.name; break; - case INET6: *ret.sockAddrInet6 = *cast(sockaddr_in6*)addr.name; break; + case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break; + case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break; } return ret; } else { @@ -85,6 +87,7 @@ TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, st { auto addr = resolveHost(address); addr.port = port; + assert(options == TCPListenOptions.defaults, "TODO"); auto sock = eventDriver.sockets.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow { import vibe.core.core : runTask; runTask(connection_callback, TCPConnection(s)); @@ -110,26 +113,54 @@ TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, /** Establishes a connection to the given host/port. */ -TCPConnection connectTCP(string host, ushort port) +TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0) { NetworkAddress addr = resolveHost(host); addr.port = port; - return connectTCP(addr); + if (addr.family != AddressFamily.UNIX) + addr.port = port; + + NetworkAddress bind_address; + if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family); + else { + bind_address.family = addr.family; + if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0; + else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0; + } + if (addr.family != AddressFamily.UNIX) + bind_address.port = bind_port; + + return connectTCP(addr, bind_address); } /// ditto -TCPConnection connectTCP(NetworkAddress addr) +TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress) { import std.conv : to; - scope uaddr = new UnknownAddress; - addr.toUnknownAddress(uaddr); - // FIXME: make this interruptible - auto result = asyncAwaitUninterruptible!(ConnectCallback, - cb => eventDriver.sockets.connectStream(uaddr, cb) - //cb => eventDriver.sockets.cancelConnect(cb) - ); - enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); - return TCPConnection(result[0]); + if (bind_address.family == AddressFamily.UNSPEC) { + bind_address.family = addr.family; + if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0; + else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0; + if (bind_address.family != AddressFamily.UNIX) + bind_address.port = 0; + } + enforce(addr.family == bind_address.family, "Destination address and bind address have different address families."); + + return () @trusted { // scope + scope uaddr = new UnknownAddress; + addr.toUnknownAddress(uaddr); + + scope baddr = new UnknownAddress; + bind_address.toUnknownAddress(baddr); + + // FIXME: make this interruptible + auto result = asyncAwaitUninterruptible!(ConnectCallback, + cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) + //cb => eventDriver.sockets.cancelConnect(cb) + ); + enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); + return TCPConnection(result[0]); + } (); } @@ -143,6 +174,13 @@ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") return UDPConnection(addr); } +NetworkAddress anyAddress() +{ + NetworkAddress ret; + ret.family = AddressFamily.UNSPEC; + return ret; +} + /// Callback invoked for incoming TCP connections. @safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream); @@ -365,6 +403,8 @@ struct TCPConnection { eventDriver.sockets.releaseRef(m_socket); } + bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; } + @property void tcpNoDelay(bool enabled) { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); } @property bool tcpNoDelay() const { assert(false); } @property void keepAlive(bool enable) { assert(false); } @@ -466,6 +506,9 @@ mixin(tracer); } } + void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); } + void write(InputStream stream) { write(stream, 0); } + void flush() { mixin(tracer); } @@ -518,6 +561,8 @@ struct TCPListener { m_socket = socket; } + bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; } + /// The local address at which TCP connections are accepted. @property NetworkAddress bindAddress() { @@ -558,6 +603,7 @@ struct UDPConnection { eventDriver.sockets.releaseRef(m_socket); } + bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; } /** Returns the address to which the UDP socket is bound. */ @@ -637,6 +683,10 @@ enum TCPListenOptions { distribute = 1<<0, /// Disables automatic closing of the connection when the connection callback exits disableAutoClose = 1<<1, + /** Enable port reuse on linux kernel version >=3.9, do nothing on other OS + Does not affect libasync driver because it is always enabled by libasync. + */ + reusePort = 1<<2, } private pure nothrow {