Register sockets for events only once.

Only the callbacks are set and unset now, resulting in a considerable performance boost for the epoll backend.
This commit is contained in:
Sönke Ludwig 2016-01-16 14:48:30 +01:00
parent 2a926d87aa
commit 47c16c65cc

View file

@ -102,15 +102,11 @@ abstract class PosixEventDriver : EventDriver {
private void onConnect(FD sock) private void onConnect(FD sock)
{ {
stopNotify!(EventType.status)(sock);
stopNotify!(EventType.write)(sock);
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
} }
private void onConnectError(FD sock) private void onConnectError(FD sock)
{ {
stopNotify!(EventType.status)(sock);
stopNotify!(EventType.write)(sock);
// FIXME: determine the correct kind of error! // FIXME: determine the correct kind of error!
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
} }
@ -156,10 +152,11 @@ abstract class PosixEventDriver : EventDriver {
() @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } (); () @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } ();
if (sockfd == -1) break; if (sockfd == -1) break;
() @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } (); () @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } ();
registerFD(cast(FD)sockfd, EventMask.read|EventMask.write|EventMask.status); auto fd = cast(StreamSocketFD)sockfd;
addFD(cast(FD)sockfd); registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
addFD(fd);
//print("accept %d", sockfd); //print("accept %d", sockfd);
m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, cast(StreamSocketFD)sockfd); m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd);
} }
} }
@ -211,7 +208,7 @@ abstract class PosixEventDriver : EventDriver {
readBuffer = buffer; readBuffer = buffer;
} }
startNotify!(EventType.read)(socket, &onSocketRead); setNotifyCallback!(EventType.read)(socket, &onSocketRead);
} }
private void onSocketRead(FD fd) private void onSocketRead(FD fd)
@ -221,7 +218,7 @@ abstract class PosixEventDriver : EventDriver {
void finalize()(IOStatus status) void finalize()(IOStatus status)
{ {
stopNotify!(EventType.read)(socket); setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null; //m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead); slot.readCallback(socket, status, slot.bytesRead);
} }
@ -292,7 +289,7 @@ abstract class PosixEventDriver : EventDriver {
writeBuffer = buffer; writeBuffer = buffer;
} }
startNotify!(EventType.write)(socket, &onSocketWrite); setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
} }
private void onSocketWrite(FD fd) private void onSocketWrite(FD fd)
@ -306,14 +303,14 @@ abstract class PosixEventDriver : EventDriver {
if (ret < 0) { if (ret < 0) {
auto err = errno; auto err = errno;
if (err != EAGAIN) { if (err != EAGAIN) {
stopNotify!(EventType.write)(socket); setNotifyCallback!(EventType.write)(socket, null);
slot.readCallback(socket, IOStatus.error, slot.bytesRead); slot.readCallback(socket, IOStatus.error, slot.bytesRead);
return; return;
} }
} }
if (ret == 0) { if (ret == 0) {
stopNotify!(EventType.write)(socket); setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten);
return; return;
} }
@ -322,7 +319,7 @@ abstract class PosixEventDriver : EventDriver {
slot.bytesWritten += ret; slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $]; slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
stopNotify!(EventType.write)(socket); setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
return; return;
} }
@ -362,7 +359,7 @@ abstract class PosixEventDriver : EventDriver {
readBuffer = null; readBuffer = null;
} }
startNotify!(EventType.read)(socket, &onSocketDataAvailable); setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
} }
private void onSocketDataAvailable(FD fd) private void onSocketDataAvailable(FD fd)
@ -372,7 +369,7 @@ abstract class PosixEventDriver : EventDriver {
void finalize()(IOStatus status) void finalize()(IOStatus status)
{ {
stopNotify!(EventType.read)(socket); setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null; //m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, 0); slot.readCallback(socket, status, 0);
} }
@ -479,7 +476,7 @@ abstract class PosixEventDriver : EventDriver {
private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) private void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
m_fds[fd].callback[evt] = callback; m_fds[fd].callback[evt] = callback;
assert(m_fds[0].callback[evt] is null); assert(m_fds[0].callback[evt] is null);
m_waiterCount++; m_waiterCount++;
@ -488,12 +485,18 @@ abstract class PosixEventDriver : EventDriver {
private void stopNotify(EventType evt)(FD fd) private void stopNotify(EventType evt)(FD fd)
{ {
assert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
m_fds[fd].callback[evt] = null; m_fds[fd].callback[evt] = null;
m_waiterCount--; m_waiterCount--;
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd].eventMask);
} }
private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{
assert((callback !is null) != (m_fds[fd].callback[evt] !is null));
m_fds[fd].callback[evt] = callback;
}
private SocketFD createSocket(AddressFamily family) private SocketFD createSocket(AddressFamily family)
{ {
int sock; int sock;