diff --git a/source/eventcore/posix.d b/source/eventcore/posix.d index 2ed7cd3..b49792f 100644 --- a/source/eventcore/posix.d +++ b/source/eventcore/posix.d @@ -102,15 +102,11 @@ abstract class PosixEventDriver : EventDriver { private void onConnect(FD sock) { - stopNotify!(EventType.status)(sock); - stopNotify!(EventType.write)(sock); m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); } private void onConnectError(FD sock) { - stopNotify!(EventType.status)(sock); - stopNotify!(EventType.write)(sock); // FIXME: determine the correct kind of error! m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); } @@ -156,10 +152,11 @@ abstract class PosixEventDriver : EventDriver { () @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } (); if (sockfd == -1) break; () @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } (); - registerFD(cast(FD)sockfd, EventMask.read|EventMask.write|EventMask.status); - addFD(cast(FD)sockfd); + auto fd = cast(StreamSocketFD)sockfd; + registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + addFD(fd); //print("accept %d", sockfd); - m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, cast(StreamSocketFD)sockfd); + m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); } } @@ -211,7 +208,7 @@ abstract class PosixEventDriver : EventDriver { readBuffer = buffer; } - startNotify!(EventType.read)(socket, &onSocketRead); + setNotifyCallback!(EventType.read)(socket, &onSocketRead); } private void onSocketRead(FD fd) @@ -221,7 +218,7 @@ abstract class PosixEventDriver : EventDriver { void finalize()(IOStatus status) { - stopNotify!(EventType.read)(socket); + setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, slot.bytesRead); } @@ -292,7 +289,7 @@ abstract class PosixEventDriver : EventDriver { writeBuffer = buffer; } - startNotify!(EventType.write)(socket, &onSocketWrite); + setNotifyCallback!(EventType.write)(socket, &onSocketWrite); } private void onSocketWrite(FD fd) @@ -306,14 +303,14 @@ abstract class PosixEventDriver : EventDriver { if (ret < 0) { auto err = errno; if (err != EAGAIN) { - stopNotify!(EventType.write)(socket); + setNotifyCallback!(EventType.write)(socket, null); slot.readCallback(socket, IOStatus.error, slot.bytesRead); return; } } if (ret == 0) { - stopNotify!(EventType.write)(socket); + setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten); return; } @@ -322,7 +319,7 @@ abstract class PosixEventDriver : EventDriver { slot.bytesWritten += ret; slot.writeBuffer = slot.writeBuffer[ret .. $]; if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { - stopNotify!(EventType.write)(socket); + setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); return; } @@ -362,7 +359,7 @@ abstract class PosixEventDriver : EventDriver { readBuffer = null; } - startNotify!(EventType.read)(socket, &onSocketDataAvailable); + setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable); } private void onSocketDataAvailable(FD fd) @@ -372,7 +369,7 @@ abstract class PosixEventDriver : EventDriver { void finalize()(IOStatus status) { - stopNotify!(EventType.read)(socket); + setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, 0); } @@ -479,7 +476,7 @@ abstract class PosixEventDriver : EventDriver { private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) { - assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); + //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); m_fds[fd].callback[evt] = callback; assert(m_fds[0].callback[evt] is null); m_waiterCount++; @@ -488,12 +485,18 @@ abstract class PosixEventDriver : EventDriver { private void stopNotify(EventType evt)(FD fd) { - assert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); + //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); m_fds[fd].callback[evt] = null; m_waiterCount--; updateFD(fd, m_fds[fd].eventMask); } + private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) + { + assert((callback !is null) != (m_fds[fd].callback[evt] !is null)); + m_fds[fd].callback[evt] = callback; + } + private SocketFD createSocket(AddressFamily family) { int sock;