From fc5df2f94986a05bb51f5ca7814b786eec3f5606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 2 Sep 2017 00:11:34 +0200 Subject: [PATCH] Use level-triggered events for listening stream sockets. The limit of 20 accepted connections per event invocation can otherwise lead to no more connections being accepted when more that 20 connections are available at a time. This is an adaptation of @Boris-Barboris pull request #21 to restrict the level triggered behavior to listening sockets. See also #20. --- source/eventcore/drivers/posix/driver.d | 6 +++--- source/eventcore/drivers/posix/epoll.d | 8 ++++---- source/eventcore/drivers/posix/kqueue.d | 17 ++++++++++------- source/eventcore/drivers/posix/select.d | 4 ++-- source/eventcore/drivers/posix/sockets.d | 2 +- 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 73c2bd6..b523d67 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -91,7 +91,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { { m_files.dispose(); m_dns.dispose(); - m_loop.dispose(); + m_loop.dispose(); } } @@ -215,11 +215,11 @@ package class PosixEventLoop { protected abstract bool doProcessEvents(Duration dur); /// Registers the FD for general notification reception. - protected abstract void registerFD(FD fd, EventMask mask); + protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true); /// Unregisters the FD for general notification reception. protected abstract void unregisterFD(FD fd, EventMask mask); /// Updates the event mask to use for listening for notifications. - protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask); + protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true); final protected void notify(EventType evt)(FD fd) { diff --git a/source/eventcore/drivers/posix/epoll.d b/source/eventcore/drivers/posix/epoll.d index cca2283..672b2cb 100644 --- a/source/eventcore/drivers/posix/epoll.d +++ b/source/eventcore/drivers/posix/epoll.d @@ -62,11 +62,11 @@ final class EpollEventLoop : PosixEventLoop { close(m_epoll); } - override void registerFD(FD fd, EventMask mask) + override void registerFD(FD fd, EventMask mask, bool edge_triggered = true) { debug (EventCoreEpollDebug) print("Epoll register FD %s: %s", fd, mask); epoll_event ev; - ev.events |= EPOLLET; + if (edge_triggered) ev.events |= EPOLLET; if (mask & EventMask.read) ev.events |= EPOLLIN; if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; @@ -80,11 +80,11 @@ final class EpollEventLoop : PosixEventLoop { () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } (); } - override void updateFD(FD fd, EventMask old_mask, EventMask mask) + override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true) { debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask); epoll_event ev; - ev.events |= EPOLLET; + if (edge_triggered) ev.events |= EPOLLET; //ev.events = EPOLLONESHOT; if (mask & EventMask.read) ev.events |= EPOLLIN; if (mask & EventMask.write) ev.events |= EPOLLOUT; diff --git a/source/eventcore/drivers/posix/kqueue.d b/source/eventcore/drivers/posix/kqueue.d index 43e27a6..cf1aec4 100644 --- a/source/eventcore/drivers/posix/kqueue.d +++ b/source/eventcore/drivers/posix/kqueue.d @@ -86,12 +86,13 @@ final class KqueueEventLoop : PosixEventLoop { close(m_queue); } - override void registerFD(FD fd, EventMask mask) + override void registerFD(FD fd, EventMask mask, bool edge_triggered = true) { //print("register %s %s", fd, mask); kevent_t ev; ev.ident = fd; - ev.flags = EV_ADD|EV_CLEAR|EV_ENABLE; + ev.flags = EV_ADD|EV_ENABLE; + if (edge_triggered) ev.flags |= EV_CLEAR; if (mask & EventMask.read) { ev.filter = EVFILT_READ; m_changes ~= ev; @@ -111,21 +112,23 @@ final class KqueueEventLoop : PosixEventLoop { m_changes ~= ev; } - override void updateFD(FD fd, EventMask old_mask, EventMask new_mask) + override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) { //print("update %s %s", fd, mask); kevent_t ev; auto changes = old_mask ^ new_mask; - + if (changes & EventMask.read) { ev.filter = EVFILT_READ; - ev.flags = EV_CLEAR | (new_mask & EventMask.read ? EV_ADD : EV_DELETE); + ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE; + if (edge_triggered) ev.flags |= EV_CLEAR; m_changes ~= ev; } - + if (changes & EventMask.write) { ev.filter = EVFILT_WRITE; - ev.flags = EV_CLEAR | (new_mask & EventMask.write ? EV_ADD : EV_DELETE); + ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE; + if (edge_triggered) ev.flags |= EV_CLEAR; m_changes ~= ev; } diff --git a/source/eventcore/drivers/posix/select.d b/source/eventcore/drivers/posix/select.d index 903968a..dabc491 100644 --- a/source/eventcore/drivers/posix/select.d +++ b/source/eventcore/drivers/posix/select.d @@ -72,7 +72,7 @@ final class SelectEventLoop : PosixEventLoop { { } - override void registerFD(FD fd, EventMask mask) + override void registerFD(FD fd, EventMask mask, bool edge_triggered = true) { } @@ -80,7 +80,7 @@ final class SelectEventLoop : PosixEventLoop { { } - override void updateFD(FD fd, EventMask old_mask, EventMask mask) + override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true) { } } diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 0204d26..6fb72d8 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -172,7 +172,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { - m_loop.registerFD(sock, EventMask.read); + m_loop.registerFD(sock, EventMask.read, false); m_loop.m_fds[sock].streamListen.acceptCallback = on_accept; m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept); onAccept(sock);