Fix the approach to determine connect failures.

On macOS it could happen that both, onConnect and onConnectError, were triggered, resulting in seemingly overlapping connection attempts when they really were sequential. This in turn triggered a connection error leak test in vibe-core.

Now using only the write-ready flag plus the reported socket error status to determine failed connections, guaranteeing a single call back.
This commit is contained in:
Sönke Ludwig 2019-11-02 15:00:47 +01:00
parent 4813cba338
commit b32b329d15
3 changed files with 35 additions and 29 deletions

View file

@ -17,7 +17,7 @@ version (Posix) {
import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.un;
import core.sys.posix.unistd : close, read, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED;
import core.sys.posix.fcntl;
import core.sys.posix.sys.socket;
@ -88,6 +88,7 @@ version (Windows) {
import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK;
alias ECONNREFUSED = WSAECONNREFUSED;
enum SHUT_RDWR = SD_BOTH;
enum SHUT_RD = SD_RECEIVE;
enum SHUT_WR = SD_SEND;
@ -138,9 +139,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
releaseRef(sock); // onConnectError callback is weak reference
m_loop.registerFD(sock, EventMask.read|EventMask.write);
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
if (ret == 0) {
@ -155,10 +154,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
} else {
m_loop.unregisterFD(sock, EventMask.read|EventMask.write);
m_loop.clearFD!StreamSocketSlot(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
invalidateSocket();
on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
on_connect(StreamSocketFD.invalid, determineConnectStatus(err));
return StreamSocketFD.invalid;
}
}
@ -175,11 +174,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
"Unable to cancel connect on the socket that is not in connecting state");
state = ConnectionState.closed;
connectCallback = null;
m_loop.setNotifyCallback!(EventType.status)(sock, null);
m_loop.setNotifyCallback!(EventType.write)(sock, null);
m_loop.clearFD!StreamSocketSlot(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
closeSocket(cast(sock_t)sock.value);
}
}
@ -190,7 +185,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return StreamSocketFD.invalid;
setSocketNonBlocking(fd);
m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.registerFD(fd, EventMask.read|EventMask.write);
return fd;
}
@ -199,22 +194,32 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
auto sock = cast(StreamSocketFD)fd;
auto l = lockHandle(sock);
m_loop.setNotifyCallback!(EventType.write)(sock, null);
ConnectStatus status = ConnectStatus.unknownError;
int err;
socklen_t errlen = err.sizeof;
if (() @trusted { return getsockopt(cast(sock_t)fd, SOL_SOCKET, SO_ERROR, &err, &errlen); } () == 0)
status = determineConnectStatus(err);
with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.connected;
assert(state == ConnectionState.connecting);
state = status == ConnectStatus.connected
? ConnectionState.connected
: ConnectionState.closed;
auto cb = connectCallback;
connectCallback = null;
if (cb) cb(sock, ConnectStatus.connected);
if (cb) cb(cast(StreamSocketFD)sock, status);
}
}
private void onConnectError(FD sock)
private ConnectStatus determineConnectStatus(int sock_err)
{
// FIXME: determine the correct kind of error!
with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.closed;
auto cb = connectCallback;
connectCallback = null;
if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.refused);
switch (sock_err) {
default: return ConnectStatus.unknownError;
case 0: return ConnectStatus.connected;
case ECONNREFUSED: return ConnectStatus.refused;
}
}
@ -287,9 +292,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
auto fd = cast(StreamSocketFD)sockfd;
m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
releaseRef(fd); // onConnectError callback is weak reference
m_loop.registerFD(fd, EventMask.read|EventMask.write);
//print("accept %d", sockfd);
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
@ -656,7 +659,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.registerFD(sock, EventMask.read|EventMask.write);
return sock;
}
@ -673,7 +676,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return DatagramSocketFD.init;
setSocketNonBlocking(fd, close_on_exec);
m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.registerFD(fd, EventMask.read|EventMask.write);
return fd;
}
@ -881,7 +884,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
// listening sockets have an incremented the reference count because of setNotifyCallback
int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0;
if (--slot.common.refCount == base_refcount) {
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.unregisterFD(fd, EventMask.read|EventMask.write);
switch (slot.specific.kind) with (slot.specific.Kind) {
default: assert(false, "File descriptor slot is not a socket.");
case streamSocket:

View file

@ -85,7 +85,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
}
m_core.addWaiter();
addRef(sock);
return sock;
} else {
clearSocketSlot(sock);
@ -103,8 +102,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
assert(state == ConnectionState.connecting,
"Must be in 'connecting' state when calling cancelConnection.");
clearSocketSlot(sock);
() @trusted { closesocket(sock); } ();
state = ConnectionState.closed;
connectCallback = null;
m_core.removeWaiter();
}
}
@ -845,6 +845,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
default: break;
case FD_CONNECT:
auto cb = slot.streamSocket.connectCallback;
if (!cb) break; // cancelled connect?
slot.streamSocket.connectCallback = null;
slot.common.driver.m_core.removeWaiter();
if (err) {

View file

@ -28,6 +28,7 @@ void main()
eventDriver.timers.wait(tm, (tm) {
assert(eventDriver.sockets.getConnectionState(sock) == ConnectionState.connecting);
eventDriver.sockets.cancelConnectStream(sock);
eventDriver.sockets.releaseRef(sock);
s_done = true;
});