Make the kqueue event driver work.

This commit is contained in:
Sönke Ludwig 2017-01-22 20:54:09 +01:00
parent c9c6d73f5e
commit 25bab3e37e
4 changed files with 73 additions and 25 deletions

View file

@ -75,13 +75,13 @@ final class EpollEventLoop : PosixEventLoop {
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev); } (); () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev); } ();
} }
override void unregisterFD(FD fd) override void unregisterFD(FD fd, EventMask mask)
{ {
debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd); debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd);
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, fd, null); } (); () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, fd, null); } ();
} }
override void updateFD(FD fd, EventMask mask) override void updateFD(FD fd, EventMask old_mask, EventMask mask)
{ {
debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask); debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask);
epoll_event ev; epoll_event ev;

View file

@ -37,8 +37,9 @@ final class KqueueEventLoop : PosixEventLoop {
} }
this() this()
@safe nothrow @nogc { @safe nothrow {
m_queue = () @trusted { return kqueue(); } (); m_queue = () @trusted { return kqueue(); } ();
m_events.length = 100;
assert(m_queue >= 0, "Failed to create kqueue."); assert(m_queue >= 0, "Failed to create kqueue.");
} }
@ -58,9 +59,11 @@ final class KqueueEventLoop : PosixEventLoop {
m_changes.length = 0; m_changes.length = 0;
m_changes.assumeSafeAppend(); m_changes.assumeSafeAppend();
print("kevent returned %s", ret);
if (ret > 0) { if (ret > 0) {
foreach (ref evt; m_events[0 .. ret]) { foreach (ref evt; m_events[0 .. ret]) {
//print("event %s %s", evt.data.fd, evt.events); print("event %s %s", evt.ident, evt.filter, evt.flags);
assert(evt.ident <= uint.max); assert(evt.ident <= uint.max);
auto fd = cast(FD)cast(int)evt.ident; auto fd = cast(FD)cast(int)evt.ident;
if (evt.flags & (EV_EOF|EV_ERROR)) if (evt.flags & (EV_EOF|EV_ERROR))
@ -89,13 +92,18 @@ final class KqueueEventLoop : PosixEventLoop {
kevent_t ev; kevent_t ev;
ev.ident = fd; ev.ident = fd;
ev.flags = EV_ADD|EV_CLEAR|EV_ENABLE; ev.flags = EV_ADD|EV_CLEAR|EV_ENABLE;
if (mask & EventMask.read) ev.filter |= EVFILT_READ; if (mask & EventMask.read) {
if (mask & EventMask.write) ev.filter |= EVFILT_WRITE; ev.filter = EVFILT_READ;
m_changes ~= ev;
}
if (mask & EventMask.write) {
ev.filter = EVFILT_WRITE;
m_changes ~= ev;
}
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
m_changes ~= ev;
} }
override void unregisterFD(FD fd) override void unregisterFD(FD fd, EventMask mask)
{ {
kevent_t ev; kevent_t ev;
ev.ident = fd; ev.ident = fd;
@ -103,15 +111,24 @@ final class KqueueEventLoop : PosixEventLoop {
m_changes ~= ev; m_changes ~= ev;
} }
override void updateFD(FD fd, EventMask mask) override void updateFD(FD fd, EventMask old_mask, EventMask new_mask)
{ {
//print("update %s %s", fd, mask); //print("update %s %s", fd, mask);
kevent_t ev; kevent_t ev;
ev.filter = 0; auto changes = old_mask ^ new_mask;
ev.flags |= EV_CLEAR;
if (mask & EventMask.read) ev.filter |= EVFILT_READ; if (changes & EventMask.read) {
if (mask & EventMask.write) ev.filter |= EVFILT_WRITE; ev.filter = EVFILT_READ;
ev.flags = EV_CLEAR | (new_mask & EventMask.read ? EV_ADD : EV_DELETE);
m_changes ~= ev;
}
if (changes & EventMask.write) {
ev.filter = EVFILT_WRITE;
ev.flags = EV_CLEAR | (new_mask & EventMask.write ? EV_ADD : EV_DELETE);
m_changes ~= ev;
}
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
m_changes ~= ev;
} }
} }

View file

@ -66,6 +66,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver; alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop;
else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop;
else alias WatcherDriver = PosixEventDriverWatchers!Loop; else alias WatcherDriver = PosixEventDriverWatchers!Loop;
Loop m_loop; Loop m_loop;
@ -265,7 +266,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect); m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
} else { } else {
m_loop.clearFD(sock); m_loop.clearFD(sock);
m_loop.unregisterFD(sock); m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
invalidateSocket(); invalidateSocket();
on_connect(sock, ConnectStatus.unknownError); on_connect(sock, ConnectStatus.unknownError);
return sock; return sock;
@ -416,7 +417,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
if (ret == 0) { if (ret == 0 && buffer.length > 0) {
print("disconnect"); print("disconnect");
on_read_finish(socket, IOStatus.disconnected, 0); on_read_finish(socket, IOStatus.disconnected, 0);
return; return;
@ -428,7 +429,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return; return;
} }
if (ret > 0) { if (ret > 0 || buffer.length == 0) {
buffer = buffer[ret .. $]; buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) { if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, ret); on_read_finish(socket, IOStatus.ok, ret);
@ -864,7 +865,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
{ {
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) { if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD(fd); m_loop.clearFD(fd);
closeSocket(fd); closeSocket(fd);
return false; return false;
@ -1301,7 +1302,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
version (linux) { version (linux) {
if (--getRC(descriptor) == 0) { if (--getRC(descriptor) == 0) {
destroy(); destroy();
m_loop.unregisterFD(descriptor); m_loop.unregisterFD(descriptor, EventMask.read);
m_loop.clearFD(descriptor); m_loop.clearFD(descriptor);
close(descriptor); close(descriptor);
return false; return false;
@ -1377,7 +1378,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) { if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd); m_loop.clearFD(fd);
close(fd); close(fd);
return false; return false;
@ -1479,7 +1480,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) { if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd); m_loop.clearFD(fd);
m_watches.remove(fd); m_watches.remove(fd);
/*errnoEnforce(*/close(fd)/* == 0)*/; /*errnoEnforce(*/close(fd)/* == 0)*/;
@ -1544,6 +1545,36 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
} }
} }
version (OSX)
final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{
/*FSEventStreamCreate
FSEventStreamScheduleWithRunLoop
FSEventStreamStart*/
assert(false, "TODO!");
}
final override void addRef(WatcherID descriptor)
{
assert(false, "TODO!");
}
final override bool releaseRef(WatcherID descriptor)
{
/*FSEventStreamStop
FSEventStreamUnscheduleFromRunLoop
FSEventStreamInvalidate
FSEventStreamRelease*/
assert(false, "TODO!");
}
}
final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private Loop m_loop; private Loop m_loop;
@ -1585,9 +1616,9 @@ package class PosixEventLoop {
/// Registers the FD for general notification reception. /// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask); protected abstract void registerFD(FD fd, EventMask mask);
/// Unregisters the FD for general notification reception. /// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd); protected abstract void unregisterFD(FD fd, EventMask mask);
/// Updates the event mask to use for listening for notifications. /// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask mask); protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask);
final protected void notify(EventType evt)(FD fd) final protected void notify(EventType evt)(FD fd)
{ {

View file

@ -76,11 +76,11 @@ final class SelectEventLoop : PosixEventLoop {
{ {
} }
override void unregisterFD(FD fd) override void unregisterFD(FD fd, EventMask mask)
{ {
} }
override void updateFD(FD fd, EventMask mask) override void updateFD(FD fd, EventMask old_mask, EventMask mask)
{ {
} }
} }