From 3d81854214e2411a6055cf34b64690b883bfcec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 27 Jan 2017 22:09:05 +0100 Subject: [PATCH] Add StreamListenOptions. Can be used to set the SO_REUSEPORT option for multi-thread/multi-process incoming TCP connection distribution. --- source/eventcore/driver.d | 11 ++++++++- source/eventcore/drivers/libasync.d | 3 ++- source/eventcore/drivers/posix/sockets.d | 27 ++++++++++++++++++----- source/eventcore/drivers/winapi/sockets.d | 3 ++- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index efada2a..a0d9843 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -144,7 +144,11 @@ interface EventDriverSockets { StreamSocketFD adoptStream(int socket); /// Creates a socket listening for incoming stream connections. - StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); + StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept); + + final StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) { + return listenStream(bind_address, StreamListenOptions.defaults, on_accept); + } /// Starts to wait for incoming connections on a listening socket. void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); @@ -504,6 +508,11 @@ enum ConnectionState { closed } +enum StreamListenOptions { + defaults = 0, + reusePort = 1<<0, +} + /** Specifies how a file is manipulated on disk. */ diff --git a/source/eventcore/drivers/libasync.d b/source/eventcore/drivers/libasync.d index 09e8da9..1d9d18b 100644 --- a/source/eventcore/drivers/libasync.d +++ b/source/eventcore/drivers/libasync.d @@ -94,7 +94,8 @@ final class LibasyncEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } - override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) + alias listenStream = EventDriverSockets.listenStream; + override StreamListenSocketFD listenStream(scope Address bind_address, ListenStreamOptions options, AcceptCallback on_accept) { assert(false, "TODO!"); } diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index b5fc204..278e98b 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -17,6 +17,9 @@ version (Posix) { import core.sys.posix.unistd : close, read, write; import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sys.posix.fcntl; + + version (linux) enum SO_REUSEPORT = 15; + else enum SO_REUSEPORT = 0x200; } version (Windows) { import core.sys.windows.windows; @@ -117,7 +120,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) + alias listenStream = EventDriverSockets.listenStream; + final override StreamListenSocketFD listenStream(scope Address address, StreamListenOptions options, AcceptCallback on_accept) { log("Listen stream"); auto sockfd = createSocket(address.addressFamily, SOCK_STREAM); @@ -131,15 +135,28 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets int tmp_reuse = 1; // FIXME: error handling! if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { - log("setsockopt failed."); + log("setsockopt SO_REUSEADDR failed."); invalidateSocket(); - } else if (bind(sockfd, address.name, address.nameLen) != 0) { + return; + } + version (Windows) {} else { + if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0) { + log("setsockopt SO_REUSEPORT failed."); + invalidateSocket(); + return; + } + } + if (bind(sockfd, address.name, address.nameLen) != 0) { log("bind failed."); invalidateSocket(); - } else if (listen(sockfd, 128) != 0) { + return; + } + if (listen(sockfd, 128) != 0) { log("listen failed."); invalidateSocket(); - } else log("Success!"); + return; + } + log("Success!"); } (); if (sock == StreamListenSocketFD.invalid) diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 14fb3cd..555e5ef 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -19,7 +19,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } - override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) + alias listenStream = EventDriverSockets.listenStream; + override StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept) { assert(false, "TODO!"); }