cancelConnectStream for Posix

This commit is contained in:
Boris-Barboris 2017-11-11 01:14:54 +00:00
parent 8d7c1e3c96
commit 0cac9d56e0
4 changed files with 38 additions and 6 deletions

View file

@ -127,6 +127,10 @@ interface EventDriverSockets {
*/ */
StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect); StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);
/** Aborts asynchronous connect by closing the socket.
*/
void cancelConnectStream(StreamSocketFD sock);
/** Adopts an existing stream socket. /** Adopts an existing stream socket.
The given socket must be in a connected state. It will be automatically The given socket must be in a connected state. It will be automatically
@ -553,7 +557,8 @@ enum ConnectStatus {
refused, refused,
timeout, timeout,
bindFailure, bindFailure,
unknownError unknownError,
cancelled
} }
enum ConnectionState { enum ConnectionState {

View file

@ -89,6 +89,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void cancelConnectStream(StreamSocketFD sock)
{
assert(false, "TODO!");
}
override StreamSocketFD adoptStream(int socket) override StreamSocketFD adoptStream(int socket)
{ {
assert(false, "TODO!"); assert(false, "TODO!");

View file

@ -95,7 +95,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init; m_loop.m_fds[sock].specific = StreamSocketSlot.init;
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
releaseRef(sock); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count releaseRef(sock); // onConnectError callback is weak reference
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
if (ret == 0) { if (ret == 0) {
@ -113,14 +113,32 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.clearFD(sock); m_loop.clearFD(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
invalidateSocket(); invalidateSocket();
on_connect(sock, ConnectStatus.unknownError); on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
return sock; return StreamSocketFD.invalid;
} }
} }
return sock; return sock;
} }
final override void cancelConnectStream(StreamSocketFD sock)
{
assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor");
with (m_loop.m_fds[sock].streamSocket)
{
assert(state == ConnectionState.connecting,
"Unable to cancel connect on the socket that is not in connecting state");
state = ConnectionState.closed;
auto cb = connectCallback;
connectCallback = null;
m_loop.clearFD(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
closeSocket(cast(sock_t)sock.value);
if (cb)
cb(StreamSocketFD.invalid, ConnectStatus.cancelled);
}
}
final override StreamSocketFD adoptStream(int socket) final override StreamSocketFD adoptStream(int socket)
{ {
auto fd = StreamSocketFD(socket); auto fd = StreamSocketFD(socket);
@ -226,7 +244,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count releaseRef(fd); // onConnectError callback is weak reference
//print("accept %d", sockfd); //print("accept %d", sockfd);
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
@ -889,4 +907,3 @@ private int getSocketError()
version (Windows) return WSAGetLastError(); version (Windows) return WSAGetLastError();
else return errno; else return errno;
} }

View file

@ -82,6 +82,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
} }
final override void cancelConnectStream(StreamSocketFD sock)
{
assert(false, "Not implemented");
}
override StreamSocketFD adoptStream(int socket) override StreamSocketFD adoptStream(int socket)
{ {
return adoptStreamInternal(socket); return adoptStreamInternal(socket);