/** TCP/UDP connection and server handling. Copyright: © 2012-2016 RejectedSoftware e.K. Authors: Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. */ module vibe.core.net; import eventcore.core; import std.exception : enforce; import std.format : format; import std.functional : toDelegate; import std.socket : AddressFamily, UnknownAddress; import vibe.core.log; import vibe.core.stream; import vibe.internal.async; import core.time : Duration; @safe: /** Resolves the given host name/IP address string. Setting use_dns to false will only allow IP address strings but also guarantees that the call will not block. */ NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFamily.UNSPEC, bool use_dns = true) { return resolveHost(host, cast(ushort)address_family, use_dns); } /// ditto NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true) { import std.socket : parseAddress; version (Windows) import std.c.windows.winsock : sockaddr_in, sockaddr_in6; else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6; enforce(host.length > 0, "Host name must not be empty."); if (host[0] == ':' || host[0] >= '0' && host[0] <= '9') { auto addr = parseAddress(host); enforce(address_family == AddressFamily.UNSPEC || addr.addressFamily == address_family); NetworkAddress ret; ret.family = addr.addressFamily; switch (addr.addressFamily) with(AddressFamily) { default: throw new Exception("Unsupported address family"); case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break; case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break; } return ret; } else { enforce(use_dns, "Malformed IP address string."); auto res = asyncAwait!(DNSLookupCallback, cb => eventDriver.dns.lookupHost(host, cb), (cb, id) => eventDriver.dns.cancelLookup(id) ); enforce(res[1] == DNSStatus.ok && res[2].length > 0, "Failed to lookup host '"~host~"'."); return NetworkAddress(res[2][0]); } } /** Starts listening on the specified port. 'connection_callback' will be called for each client that connects to the server socket. Each new connection gets its own fiber. The stream parameter then allows to perform blocking I/O on the client socket. The address parameter can be used to specify the network interface on which the server socket is supposed to listen for connections. By default, all IPv4 and IPv6 interfaces will be used. */ TCPListener[] listenTCP(ushort port, TCPConnectionDelegate connection_callback, TCPListenOptions options = TCPListenOptions.defaults) { TCPListener[] ret; try ret ~= listenTCP(port, connection_callback, "::", options); catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg); try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options); catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg); enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port)); return ret; } /// ditto TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) { auto addr = resolveHost(address); addr.port = port; assert(options == TCPListenOptions.defaults, "TODO"); auto sock = eventDriver.sockets.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow { import vibe.core.core : runTask; runTask(connection_callback, TCPConnection(s)); }); return TCPListener(sock); } /** Starts listening on the specified port. This function is the same as listenTCP but takes a function callback instead of a delegate. */ TCPListener[] listenTCP_s(ushort port, TCPConnectionFunction connection_callback, TCPListenOptions options = TCPListenOptions.defaults) { return listenTCP(port, toDelegate(connection_callback), options); } /// ditto TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) { return listenTCP(port, toDelegate(connection_callback), address, options); } /** Establishes a connection to the given host/port. */ TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0) { NetworkAddress addr = resolveHost(host); addr.port = port; if (addr.family != AddressFamily.UNIX) addr.port = port; NetworkAddress bind_address; if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family); else { bind_address.family = addr.family; if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0; else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0; } if (addr.family != AddressFamily.UNIX) bind_address.port = bind_port; return connectTCP(addr, bind_address); } /// ditto TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress) { import std.conv : to; if (bind_address.family == AddressFamily.UNSPEC) { bind_address.family = addr.family; if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0; else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0; if (bind_address.family != AddressFamily.UNIX) bind_address.port = 0; } enforce(addr.family == bind_address.family, "Destination address and bind address have different address families."); return () @trusted { // scope scope uaddr = new UnknownAddress; addr.toUnknownAddress(uaddr); scope baddr = new UnknownAddress; bind_address.toUnknownAddress(baddr); // FIXME: make this interruptible auto result = asyncAwaitUninterruptible!(ConnectCallback, cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) //cb => eventDriver.sockets.cancelConnect(cb) ); enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); return TCPConnection(result[0]); } (); } /** Creates a bound UDP socket suitable for sending and receiving packets. */ UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") { auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false); addr.port = port; return UDPConnection(addr); } NetworkAddress anyAddress() { NetworkAddress ret; ret.family = AddressFamily.UNSPEC; return ret; } /// Callback invoked for incoming TCP connections. @safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream); /// ditto @safe nothrow alias TCPConnectionFunction = void delegate(TCPConnection stream); /** Represents a network/socket address. */ struct NetworkAddress { import std.socket : Address; version (Windows) import core.sys.windows.winsock2; else import core.sys.posix.netinet.in_; @safe: private union { sockaddr addr; sockaddr_in addr_ip4; sockaddr_in6 addr_ip6; } this(Address addr) @trusted { assert(addr !is null); switch (addr.addressFamily) { default: throw new Exception("Unsupported address family."); case AddressFamily.INET: this.family = AddressFamily.INET; assert(addr.nameLen >= sockaddr_in.sizeof); *this.sockAddrInet4 = *cast(sockaddr_in*)addr.name; break; case AddressFamily.INET6: this.family = AddressFamily.INET6; assert(addr.nameLen >= sockaddr_in6.sizeof); *this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name; break; } } /** Family of the socket address. */ @property ushort family() const pure nothrow { return addr.sa_family; } /// ditto @property void family(AddressFamily val) pure nothrow { addr.sa_family = cast(ubyte)val; } /// ditto @property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; } /** The port in host byte order. */ @property ushort port() const pure nothrow { ushort nport; switch (this.family) { default: assert(false, "port() called for invalid address family."); case AF_INET: nport = addr_ip4.sin_port; break; case AF_INET6: nport = addr_ip6.sin6_port; break; } return () @trusted { return ntoh(nport); } (); } /// ditto @property void port(ushort val) pure nothrow { auto nport = () @trusted { return hton(val); } (); switch (this.family) { default: assert(false, "port() called for invalid address family."); case AF_INET: addr_ip4.sin_port = nport; break; case AF_INET6: addr_ip6.sin6_port = nport; break; } } /** A pointer to a sockaddr struct suitable for passing to socket functions. */ @property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; } /** Size of the sockaddr struct that is returned by sockAddr(). */ @property int sockAddrLen() const pure nothrow { switch (this.family) { default: assert(false, "sockAddrLen() called for invalid address family."); case AF_INET: return addr_ip4.sizeof; case AF_INET6: return addr_ip6.sizeof; } } @property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow in { assert (family == AF_INET); } body { return &addr_ip4; } @property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow in { assert (family == AF_INET6); } body { return &addr_ip6; } /** Returns a string representation of the IP address */ string toAddressString() const { import std.array : appender; auto ret = appender!string(); ret.reserve(40); toAddressString(str => ret.put(str)); return ret.data; } /// ditto void toAddressString(scope void delegate(const(char)[]) @safe sink) const { import std.array : appender; import std.format : formattedWrite; ubyte[2] _dummy = void; // Workaround for DMD regression in master switch (this.family) { default: assert(false, "toAddressString() called for invalid address family."); case AF_INET: ubyte[4] ip = () @trusted { return (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; } (); sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]); break; case AF_INET6: ubyte[16] ip = addr_ip6.sin6_addr.s6_addr; foreach (i; 0 .. 8) { if (i > 0) sink(":"); _dummy[] = ip[i*2 .. i*2+2]; sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy)); } break; } } /** Returns a full string representation of the address, including the port number. */ string toString() const { import std.array : appender; auto ret = appender!string(); toString(str => ret.put(str)); return ret.data; } /// ditto void toString(scope void delegate(const(char)[]) @safe sink) const { import std.format : formattedWrite; switch (this.family) { default: assert(false, "toString() called for invalid address family."); case AF_INET: toAddressString(sink); sink.formattedWrite(":%s", port); break; case AF_INET6: sink("["); toAddressString(sink); sink.formattedWrite("]:%s", port); break; } } UnknownAddress toUnknownAddress() const nothrow { auto ret = new UnknownAddress; toUnknownAddress(ret); return ret; } void toUnknownAddress(scope UnknownAddress addr) const nothrow { *addr.name = *this.sockAddr; } version(Have_libev) {} else { unittest { void test(string ip) { auto res = () @trusted { return resolveHost(ip, AF_UNSPEC, false); } ().toAddressString(); assert(res == ip, "IP "~ip~" yielded wrong string representation: "~res); } test("1.2.3.4"); test("102:304:506:708:90a:b0c:d0e:f10"); } } } /** Represents a single TCP connection. */ struct TCPConnection { @safe: import core.time : seconds; import vibe.internal.array : BatchBuffer; //static assert(isConnectionStream!TCPConnection); struct Context { BatchBuffer!ubyte readBuffer; } private { StreamSocketFD m_socket; Context* m_context; } private this(StreamSocketFD socket) nothrow { m_socket = socket; m_context = () @trusted { return &eventDriver.core.userData!Context(socket); } (); m_context.readBuffer.capacity = 4096; } this(this) nothrow { if (m_socket != StreamSocketFD.invalid) eventDriver.sockets.addRef(m_socket); } ~this() nothrow { if (m_socket != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(m_socket); } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; } @property void tcpNoDelay(bool enabled) { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); } @property bool tcpNoDelay() const { assert(false); } @property void keepAlive(bool enable) { assert(false); } @property bool keepAlive() const { assert(false); } @property void readTimeout(Duration duration) { } @property Duration readTimeout() const { assert(false); } @property string peerAddress() const { return ""; } @property NetworkAddress localAddress() const { return NetworkAddress.init; } @property NetworkAddress remoteAddress() const { return NetworkAddress.init; } @property bool connected() const { if (m_socket == StreamSocketFD.invalid) return false; auto s = eventDriver.sockets.getConnectionState(m_socket); return s >= ConnectionState.connected && s < ConnectionState.activeClose; } @property bool empty() { return leastSize == 0; } @property ulong leastSize() { waitForData(); return m_context.readBuffer.length; } @property bool dataAvailableForRead() { return waitForData(0.seconds); } void close() nothrow { //logInfo("close %s", cast(int)m_fd); if (m_socket != StreamSocketFD.invalid) { eventDriver.sockets.shutdown(m_socket, true, true); eventDriver.sockets.releaseRef(m_socket); m_socket = StreamSocketFD.invalid; m_context = null; } } bool waitForData(Duration timeout = Duration.max) { mixin(tracer); // TODO: timeout!! if (m_context.readBuffer.length > 0) return true; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; auto res = asyncAwait!(IOCallback, cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb), cb => eventDriver.sockets.cancelRead(m_socket) ); logTrace("Socket %s, read %s bytes: %s", res[0], res[2], res[1]); assert(m_context.readBuffer.length == 0); m_context.readBuffer.putN(res[2]); switch (res[1]) { default: logInfo("read status %s", res[1]); throw new Exception("Error reading data from socket."); case IOStatus.ok: break; case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break; case IOStatus.disconnected: break; } return m_context.readBuffer.length > 0; } const(ubyte)[] peek() { return m_context.readBuffer.peek(); } void skip(ulong count) { import std.algorithm.comparison : min; while (count > 0) { waitForData(); auto n = min(count, m_context.readBuffer.length); m_context.readBuffer.popFrontN(n); count -= n; } } void read(ubyte[] dst) { mixin(tracer); import std.algorithm.comparison : min; while (dst.length > 0) { enforce(waitForData(), "Reached end of stream while reading data."); assert(m_context.readBuffer.length > 0); auto l = min(dst.length, m_context.readBuffer.length); m_context.readBuffer.read(dst[0 .. l]); dst = dst[l .. $]; } } void write(in ubyte[] bytes) { mixin(tracer); if (bytes.length == 0) return; auto res = asyncAwait!(IOCallback, cb => eventDriver.sockets.write(m_socket, bytes, IOMode.all, cb), cb => eventDriver.sockets.cancelWrite(m_socket)); switch (res[1]) { default: throw new Exception("Error writing data to socket."); case IOStatus.ok: break; case IOStatus.disconnected: break; } } void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); } void write(InputStream stream) { write(stream, 0); } void flush() { mixin(tracer); } void finalize() {} void write(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { writeDefault(stream, nbytes); } private void writeDefault(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { import std.algorithm.comparison : min; static struct Buffer { ubyte[64*1024 - 4*size_t.sizeof] bytes = void; } scope bufferobj = new Buffer; // FIXME: use heap allocation auto buffer = bufferobj.bytes[]; //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); if( nbytes == 0 ){ while( !stream.empty ){ size_t chunk = min(stream.leastSize, buffer.length); assert(chunk > 0, "leastSize returned zero for non-empty stream."); //logTrace("read pipe chunk %d", chunk); stream.read(buffer[0 .. chunk]); write(buffer[0 .. chunk]); } } else { while( nbytes > 0 ){ size_t chunk = min(nbytes, buffer.length); //logTrace("read pipe chunk %d", chunk); stream.read(buffer[0 .. chunk]); write(buffer[0 .. chunk]); nbytes -= chunk; } } } } mixin validateConnectionStream!TCPConnection; /** Represents a listening TCP socket. */ struct TCPListener { private { StreamListenSocketFD m_socket; } this(StreamListenSocketFD socket) { m_socket = socket; } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; } /// The local address at which TCP connections are accepted. @property NetworkAddress bindAddress() { assert(false); } /// Stops listening and closes the socket. void stopListening() { assert(false); } } /** Represents a bound and possibly 'connected' UDP socket. */ struct UDPConnection { private { DatagramSocketFD m_socket; } private this(ref NetworkAddress bind_address) { m_socket = eventDriver.sockets.createDatagramSocket(bind_address.toUnknownAddress(), null); } this(this) nothrow { if (m_socket != StreamSocketFD.invalid) eventDriver.sockets.addRef(m_socket); } ~this() nothrow { if (m_socket != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(m_socket); } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; } /** Returns the address to which the UDP socket is bound. */ @property string bindAddress() const { assert(false); } /** Determines if the socket is allowed to send to broadcast addresses. */ @property bool canBroadcast() const { assert(false); } /// ditto @property void canBroadcast(bool val) { assert(false); } /// The local/bind address of the underlying socket. @property NetworkAddress localAddress() const { assert(false); } /** Stops listening for datagrams and frees all resources. */ void close() { assert(false); } /** Locks the UDP connection to a certain peer. Once connected, the UDPConnection can only communicate with the specified peer. Otherwise communication with any reachable peer is possible. */ void connect(string host, ushort port) { assert(false); } /// ditto void connect(NetworkAddress address) { assert(false); } /** Sends a single packet. If peer_address is given, the packet is send to that address. Otherwise the packet will be sent to the address specified by a call to connect(). */ void send(in ubyte[] data, in NetworkAddress* peer_address = null) { auto ret = asyncAwait!(DatagramIOCallback, cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? peer_address.toUnknownAddress : null, cb), cb => eventDriver.sockets.cancelSend(m_socket) ); enforce(ret[1] == IOStatus.ok, "Failed to send packet."); enforce(ret[2] == data.length, "Packet was only sent partially."); } /** Receives a single packet. If a buffer is given, it must be large enough to hold the full packet. The timeout overload will throw an Exception if no data arrives before the specified duration has elapsed. */ ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) { return recv(Duration.max, buf, peer_address); } /// ditto ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) { import std.socket : Address; if (buf.length == 0) buf = new ubyte[65536]; auto res = asyncAwait!(DatagramIOCallback, cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb), cb => eventDriver.sockets.cancelReceive(m_socket) )(timeout); enforce(res.completed, "Receive timeout."); enforce(res.results[1] == IOStatus.ok, "Failed to receive packet."); if (peer_address) *peer_address = NetworkAddress(res.results[3]); return buf[0 .. res.results[2]]; } } /** Flags to control the behavior of listenTCP. */ enum TCPListenOptions { /// Don't enable any particular option defaults = 0, /// Causes incoming connections to be distributed across the thread pool distribute = 1<<0, /// Disables automatic closing of the connection when the connection callback exits disableAutoClose = 1<<1, /** Enable port reuse on linux kernel version >=3.9, do nothing on other OS Does not affect libasync driver because it is always enabled by libasync. */ reusePort = 1<<2, } private pure nothrow { import std.bitmanip; ushort ntoh(ushort val) { version (LittleEndian) return swapEndian(val); else version (BigEndian) return val; else static assert(false, "Unknown endianness."); } ushort hton(ushort val) { version (LittleEndian) return swapEndian(val); else version (BigEndian) return val; else static assert(false, "Unknown endianness."); } } private enum tracer = "";