Safe-ify net module, and extend functionality.

- Custom bind address for outgoing stream connections
- reusePort flag
- Full OutputStream interface for TCPConnection.
This commit is contained in:
Sönke Ludwig 2016-11-02 20:58:00 +01:00
parent 831ef743f2
commit 2d37e550bd

View file

@ -17,6 +17,8 @@ import vibe.core.stream;
import vibe.internal.async;
import core.time : Duration;
@safe:
/**
Resolves the given host name/IP address string.
@ -43,8 +45,8 @@ NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = tr
ret.family = addr.addressFamily;
switch (addr.addressFamily) with(AddressFamily) {
default: throw new Exception("Unsupported address family");
case INET: *ret.sockAddrInet4 = *cast(sockaddr_in*)addr.name; break;
case INET6: *ret.sockAddrInet6 = *cast(sockaddr_in6*)addr.name; break;
case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break;
case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break;
}
return ret;
} else {
@ -85,6 +87,7 @@ TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, st
{
auto addr = resolveHost(address);
addr.port = port;
assert(options == TCPListenOptions.defaults, "TODO");
auto sock = eventDriver.sockets.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow {
import vibe.core.core : runTask;
runTask(connection_callback, TCPConnection(s));
@ -110,26 +113,54 @@ TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback,
/**
Establishes a connection to the given host/port.
*/
TCPConnection connectTCP(string host, ushort port)
TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0)
{
NetworkAddress addr = resolveHost(host);
addr.port = port;
return connectTCP(addr);
if (addr.family != AddressFamily.UNIX)
addr.port = port;
NetworkAddress bind_address;
if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
else {
bind_address.family = addr.family;
if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
}
if (addr.family != AddressFamily.UNIX)
bind_address.port = bind_port;
return connectTCP(addr, bind_address);
}
/// ditto
TCPConnection connectTCP(NetworkAddress addr)
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress)
{
import std.conv : to;
if (bind_address.family == AddressFamily.UNSPEC) {
bind_address.family = addr.family;
if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
if (bind_address.family != AddressFamily.UNIX)
bind_address.port = 0;
}
enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");
return () @trusted { // scope
scope uaddr = new UnknownAddress;
addr.toUnknownAddress(uaddr);
scope baddr = new UnknownAddress;
bind_address.toUnknownAddress(baddr);
// FIXME: make this interruptible
auto result = asyncAwaitUninterruptible!(ConnectCallback,
cb => eventDriver.sockets.connectStream(uaddr, cb)
cb => eventDriver.sockets.connectStream(uaddr, baddr, cb)
//cb => eventDriver.sockets.cancelConnect(cb)
);
enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string);
return TCPConnection(result[0]);
} ();
}
@ -143,6 +174,13 @@ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
return UDPConnection(addr);
}
NetworkAddress anyAddress()
{
NetworkAddress ret;
ret.family = AddressFamily.UNSPEC;
return ret;
}
/// Callback invoked for incoming TCP connections.
@safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream);
@ -365,6 +403,8 @@ struct TCPConnection {
eventDriver.sockets.releaseRef(m_socket);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }
@property void tcpNoDelay(bool enabled) { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); }
@property bool tcpNoDelay() const { assert(false); }
@property void keepAlive(bool enable) { assert(false); }
@ -466,6 +506,9 @@ mixin(tracer);
}
}
void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
void write(InputStream stream) { write(stream, 0); }
void flush() {
mixin(tracer);
}
@ -518,6 +561,8 @@ struct TCPListener {
m_socket = socket;
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }
/// The local address at which TCP connections are accepted.
@property NetworkAddress bindAddress()
{
@ -558,6 +603,7 @@ struct UDPConnection {
eventDriver.sockets.releaseRef(m_socket);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }
/** Returns the address to which the UDP socket is bound.
*/
@ -637,6 +683,10 @@ enum TCPListenOptions {
distribute = 1<<0,
/// Disables automatic closing of the connection when the connection callback exits
disableAutoClose = 1<<1,
/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
Does not affect libasync driver because it is always enabled by libasync.
*/
reusePort = 1<<2,
}
private pure nothrow {