Fix erroneously disabling some socket events in the Posix driver.

This commit is contained in:
Sönke Ludwig 2016-11-29 11:58:46 +01:00
parent ba61afbc2b
commit 175368b334

View file

@ -240,7 +240,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
connectCallback = on_connect; connectCallback = on_connect;
state = ConnectionState.connecting; state = ConnectionState.connecting;
} }
m_loop.startNotify!(EventType.write)(sock, &onConnect); m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
} else { } else {
m_loop.clearFD(sock); m_loop.clearFD(sock);
m_loop.unregisterFD(sock); m_loop.unregisterFD(sock);
@ -311,7 +311,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
log("wait for conn"); log("wait for conn");
m_loop.registerFD(sock, EventMask.read); m_loop.registerFD(sock, EventMask.read);
m_loop.m_fds[sock].streamListen.acceptCallback = on_accept; m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
m_loop.startNotify!(EventType.read)(sock, &onAccept); m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
onAccept(sock); onAccept(sock);
} }
@ -383,6 +383,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
// NOTE: since we know that not all data was read from the stream
// socket, the next call to recv is guaranteed to return EGAIN
// and we can avoid that call.
with (m_loop.m_fds[socket].streamSocket) { with (m_loop.m_fds[socket].streamSocket) {
readCallback = on_read_finish; readCallback = on_read_finish;
readMode = mode; readMode = mode;
@ -479,6 +483,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
// NOTE: since we know that not all data was writtem to the stream
// socket, the next call to send is guaranteed to return EGAIN
// and we can avoid that call.
with (m_loop.m_fds[socket].streamSocket) { with (m_loop.m_fds[socket].streamSocket) {
writeCallback = on_write_finish; writeCallback = on_write_finish;
writeMode = mode; writeMode = mode;
@ -1040,7 +1048,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
m_loop.initFD(id); m_loop.initFD(id);
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
m_loop.registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
m_loop.startNotify!(EventType.read)(id, &onEvent); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
return id; return id;
} else assert(false, "OS not supported!"); } else assert(false, "OS not supported!");
} }
@ -1390,22 +1398,6 @@ package class PosixEventLoop {
del(cast(FD)i); del(cast(FD)i);
} }
package void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{
//log("start notify %s %s", evt, fd);
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
if (callback) setNotifyCallback!evt(fd, callback);
updateFD(fd, m_fds[fd.value].common.eventMask);
}
package void stopNotify(EventType evt)(FD fd)
{
//log("stop notify %s %s", evt, fd);
//ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
if (m_fds[fd].callback) setNotifyCallback!evt(fd, null);
updateFD(fd, m_fds[fd.value].common.eventMask);
}
package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null), assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),