From b32b329d15ec1fe50b0573eb4c10f3ee0f5b0c26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 2 Nov 2019 15:00:47 +0100 Subject: [PATCH] Fix the approach to determine connect failures. On macOS it could happen that both, onConnect and onConnectError, were triggered, resulting in seemingly overlapping connection attempts when they really were sequential. This in turn triggered a connection error leak test in vibe-core. Now using only the write-ready flag plus the reported socket error status to determine failed connections, guaranteeing a single call back. --- source/eventcore/drivers/posix/sockets.d | 55 ++++++++++++----------- source/eventcore/drivers/winapi/sockets.d | 8 ++-- tests/0-tcp-cancelconn.d | 1 + 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 2fa5a45..4a3f918 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -17,7 +17,7 @@ version (Posix) { import core.sys.posix.netinet.tcp; import core.sys.posix.sys.un; import core.sys.posix.unistd : close, read, write; - import core.stdc.errno : errno, EAGAIN, EINPROGRESS; + import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED; import core.sys.posix.fcntl; import core.sys.posix.sys.socket; @@ -88,6 +88,7 @@ version (Windows) { import core.sys.windows.winsock2; alias sockaddr_storage = SOCKADDR_STORAGE; alias EAGAIN = WSAEWOULDBLOCK; + alias ECONNREFUSED = WSAECONNREFUSED; enum SHUT_RDWR = SD_BOTH; enum SHUT_RD = SD_RECEIVE; enum SHUT_WR = SD_SEND; @@ -138,9 +139,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init); - m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); - m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); - releaseRef(sock); // onConnectError callback is weak reference + m_loop.registerFD(sock, EventMask.read|EventMask.write); auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); if (ret == 0) { @@ -155,10 +154,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect); } else { + m_loop.unregisterFD(sock, EventMask.read|EventMask.write); m_loop.clearFD!StreamSocketSlot(sock); - m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); invalidateSocket(); - on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError); + on_connect(StreamSocketFD.invalid, determineConnectStatus(err)); return StreamSocketFD.invalid; } } @@ -175,11 +174,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets "Unable to cancel connect on the socket that is not in connecting state"); state = ConnectionState.closed; connectCallback = null; - m_loop.setNotifyCallback!(EventType.status)(sock, null); m_loop.setNotifyCallback!(EventType.write)(sock, null); - m_loop.clearFD!StreamSocketSlot(sock); - m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); - closeSocket(cast(sock_t)sock.value); } } @@ -190,7 +185,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return StreamSocketFD.invalid; setSocketNonBlocking(fd); m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } @@ -199,22 +194,32 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto sock = cast(StreamSocketFD)fd; auto l = lockHandle(sock); m_loop.setNotifyCallback!(EventType.write)(sock, null); + + ConnectStatus status = ConnectStatus.unknownError; + int err; + socklen_t errlen = err.sizeof; + if (() @trusted { return getsockopt(cast(sock_t)fd, SOL_SOCKET, SO_ERROR, &err, &errlen); } () == 0) + status = determineConnectStatus(err); + with (m_loop.m_fds[sock].streamSocket) { - state = ConnectionState.connected; + assert(state == ConnectionState.connecting); + + state = status == ConnectStatus.connected + ? ConnectionState.connected + : ConnectionState.closed; + auto cb = connectCallback; connectCallback = null; - if (cb) cb(sock, ConnectStatus.connected); + if (cb) cb(cast(StreamSocketFD)sock, status); } } - private void onConnectError(FD sock) + private ConnectStatus determineConnectStatus(int sock_err) { - // FIXME: determine the correct kind of error! - with (m_loop.m_fds[sock].streamSocket) { - state = ConnectionState.closed; - auto cb = connectCallback; - connectCallback = null; - if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.refused); + switch (sock_err) { + default: return ConnectStatus.unknownError; + case 0: return ConnectStatus.connected; + case ECONNREFUSED: return ConnectStatus.refused; } } @@ -287,9 +292,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto fd = cast(StreamSocketFD)sockfd; m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); - m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); - releaseRef(fd); // onConnectError callback is weak reference + m_loop.registerFD(fd, EventMask.read|EventMask.write); //print("accept %d", sockfd); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); @@ -656,7 +659,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); - m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(sock, EventMask.read|EventMask.write); return sock; } @@ -673,7 +676,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return DatagramSocketFD.init; setSocketNonBlocking(fd, close_on_exec); m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } @@ -881,7 +884,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets // listening sockets have an incremented the reference count because of setNotifyCallback int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0; if (--slot.common.refCount == base_refcount) { - m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.unregisterFD(fd, EventMask.read|EventMask.write); switch (slot.specific.kind) with (slot.specific.Kind) { default: assert(false, "File descriptor slot is not a socket."); case streamSocket: diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index bca08c2..03673fc 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -85,7 +85,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } m_core.addWaiter(); - addRef(sock); return sock; } else { clearSocketSlot(sock); @@ -103,8 +102,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { assert(state == ConnectionState.connecting, "Must be in 'connecting' state when calling cancelConnection."); - clearSocketSlot(sock); - () @trusted { closesocket(sock); } (); + state = ConnectionState.closed; + connectCallback = null; + m_core.removeWaiter(); } } @@ -845,6 +845,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { default: break; case FD_CONNECT: auto cb = slot.streamSocket.connectCallback; + if (!cb) break; // cancelled connect? + slot.streamSocket.connectCallback = null; slot.common.driver.m_core.removeWaiter(); if (err) { diff --git a/tests/0-tcp-cancelconn.d b/tests/0-tcp-cancelconn.d index 6a24452..d019e13 100644 --- a/tests/0-tcp-cancelconn.d +++ b/tests/0-tcp-cancelconn.d @@ -28,6 +28,7 @@ void main() eventDriver.timers.wait(tm, (tm) { assert(eventDriver.sockets.getConnectionState(sock) == ConnectionState.connecting); eventDriver.sockets.cancelConnectStream(sock); + eventDriver.sockets.releaseRef(sock); s_done = true; });