Use level-triggered events for listening stream sockets.

The limit of 20 accepted connections per event invocation can otherwise lead to no more connections being accepted when more that 20 connections are available at a time. This is an adaptation of @Boris-Barboris pull request #21 to restrict the level triggered behavior to listening sockets. See also #20.
This commit is contained in:
Sönke Ludwig 2017-09-02 00:11:34 +02:00
parent eb595ef858
commit fc5df2f949
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
5 changed files with 20 additions and 17 deletions

View file

@ -91,7 +91,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
{ {
m_files.dispose(); m_files.dispose();
m_dns.dispose(); m_dns.dispose();
m_loop.dispose(); m_loop.dispose();
} }
} }
@ -215,11 +215,11 @@ package class PosixEventLoop {
protected abstract bool doProcessEvents(Duration dur); protected abstract bool doProcessEvents(Duration dur);
/// Registers the FD for general notification reception. /// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask); protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true);
/// Unregisters the FD for general notification reception. /// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd, EventMask mask); protected abstract void unregisterFD(FD fd, EventMask mask);
/// Updates the event mask to use for listening for notifications. /// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask); protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true);
final protected void notify(EventType evt)(FD fd) final protected void notify(EventType evt)(FD fd)
{ {

View file

@ -62,11 +62,11 @@ final class EpollEventLoop : PosixEventLoop {
close(m_epoll); close(m_epoll);
} }
override void registerFD(FD fd, EventMask mask) override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
{ {
debug (EventCoreEpollDebug) print("Epoll register FD %s: %s", fd, mask); debug (EventCoreEpollDebug) print("Epoll register FD %s: %s", fd, mask);
epoll_event ev; epoll_event ev;
ev.events |= EPOLLET; if (edge_triggered) ev.events |= EPOLLET;
if (mask & EventMask.read) ev.events |= EPOLLIN; if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP;
@ -80,11 +80,11 @@ final class EpollEventLoop : PosixEventLoop {
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } (); () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } ();
} }
override void updateFD(FD fd, EventMask old_mask, EventMask mask) override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true)
{ {
debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask); debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask);
epoll_event ev; epoll_event ev;
ev.events |= EPOLLET; if (edge_triggered) ev.events |= EPOLLET;
//ev.events = EPOLLONESHOT; //ev.events = EPOLLONESHOT;
if (mask & EventMask.read) ev.events |= EPOLLIN; if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.write) ev.events |= EPOLLOUT;

View file

@ -86,12 +86,13 @@ final class KqueueEventLoop : PosixEventLoop {
close(m_queue); close(m_queue);
} }
override void registerFD(FD fd, EventMask mask) override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
{ {
//print("register %s %s", fd, mask); //print("register %s %s", fd, mask);
kevent_t ev; kevent_t ev;
ev.ident = fd; ev.ident = fd;
ev.flags = EV_ADD|EV_CLEAR|EV_ENABLE; ev.flags = EV_ADD|EV_ENABLE;
if (edge_triggered) ev.flags |= EV_CLEAR;
if (mask & EventMask.read) { if (mask & EventMask.read) {
ev.filter = EVFILT_READ; ev.filter = EVFILT_READ;
m_changes ~= ev; m_changes ~= ev;
@ -111,21 +112,23 @@ final class KqueueEventLoop : PosixEventLoop {
m_changes ~= ev; m_changes ~= ev;
} }
override void updateFD(FD fd, EventMask old_mask, EventMask new_mask) override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true)
{ {
//print("update %s %s", fd, mask); //print("update %s %s", fd, mask);
kevent_t ev; kevent_t ev;
auto changes = old_mask ^ new_mask; auto changes = old_mask ^ new_mask;
if (changes & EventMask.read) { if (changes & EventMask.read) {
ev.filter = EVFILT_READ; ev.filter = EVFILT_READ;
ev.flags = EV_CLEAR | (new_mask & EventMask.read ? EV_ADD : EV_DELETE); ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev; m_changes ~= ev;
} }
if (changes & EventMask.write) { if (changes & EventMask.write) {
ev.filter = EVFILT_WRITE; ev.filter = EVFILT_WRITE;
ev.flags = EV_CLEAR | (new_mask & EventMask.write ? EV_ADD : EV_DELETE); ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev; m_changes ~= ev;
} }

View file

@ -72,7 +72,7 @@ final class SelectEventLoop : PosixEventLoop {
{ {
} }
override void registerFD(FD fd, EventMask mask) override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
{ {
} }
@ -80,7 +80,7 @@ final class SelectEventLoop : PosixEventLoop {
{ {
} }
override void updateFD(FD fd, EventMask old_mask, EventMask mask) override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true)
{ {
} }
} }

View file

@ -172,7 +172,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{ {
m_loop.registerFD(sock, EventMask.read); m_loop.registerFD(sock, EventMask.read, false);
m_loop.m_fds[sock].streamListen.acceptCallback = on_accept; m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept); m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
onAccept(sock); onAccept(sock);