Merge pull request #28 from Boris-Barboris/connect_cancel
cancelConnectStream for Posix merged-on-behalf-of: Sönke Ludwig <s-ludwig@users.noreply.github.com>
This commit is contained in:
commit
f146fa01f4
|
@ -127,6 +127,10 @@ interface EventDriverSockets {
|
||||||
*/
|
*/
|
||||||
StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);
|
StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);
|
||||||
|
|
||||||
|
/** Aborts asynchronous connect by closing the socket.
|
||||||
|
*/
|
||||||
|
void cancelConnectStream(StreamSocketFD sock);
|
||||||
|
|
||||||
/** Adopts an existing stream socket.
|
/** Adopts an existing stream socket.
|
||||||
|
|
||||||
The given socket must be in a connected state. It will be automatically
|
The given socket must be in a connected state. It will be automatically
|
||||||
|
|
|
@ -89,6 +89,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
|
||||||
assert(false, "TODO!");
|
assert(false, "TODO!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override void cancelConnectStream(StreamSocketFD sock)
|
||||||
|
{
|
||||||
|
assert(false, "TODO!");
|
||||||
|
}
|
||||||
|
|
||||||
override StreamSocketFD adoptStream(int socket)
|
override StreamSocketFD adoptStream(int socket)
|
||||||
{
|
{
|
||||||
assert(false, "TODO!");
|
assert(false, "TODO!");
|
||||||
|
|
|
@ -95,7 +95,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
|
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
|
||||||
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
|
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
|
||||||
releaseRef(sock); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count
|
releaseRef(sock); // onConnectError callback is weak reference
|
||||||
|
|
||||||
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
|
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
|
@ -113,14 +113,29 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
m_loop.clearFD(sock);
|
m_loop.clearFD(sock);
|
||||||
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
||||||
invalidateSocket();
|
invalidateSocket();
|
||||||
on_connect(sock, ConnectStatus.unknownError);
|
on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
|
||||||
return sock;
|
return StreamSocketFD.invalid;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final override void cancelConnectStream(StreamSocketFD sock)
|
||||||
|
{
|
||||||
|
assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor");
|
||||||
|
with (m_loop.m_fds[sock].streamSocket)
|
||||||
|
{
|
||||||
|
assert(state == ConnectionState.connecting,
|
||||||
|
"Unable to cancel connect on the socket that is not in connecting state");
|
||||||
|
state = ConnectionState.closed;
|
||||||
|
connectCallback = null;
|
||||||
|
m_loop.clearFD(sock);
|
||||||
|
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
||||||
|
closeSocket(cast(sock_t)sock.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final override StreamSocketFD adoptStream(int socket)
|
final override StreamSocketFD adoptStream(int socket)
|
||||||
{
|
{
|
||||||
auto fd = StreamSocketFD(socket);
|
auto fd = StreamSocketFD(socket);
|
||||||
|
@ -226,7 +241,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
|
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
|
||||||
releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count
|
releaseRef(fd); // onConnectError callback is weak reference
|
||||||
//print("accept %d", sockfd);
|
//print("accept %d", sockfd);
|
||||||
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
|
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
|
||||||
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
|
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
|
||||||
|
@ -889,4 +904,3 @@ private int getSocketError()
|
||||||
version (Windows) return WSAGetLastError();
|
version (Windows) return WSAGetLastError();
|
||||||
else return errno;
|
else return errno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final override void cancelConnectStream(StreamSocketFD sock)
|
||||||
|
{
|
||||||
|
assert(false, "Not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
override StreamSocketFD adoptStream(int socket)
|
override StreamSocketFD adoptStream(int socket)
|
||||||
{
|
{
|
||||||
return adoptStreamInternal(socket);
|
return adoptStreamInternal(socket);
|
||||||
|
|
Loading…
Reference in a new issue