diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 7747cde..c05181b 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -127,6 +127,10 @@ interface EventDriverSockets { */ StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect); + /** Aborts asynchronous connect by closing the socket. + */ + void cancelConnectStream(StreamSocketFD sock); + /** Adopts an existing stream socket. The given socket must be in a connected state. It will be automatically diff --git a/source/eventcore/drivers/libasync.d b/source/eventcore/drivers/libasync.d index c98c21a..b8e9332 100644 --- a/source/eventcore/drivers/libasync.d +++ b/source/eventcore/drivers/libasync.d @@ -89,6 +89,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } + override void cancelConnectStream(StreamSocketFD sock) + { + assert(false, "TODO!"); + } + override StreamSocketFD adoptStream(int socket) { assert(false, "TODO!"); diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 59d86e6..f6611ac 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -95,7 +95,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[sock].specific = StreamSocketSlot.init; m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); - releaseRef(sock); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count + releaseRef(sock); // onConnectError callback is weak reference auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); if (ret == 0) { @@ -113,14 +113,29 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets m_loop.clearFD(sock); m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); invalidateSocket(); - on_connect(sock, ConnectStatus.unknownError); - return sock; + on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError); + return StreamSocketFD.invalid; } } return sock; } + final override void cancelConnectStream(StreamSocketFD sock) + { + assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor"); + with (m_loop.m_fds[sock].streamSocket) + { + assert(state == ConnectionState.connecting, + "Unable to cancel connect on the socket that is not in connecting state"); + state = ConnectionState.closed; + connectCallback = null; + m_loop.clearFD(sock); + m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); + closeSocket(cast(sock_t)sock.value); + } + } + final override StreamSocketFD adoptStream(int socket) { auto fd = StreamSocketFD(socket); @@ -226,7 +241,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); - releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count + releaseRef(fd); // onConnectError callback is weak reference //print("accept %d", sockfd); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); @@ -889,4 +904,3 @@ private int getSocketError() version (Windows) return WSAGetLastError(); else return errno; } - diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 5c93cc6..7015df0 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -82,6 +82,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } } + final override void cancelConnectStream(StreamSocketFD sock) + { + assert(false, "Not implemented"); + } + override StreamSocketFD adoptStream(int socket) { return adoptStreamInternal(socket);