Merge pull request #76 from Boris-Barboris/keepalive_params
fix Keepalive on Linux, expose keepalive options, TCP_USER_TIMEOUT for linux.
This commit is contained in:
commit
c404cc2e5b
|
@ -215,6 +215,22 @@ interface EventDriverSockets {
|
||||||
/// Sets to `SO_KEEPALIVE` socket option on a socket.
|
/// Sets to `SO_KEEPALIVE` socket option on a socket.
|
||||||
void setKeepAlive(StreamSocketFD socket, bool enable);
|
void setKeepAlive(StreamSocketFD socket, bool enable);
|
||||||
|
|
||||||
|
/** Enables keepalive for the TCP socket and sets additional parameters.
|
||||||
|
Silently ignores unsupported systems (anything but Windows and Linux).
|
||||||
|
|
||||||
|
Params:
|
||||||
|
socket = Socket file descriptor to set options on.
|
||||||
|
idle = The time the connection needs to remain idle
|
||||||
|
before TCP starts sending keepalive probes.
|
||||||
|
interval = The time between individual keepalive probes.
|
||||||
|
probeCount = The maximum number of keepalive probes TCP should send
|
||||||
|
before dropping the connection. Has no effect on Windows.
|
||||||
|
*/
|
||||||
|
void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount = 5);
|
||||||
|
|
||||||
|
/// Sets `TCP_USER_TIMEOUT` socket option (linux only). https://tools.ietf.org/html/rfc5482
|
||||||
|
void setUserTimeout(StreamSocketFD socket, Duration timeout);
|
||||||
|
|
||||||
/** Reads data from a stream socket.
|
/** Reads data from a stream socket.
|
||||||
|
|
||||||
Note that only a single read operation is allowed at once. The caller
|
Note that only a single read operation is allowed at once. The caller
|
||||||
|
|
|
@ -8,6 +8,8 @@ import eventcore.internal.utils;
|
||||||
import std.algorithm.comparison : among, min, max;
|
import std.algorithm.comparison : among, min, max;
|
||||||
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
|
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
|
||||||
|
|
||||||
|
import core.time: Duration;
|
||||||
|
|
||||||
version (Posix) {
|
version (Posix) {
|
||||||
import std.socket : UnixAddress;
|
import std.socket : UnixAddress;
|
||||||
import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
|
import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
|
||||||
|
@ -45,6 +47,20 @@ version (linux) {
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
|
import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
|
||||||
|
|
||||||
|
// Linux-specific TCP options
|
||||||
|
// https://github.com/torvalds/linux/blob/master/include/uapi/linux/tcp.h#L95
|
||||||
|
// Some day we should siply import core.sys.linux.netinet.tcp;
|
||||||
|
static if (!is(typeof(SOL_TCP)))
|
||||||
|
enum SOL_TCP = 6;
|
||||||
|
static if (!is(typeof(TCP_KEEPIDLE)))
|
||||||
|
enum TCP_KEEPIDLE = 4;
|
||||||
|
static if (!is(typeof(TCP_KEEPINTVL)))
|
||||||
|
enum TCP_KEEPINTVL = 5;
|
||||||
|
static if (!is(typeof(TCP_KEEPCNT)))
|
||||||
|
enum TCP_KEEPCNT = 6;
|
||||||
|
static if (!is(typeof(TCP_USER_TIMEOUT)))
|
||||||
|
enum TCP_USER_TIMEOUT = 18;
|
||||||
}
|
}
|
||||||
version(OSX) {
|
version(OSX) {
|
||||||
static if (__VERSION__ < 2077) {
|
static if (__VERSION__ < 2077) {
|
||||||
|
@ -294,10 +310,45 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
|
() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void setKeepAlive(StreamSocketFD socket, bool enable)
|
override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted
|
||||||
{
|
{
|
||||||
ubyte opt = enable;
|
int opt = enable;
|
||||||
() @trusted { setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, cast(char*)&opt, opt.sizeof); } ();
|
int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof);
|
||||||
|
if (err != 0)
|
||||||
|
print("sock error in setKeepAlive: %s", getSocketError);
|
||||||
|
}
|
||||||
|
|
||||||
|
override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
|
||||||
|
{
|
||||||
|
// dunnno about BSD\OSX, maybe someone should fix it for them later
|
||||||
|
version (linux) {
|
||||||
|
setKeepAlive(socket, true);
|
||||||
|
int int_opt = cast(int) idle.total!"seconds"();
|
||||||
|
int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPIDLE, &int_opt, int.sizeof);
|
||||||
|
if (err != 0) {
|
||||||
|
print("sock error on setsockopt TCP_KEEPIDLE: %s", getSocketError);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int_opt = cast(int) interval.total!"seconds"();
|
||||||
|
err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPINTVL, &int_opt, int.sizeof);
|
||||||
|
if (err != 0) {
|
||||||
|
print("sock error on setsockopt TCP_KEEPINTVL: %s", getSocketError);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPCNT, &probeCount, int.sizeof);
|
||||||
|
if (err != 0)
|
||||||
|
print("sock error on setsockopt TCP_KEEPCNT: %s", getSocketError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted
|
||||||
|
{
|
||||||
|
version (linux) {
|
||||||
|
uint tmsecs = cast(uint) timeout.total!"msecs";
|
||||||
|
int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof);
|
||||||
|
if (err != 0)
|
||||||
|
print("sock error on setsockopt TCP_USER_TIMEOUT %s", getSocketError);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
|
final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
|
||||||
|
|
|
@ -8,6 +8,8 @@ import eventcore.internal.win32;
|
||||||
import eventcore.internal.utils : AlgebraicChoppedVector, print, nogc_assert;
|
import eventcore.internal.utils : AlgebraicChoppedVector, print, nogc_assert;
|
||||||
import std.socket : Address;
|
import std.socket : Address;
|
||||||
|
|
||||||
|
import core.time: Duration;
|
||||||
|
|
||||||
private enum WM_USER_SOCKET = WM_USER + 1;
|
private enum WM_USER_SOCKET = WM_USER + 1;
|
||||||
|
|
||||||
|
|
||||||
|
@ -221,6 +223,18 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
|
||||||
setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof);
|
setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
|
||||||
|
{
|
||||||
|
tcp_keepalive opts = tcp_keepalive(1, cast(c_ulong) idle.total!"msecs"(),
|
||||||
|
cast(c_ulong) interval.total!"msecs");
|
||||||
|
int result = WSAIoctl(socket, SIO_KEEPALIVE_VALS, &opts, cast(DWORD) tcp_keepalive.sizeof,
|
||||||
|
null, 0, null, null, null);
|
||||||
|
if (result != 0)
|
||||||
|
print("WSAIoctl error on SIO_KEEPALIVE_VALS: %d", WSAGetLastError());
|
||||||
|
}
|
||||||
|
|
||||||
|
override void setUserTimeout(StreamSocketFD socket, Duration timeout) {}
|
||||||
|
|
||||||
override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
|
override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
|
||||||
{
|
{
|
||||||
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
|
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
|
||||||
|
|
|
@ -4,6 +4,7 @@ version(Windows):
|
||||||
|
|
||||||
public import core.sys.windows.windows;
|
public import core.sys.windows.windows;
|
||||||
public import core.sys.windows.winsock2;
|
public import core.sys.windows.winsock2;
|
||||||
|
public import core.stdc.config: c_ulong;
|
||||||
|
|
||||||
extern(System) nothrow:
|
extern(System) nothrow:
|
||||||
|
|
||||||
|
@ -104,6 +105,29 @@ struct ADDRINFOW {
|
||||||
ADDRINFOW* ai_next;
|
ADDRINFOW* ai_next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// https://msdn.microsoft.com/en-us/library/windows/desktop/dd877220(v=vs.85).aspx
|
||||||
|
struct tcp_keepalive {
|
||||||
|
c_ulong onoff;
|
||||||
|
c_ulong keepalivetime;
|
||||||
|
c_ulong keepaliveinterval;
|
||||||
|
};
|
||||||
|
|
||||||
|
// https://gist.github.com/piscisaureus/906386#file-winsock2-h-L1099
|
||||||
|
enum : DWORD {
|
||||||
|
IOC_VENDOR = 0x18000000,
|
||||||
|
IOC_OUT = 0x40000000,
|
||||||
|
IOC_IN = 0x80000000
|
||||||
|
}
|
||||||
|
|
||||||
|
DWORD _WSAIOW(DWORD x, DWORD y) pure @safe
|
||||||
|
{
|
||||||
|
return IOC_IN | x | y;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum : DWORD {
|
||||||
|
SIO_KEEPALIVE_VALS = _WSAIOW(IOC_VENDOR, 4)
|
||||||
|
}
|
||||||
|
|
||||||
struct WSAPROTOCOL_INFO {
|
struct WSAPROTOCOL_INFO {
|
||||||
DWORD dwServiceFlags1;
|
DWORD dwServiceFlags1;
|
||||||
DWORD dwServiceFlags2;
|
DWORD dwServiceFlags2;
|
||||||
|
@ -154,6 +178,7 @@ void FreeAddrInfoExW(ADDRINFOEXW* pAddrInfo);
|
||||||
void freeaddrinfo(ADDRINFOA* ai);
|
void freeaddrinfo(ADDRINFOA* ai);
|
||||||
BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags);
|
BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags);
|
||||||
BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped);
|
BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped);
|
||||||
|
int WSAIoctl(SOCKET s, DWORD dwIoControlCode, void* lpvInBuffer, DWORD cbInBuffer, void* lpvOutBuffer, DWORD cbOutBuffer, DWORD* lpcbBytesReturned, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine);
|
||||||
|
|
||||||
/*struct WSAOVERLAPPEDX {
|
/*struct WSAOVERLAPPEDX {
|
||||||
ULONG_PTR Internal;
|
ULONG_PTR Internal;
|
||||||
|
|
|
@ -2,6 +2,7 @@ module eventcore.socket;
|
||||||
|
|
||||||
import eventcore.core : eventDriver;
|
import eventcore.core : eventDriver;
|
||||||
import eventcore.driver;
|
import eventcore.driver;
|
||||||
|
import core.time: Duration;
|
||||||
import std.exception : enforce;
|
import std.exception : enforce;
|
||||||
import std.socket : Address;
|
import std.socket : Address;
|
||||||
|
|
||||||
|
@ -49,6 +50,11 @@ struct StreamSocket {
|
||||||
|
|
||||||
@property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); }
|
@property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); }
|
||||||
@property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); }
|
@property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); }
|
||||||
|
void setKeepAlive(bool enable) { eventDriver.sockets.setKeepAlive(m_fd, enable); }
|
||||||
|
void setKeepAliveParams(Duration idle, Duration interval, int probeCount = 5) {
|
||||||
|
eventDriver.sockets.setKeepAliveParams(m_fd, idle, interval, probeCount);
|
||||||
|
}
|
||||||
|
void setUserTimeout(Duration timeout) { eventDriver.sockets.setUserTimeout(m_fd, timeout); }
|
||||||
}
|
}
|
||||||
|
|
||||||
void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode)
|
void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode)
|
||||||
|
|
|
@ -19,7 +19,7 @@ void main()
|
||||||
writeln("This doesn't work on macOS. Skipping this test until it is determined that this special case should stay supported.");
|
writeln("This doesn't work on macOS. Skipping this test until it is determined that this special case should stay supported.");
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
|
static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||||
|
|
||||||
auto baddr = new InternetAddress(0x7F000001, 40002);
|
auto baddr = new InternetAddress(0x7F000001, 40002);
|
||||||
|
|
|
@ -8,7 +8,7 @@ import eventcore.core;
|
||||||
import eventcore.socket;
|
import eventcore.socket;
|
||||||
import eventcore.internal.utils : print;
|
import eventcore.internal.utils : print;
|
||||||
import std.socket : InternetAddress;
|
import std.socket : InternetAddress;
|
||||||
import core.time : Duration, msecs;
|
import core.time : Duration, msecs, seconds;
|
||||||
|
|
||||||
ubyte[256] s_rbuf;
|
ubyte[256] s_rbuf;
|
||||||
bool s_done;
|
bool s_done;
|
||||||
|
@ -69,6 +69,10 @@ void main()
|
||||||
client = sock;
|
client = sock;
|
||||||
assert(status == ConnectStatus.connected);
|
assert(status == ConnectStatus.connected);
|
||||||
assert(sock.state == ConnectionState.connected);
|
assert(sock.state == ConnectionState.connected);
|
||||||
|
print("Setting keepalive and timeout options");
|
||||||
|
client.setKeepAlive(true);
|
||||||
|
client.setKeepAliveParams(10.seconds, 10.seconds, 4);
|
||||||
|
client.setUserTimeout(5.seconds);
|
||||||
print("Initial write");
|
print("Initial write");
|
||||||
client.write!((wstatus, bytes) {
|
client.write!((wstatus, bytes) {
|
||||||
print("Initial write done");
|
print("Initial write done");
|
||||||
|
|
Loading…
Reference in a new issue