Fix (immediate) detection of remove connection close for EPOLL.

This commit is contained in:
Sönke Ludwig 2017-01-30 21:01:31 +01:00
parent 5a5dcd6376
commit 773e09cd3d
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
2 changed files with 7 additions and 6 deletions

View file

@ -48,10 +48,9 @@ final class EpollEventLoop : PosixEventLoop {
foreach (ref evt; m_events[0 .. ret]) { foreach (ref evt; m_events[0 .. ret]) {
debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events); debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events);
auto fd = cast(FD)evt.data.fd; auto fd = cast(FD)evt.data.fd;
if (evt.events & (EPOLLERR|EPOLLHUP|EPOLLRDHUP)) notify!(EventType.status)(fd);
if (evt.events & EPOLLIN) notify!(EventType.read)(fd); if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
if (evt.events & EPOLLOUT) notify!(EventType.write)(fd); if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
if (evt.events & EPOLLERR) notify!(EventType.status)(fd);
else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd);
} }
return true; return true;
} else return false; } else return false;
@ -70,7 +69,7 @@ final class EpollEventLoop : PosixEventLoop {
ev.events |= EPOLLET; ev.events |= EPOLLET;
if (mask & EventMask.read) ev.events |= EPOLLIN; if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP|EPOLLRDHUP;
ev.data.fd = cast(int)fd; ev.data.fd = cast(int)fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); } (); () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); } ();
} }

View file

@ -65,6 +65,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.initFD(sock); m_loop.initFD(sock);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init; m_loop.m_fds[sock].specific = StreamSocketSlot.init;
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
if (ret == 0) { if (ret == 0) {
@ -108,9 +109,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
with (m_loop.m_fds[sock].streamSocket) { with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.connected; state = ConnectionState.connected;
assert(connectCallback !is null); assert(connectCallback !is null);
auto cb = connectCallback; connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
connectCallback = null; connectCallback = null;
cb(cast(StreamSocketFD)sock, ConnectStatus.connected);
} }
} }
@ -119,8 +119,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
// FIXME: determine the correct kind of error! // FIXME: determine the correct kind of error!
with (m_loop.m_fds[sock].streamSocket) { with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.closed; state = ConnectionState.closed;
connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); auto cb = connectCallback;
connectCallback = null; connectCallback = null;
if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.refused);
} }
} }
@ -198,6 +199,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.m_fds[fd].specific = StreamSocketSlot.init; m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
//print("accept %d", sockfd); //print("accept %d", sockfd);
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);