Improve performance of accepting incoming connections.
- Removes the accept() loop, as that doesn't measurably improve performance in practice. This also gives each connection the possibility to perform initial I/O before more concurrency is generated by accepting the next connection. - Uses SOCK_NONBLOCK in conjunction with socket() and accept4() to save one additional syscall per connection.
This commit is contained in:
parent
fc5df2f949
commit
07baa0f612
|
@ -21,6 +21,11 @@ version (Posix) {
|
||||||
version (linux) enum SO_REUSEPORT = 15;
|
version (linux) enum SO_REUSEPORT = 15;
|
||||||
else enum SO_REUSEPORT = 0x200;
|
else enum SO_REUSEPORT = 0x200;
|
||||||
}
|
}
|
||||||
|
version (linux) {
|
||||||
|
extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc;
|
||||||
|
static if (!is(typeof(SOCK_NONBLOCK)))
|
||||||
|
enum SOCK_NONBLOCK = 0x800;
|
||||||
|
}
|
||||||
version (Windows) {
|
version (Windows) {
|
||||||
import core.sys.windows.windows;
|
import core.sys.windows.windows;
|
||||||
import core.sys.windows.winsock2;
|
import core.sys.windows.winsock2;
|
||||||
|
@ -180,25 +185,27 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
|
|
||||||
private void onAccept(FD listenfd)
|
private void onAccept(FD listenfd)
|
||||||
{
|
{
|
||||||
foreach (i; 0 .. 20) {
|
sock_t sockfd;
|
||||||
sock_t sockfd;
|
sockaddr_storage addr;
|
||||||
sockaddr_storage addr;
|
socklen_t addr_len = addr.sizeof;
|
||||||
socklen_t addr_len = addr.sizeof;
|
version (linux) {
|
||||||
|
() @trusted { sockfd = accept4(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len, SOCK_NONBLOCK); } ();
|
||||||
|
if (sockfd == -1) return;
|
||||||
|
} else {
|
||||||
() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
|
() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
|
||||||
if (sockfd == -1) break;
|
if (sockfd == -1) return;
|
||||||
|
|
||||||
setSocketNonBlocking(cast(SocketFD)sockfd);
|
setSocketNonBlocking(cast(SocketFD)sockfd);
|
||||||
auto fd = cast(StreamSocketFD)sockfd;
|
|
||||||
m_loop.initFD(fd, FDFlags.none);
|
|
||||||
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
|
||||||
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
|
||||||
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
|
|
||||||
releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count
|
|
||||||
//print("accept %d", sockfd);
|
|
||||||
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
|
|
||||||
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
|
|
||||||
}
|
}
|
||||||
|
auto fd = cast(StreamSocketFD)sockfd;
|
||||||
|
m_loop.initFD(fd, FDFlags.none);
|
||||||
|
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
||||||
|
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
||||||
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
|
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
|
||||||
|
releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count
|
||||||
|
//print("accept %d", sockfd);
|
||||||
|
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
|
||||||
|
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionState getConnectionState(StreamSocketFD sock)
|
ConnectionState getConnectionState(StreamSocketFD sock)
|
||||||
|
@ -737,9 +744,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
private sock_t createSocket(AddressFamily family, int type)
|
private sock_t createSocket(AddressFamily family, int type)
|
||||||
{
|
{
|
||||||
sock_t sock;
|
sock_t sock;
|
||||||
() @trusted { sock = socket(family, type, 0); } ();
|
version (linux) {
|
||||||
if (sock == -1) return -1;
|
() @trusted { sock = socket(family, type | SOCK_NONBLOCK, 0); } ();
|
||||||
setSocketNonBlocking(cast(SocketFD)sock);
|
if (sock == -1) return -1;
|
||||||
|
} else {
|
||||||
|
() @trusted { sock = socket(family, type, 0); } ();
|
||||||
|
if (sock == -1) return -1;
|
||||||
|
setSocketNonBlocking(cast(SocketFD)sock);
|
||||||
|
}
|
||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue