Add StreamListenOptions.

Can be used to set the SO_REUSEPORT option for multi-thread/multi-process incoming TCP connection distribution.
This commit is contained in:
Sönke Ludwig 2017-01-27 22:09:05 +01:00
parent 3a5142baf0
commit 3d81854214
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
4 changed files with 36 additions and 8 deletions

View file

@ -144,7 +144,11 @@ interface EventDriverSockets {
StreamSocketFD adoptStream(int socket); StreamSocketFD adoptStream(int socket);
/// Creates a socket listening for incoming stream connections. /// Creates a socket listening for incoming stream connections.
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept);
final StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) {
return listenStream(bind_address, StreamListenOptions.defaults, on_accept);
}
/// Starts to wait for incoming connections on a listening socket. /// Starts to wait for incoming connections on a listening socket.
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
@ -504,6 +508,11 @@ enum ConnectionState {
closed closed
} }
enum StreamListenOptions {
defaults = 0,
reusePort = 1<<0,
}
/** /**
Specifies how a file is manipulated on disk. Specifies how a file is manipulated on disk.
*/ */

View file

@ -94,7 +94,8 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) alias listenStream = EventDriverSockets.listenStream;
override StreamListenSocketFD listenStream(scope Address bind_address, ListenStreamOptions options, AcceptCallback on_accept)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }

View file

@ -17,6 +17,9 @@ version (Posix) {
import core.sys.posix.unistd : close, read, write; import core.sys.posix.unistd : close, read, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
version (linux) enum SO_REUSEPORT = 15;
else enum SO_REUSEPORT = 0x200;
} }
version (Windows) { version (Windows) {
import core.sys.windows.windows; import core.sys.windows.windows;
@ -117,7 +120,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) alias listenStream = EventDriverSockets.listenStream;
final override StreamListenSocketFD listenStream(scope Address address, StreamListenOptions options, AcceptCallback on_accept)
{ {
log("Listen stream"); log("Listen stream");
auto sockfd = createSocket(address.addressFamily, SOCK_STREAM); auto sockfd = createSocket(address.addressFamily, SOCK_STREAM);
@ -131,15 +135,28 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
int tmp_reuse = 1; int tmp_reuse = 1;
// FIXME: error handling! // FIXME: error handling!
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
log("setsockopt failed."); log("setsockopt SO_REUSEADDR failed.");
invalidateSocket(); invalidateSocket();
} else if (bind(sockfd, address.name, address.nameLen) != 0) { return;
}
version (Windows) {} else {
if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0) {
log("setsockopt SO_REUSEPORT failed.");
invalidateSocket();
return;
}
}
if (bind(sockfd, address.name, address.nameLen) != 0) {
log("bind failed."); log("bind failed.");
invalidateSocket(); invalidateSocket();
} else if (listen(sockfd, 128) != 0) { return;
}
if (listen(sockfd, 128) != 0) {
log("listen failed."); log("listen failed.");
invalidateSocket(); invalidateSocket();
} else log("Success!"); return;
}
log("Success!");
} (); } ();
if (sock == StreamListenSocketFD.invalid) if (sock == StreamListenSocketFD.invalid)

View file

@ -19,7 +19,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) alias listenStream = EventDriverSockets.listenStream;
override StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }