diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 2fa5a45..4a3f918 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -17,7 +17,7 @@ version (Posix) { import core.sys.posix.netinet.tcp; import core.sys.posix.sys.un; import core.sys.posix.unistd : close, read, write; - import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED; import core.sys.posix.fcntl; import core.sys.posix.sys.socket; @@ -88,6 +88,7 @@ version (Windows) { import core.sys.windows.winsock2; alias sockaddr_storage = SOCKADDR_STORAGE; alias EAGAIN = WSAEWOULDBLOCK; + alias ECONNREFUSED = WSAECONNREFUSED; enum SHUT_RDWR = SD_BOTH; enum SHUT_RD = SD_RECEIVE; enum SHUT_WR = SD_SEND; @@ -138,9 +139,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init); - m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); - m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); - releaseRef(sock); // onConnectError callback is weak reference + m_loop.registerFD(sock, EventMask.read|EventMask.write); auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); if (ret == 0) { @@ -155,10 +154,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect); } else { + m_loop.unregisterFD(sock, EventMask.read|EventMask.write); m_loop.clearFD!StreamSocketSlot(sock); - m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); invalidateSocket(); - on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError); + on_connect(StreamSocketFD.invalid, determineConnectStatus(err)); return StreamSocketFD.invalid; } } @@ -175,11 +174,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets "Unable to cancel connect on the socket that is not in connecting state"); state = ConnectionState.closed; connectCallback = null; - m_loop.setNotifyCallback!(EventType.status)(sock, null); m_loop.setNotifyCallback!(EventType.write)(sock, null); - m_loop.clearFD!StreamSocketSlot(sock); - m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); - closeSocket(cast(sock_t)sock.value); } } @@ -190,7 +185,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return StreamSocketFD.invalid; setSocketNonBlocking(fd); m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } @@ -199,22 +194,32 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto sock = cast(StreamSocketFD)fd; auto l = lockHandle(sock); m_loop.setNotifyCallback!(EventType.write)(sock, null); + + ConnectStatus status = ConnectStatus.unknownError; + int err; + socklen_t errlen = err.sizeof; + if (() @trusted { return getsockopt(cast(sock_t)fd, SOL_SOCKET, SO_ERROR, &err, &errlen); } () == 0) + status = determineConnectStatus(err); + with (m_loop.m_fds[sock].streamSocket) { - state = ConnectionState.connected; + assert(state == ConnectionState.connecting); + + state = status == ConnectStatus.connected + ? ConnectionState.connected + : ConnectionState.closed; + auto cb = connectCallback; connectCallback = null; - if (cb) cb(sock, ConnectStatus.connected); + if (cb) cb(cast(StreamSocketFD)sock, status); } } - private void onConnectError(FD sock) + private ConnectStatus determineConnectStatus(int sock_err) { - // FIXME: determine the correct kind of error! - with (m_loop.m_fds[sock].streamSocket) { - state = ConnectionState.closed; - auto cb = connectCallback; - connectCallback = null; - if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.refused); + switch (sock_err) { + default: return ConnectStatus.unknownError; + case 0: return ConnectStatus.connected; + case ECONNREFUSED: return ConnectStatus.refused; } } @@ -287,9 +292,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto fd = cast(StreamSocketFD)sockfd; m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); - m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); - releaseRef(fd); // onConnectError callback is weak reference + m_loop.registerFD(fd, EventMask.read|EventMask.write); //print("accept %d", sockfd); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); @@ -656,7 +659,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); - m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(sock, EventMask.read|EventMask.write); return sock; } @@ -673,7 +676,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return DatagramSocketFD.init; setSocketNonBlocking(fd, close_on_exec); m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } @@ -881,7 +884,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets // listening sockets have an incremented the reference count because of setNotifyCallback int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0; if (--slot.common.refCount == base_refcount) { - m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.unregisterFD(fd, EventMask.read|EventMask.write); switch (slot.specific.kind) with (slot.specific.Kind) { default: assert(false, "File descriptor slot is not a socket."); case streamSocket: diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index bca08c2..03673fc 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -85,7 +85,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } m_core.addWaiter(); - addRef(sock); return sock; } else { clearSocketSlot(sock); @@ -103,8 +102,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { assert(state == ConnectionState.connecting, "Must be in 'connecting' state when calling cancelConnection."); - clearSocketSlot(sock); - () @trusted { closesocket(sock); } (); + state = ConnectionState.closed; + connectCallback = null; + m_core.removeWaiter(); } } @@ -845,6 +845,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { default: break; case FD_CONNECT: auto cb = slot.streamSocket.connectCallback; + if (!cb) break; // cancelled connect? + slot.streamSocket.connectCallback = null; slot.common.driver.m_core.removeWaiter(); if (err) { diff --git a/tests/0-tcp-cancelconn.d b/tests/0-tcp-cancelconn.d index 6a24452..d019e13 100644 --- a/tests/0-tcp-cancelconn.d +++ b/tests/0-tcp-cancelconn.d @@ -28,6 +28,7 @@ void main() eventDriver.timers.wait(tm, (tm) { assert(eventDriver.sockets.getConnectionState(sock) == ConnectionState.connecting); eventDriver.sockets.cancelConnectStream(sock); + eventDriver.sockets.releaseRef(sock); s_done = true; });