diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index ddd196b..ed9059c 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -215,6 +215,22 @@ interface EventDriverSockets { /// Sets to `SO_KEEPALIVE` socket option on a socket. void setKeepAlive(StreamSocketFD socket, bool enable); + /** Enables keepalive for the TCP socket and sets additional parameters. + Silently ignores unsupported systems (anything but Windows and Linux). + + Params: + socket = Socket file descriptor to set options on. + idle = The time the connection needs to remain idle + before TCP starts sending keepalive probes. + interval = The time between individual keepalive probes. + probeCount = The maximum number of keepalive probes TCP should send + before dropping the connection. Has no effect on Windows. + */ + void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount = 5); + + /// Sets `TCP_USER_TIMEOUT` socket option (linux only). https://tools.ietf.org/html/rfc5482 + void setUserTimeout(StreamSocketFD socket, Duration timeout); + /** Reads data from a stream socket. Note that only a single read operation is allowed at once. The caller diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 12247a4..6137c0f 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -8,6 +8,8 @@ import eventcore.internal.utils; import std.algorithm.comparison : among, min, max; import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress; +import core.time: Duration; + version (Posix) { import std.socket : UnixAddress; import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo; @@ -45,6 +47,20 @@ version (linux) { } else import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP; + + // Linux-specific TCP options + // https://github.com/torvalds/linux/blob/master/include/uapi/linux/tcp.h#L95 + // Some day we should siply import core.sys.linux.netinet.tcp; + static if (!is(typeof(SOL_TCP))) + enum SOL_TCP = 6; + static if (!is(typeof(TCP_KEEPIDLE))) + enum TCP_KEEPIDLE = 4; + static if (!is(typeof(TCP_KEEPINTVL))) + enum TCP_KEEPINTVL = 5; + static if (!is(typeof(TCP_KEEPCNT))) + enum TCP_KEEPCNT = 6; + static if (!is(typeof(TCP_USER_TIMEOUT))) + enum TCP_USER_TIMEOUT = 18; } version(OSX) { static if (__VERSION__ < 2077) { @@ -294,10 +310,45 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets () @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } (); } - final override void setKeepAlive(StreamSocketFD socket, bool enable) + override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted { - ubyte opt = enable; - () @trusted { setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, cast(char*)&opt, opt.sizeof); } (); + int opt = enable; + int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof); + if (err != 0) + print("sock error in setKeepAlive: %s", getSocketError); + } + + override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted + { + // dunnno about BSD\OSX, maybe someone should fix it for them later + version (linux) { + setKeepAlive(socket, true); + int int_opt = cast(int) idle.total!"seconds"(); + int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPIDLE, &int_opt, int.sizeof); + if (err != 0) { + print("sock error on setsockopt TCP_KEEPIDLE: %s", getSocketError); + return; + } + int_opt = cast(int) interval.total!"seconds"(); + err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPINTVL, &int_opt, int.sizeof); + if (err != 0) { + print("sock error on setsockopt TCP_KEEPINTVL: %s", getSocketError); + return; + } + err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPCNT, &probeCount, int.sizeof); + if (err != 0) + print("sock error on setsockopt TCP_KEEPCNT: %s", getSocketError); + } + } + + override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted + { + version (linux) { + uint tmsecs = cast(uint) timeout.total!"msecs"; + int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof); + if (err != 0) + print("sock error on setsockopt TCP_USER_TIMEOUT %s", getSocketError); + } } final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 7200dd9..28512d3 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -8,6 +8,8 @@ import eventcore.internal.win32; import eventcore.internal.utils : AlgebraicChoppedVector, print, nogc_assert; import std.socket : Address; +import core.time: Duration; + private enum WM_USER_SOCKET = WM_USER + 1; @@ -221,6 +223,18 @@ final class WinAPIEventDriverSockets : EventDriverSockets { setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof); } + override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted + { + tcp_keepalive opts = tcp_keepalive(1, cast(c_ulong) idle.total!"msecs"(), + cast(c_ulong) interval.total!"msecs"); + int result = WSAIoctl(socket, SIO_KEEPALIVE_VALS, &opts, cast(DWORD) tcp_keepalive.sizeof, + null, 0, null, null, null); + if (result != 0) + print("WSAIoctl error on SIO_KEEPALIVE_VALS: %d", WSAGetLastError()); + } + + override void setUserTimeout(StreamSocketFD socket, Duration timeout) {} + override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); diff --git a/source/eventcore/internal/win32.d b/source/eventcore/internal/win32.d index af068b3..9a4a599 100644 --- a/source/eventcore/internal/win32.d +++ b/source/eventcore/internal/win32.d @@ -4,6 +4,7 @@ version(Windows): public import core.sys.windows.windows; public import core.sys.windows.winsock2; +public import core.stdc.config: c_ulong; extern(System) nothrow: @@ -104,6 +105,29 @@ struct ADDRINFOW { ADDRINFOW* ai_next; } +// https://msdn.microsoft.com/en-us/library/windows/desktop/dd877220(v=vs.85).aspx +struct tcp_keepalive { + c_ulong onoff; + c_ulong keepalivetime; + c_ulong keepaliveinterval; +}; + +// https://gist.github.com/piscisaureus/906386#file-winsock2-h-L1099 +enum : DWORD { + IOC_VENDOR = 0x18000000, + IOC_OUT = 0x40000000, + IOC_IN = 0x80000000 +} + +DWORD _WSAIOW(DWORD x, DWORD y) pure @safe +{ + return IOC_IN | x | y; +} + +enum : DWORD { + SIO_KEEPALIVE_VALS = _WSAIOW(IOC_VENDOR, 4) +} + struct WSAPROTOCOL_INFO { DWORD dwServiceFlags1; DWORD dwServiceFlags2; @@ -154,6 +178,7 @@ void FreeAddrInfoExW(ADDRINFOEXW* pAddrInfo); void freeaddrinfo(ADDRINFOA* ai); BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags); BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped); +int WSAIoctl(SOCKET s, DWORD dwIoControlCode, void* lpvInBuffer, DWORD cbInBuffer, void* lpvOutBuffer, DWORD cbOutBuffer, DWORD* lpcbBytesReturned, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine); /*struct WSAOVERLAPPEDX { ULONG_PTR Internal; diff --git a/source/eventcore/socket.d b/source/eventcore/socket.d index 837cded..0dc573f 100644 --- a/source/eventcore/socket.d +++ b/source/eventcore/socket.d @@ -2,6 +2,7 @@ module eventcore.socket; import eventcore.core : eventDriver; import eventcore.driver; +import core.time: Duration; import std.exception : enforce; import std.socket : Address; @@ -49,6 +50,11 @@ struct StreamSocket { @property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); } @property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); } + void setKeepAlive(bool enable) { eventDriver.sockets.setKeepAlive(m_fd, enable); } + void setKeepAliveParams(Duration idle, Duration interval, int probeCount = 5) { + eventDriver.sockets.setKeepAliveParams(m_fd, idle, interval, probeCount); + } + void setUserTimeout(Duration timeout) { eventDriver.sockets.setUserTimeout(m_fd, timeout); } } void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode) diff --git a/tests/0-tcp-readwait.d b/tests/0-tcp-readwait.d index 3b8bfae..e668280 100644 --- a/tests/0-tcp-readwait.d +++ b/tests/0-tcp-readwait.d @@ -19,7 +19,7 @@ void main() writeln("This doesn't work on macOS. Skipping this test until it is determined that this special case should stay supported."); return; } else { - + static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; auto baddr = new InternetAddress(0x7F000001, 40002); diff --git a/tests/0-tcp.d b/tests/0-tcp.d index 75cc479..e7f1760 100644 --- a/tests/0-tcp.d +++ b/tests/0-tcp.d @@ -8,7 +8,7 @@ import eventcore.core; import eventcore.socket; import eventcore.internal.utils : print; import std.socket : InternetAddress; -import core.time : Duration, msecs; +import core.time : Duration, msecs, seconds; ubyte[256] s_rbuf; bool s_done; @@ -69,6 +69,10 @@ void main() client = sock; assert(status == ConnectStatus.connected); assert(sock.state == ConnectionState.connected); + print("Setting keepalive and timeout options"); + client.setKeepAlive(true); + client.setKeepAliveParams(10.seconds, 10.seconds, 4); + client.setUserTimeout(5.seconds); print("Initial write"); client.write!((wstatus, bytes) { print("Initial write done");