From c6dec730d8653d3ce921026f005ea06d493b9307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 12 Oct 2016 22:59:15 +0200 Subject: [PATCH] Split up PosixEventDriver into individual classes. --- source/eventcore/core.d | 4 +- source/eventcore/driver.d | 10 +- source/eventcore/drivers/epoll.d | 14 +- source/eventcore/drivers/posix.d | 384 ++++++++++++++++++------------ source/eventcore/drivers/select.d | 11 +- source/eventcore/drivers/timer.d | 12 +- 6 files changed, 249 insertions(+), 186 deletions(-) diff --git a/source/eventcore/core.d b/source/eventcore/core.d index 5f003d8..6c96ac0 100644 --- a/source/eventcore/core.d +++ b/source/eventcore/core.d @@ -5,9 +5,11 @@ public import eventcore.driver; import eventcore.drivers.epoll; import eventcore.drivers.libasync; import eventcore.drivers.select; +import eventcore.drivers.posix; version (Have_libasync) alias NativeEventDriver = LibasyncEventDriver; -else alias NativeEventDriver = SelectEventDriver; +else version (linux) alias NativeEventDriver = PosixEventDriver!EpollEventLoop; +else alias NativeEventDriver = PosixEventDriver!SelectEventLoop; @property EventDriver eventDriver() @safe @nogc nothrow { diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 8ec368b..554af06 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -8,19 +8,19 @@ import std.socket : Address; interface EventDriver { @safe: /*@nogc:*/ nothrow: @property EventDriverCore core(); - @property EventDriverFiles files(); - @property EventDriverSockets sockets(); @property EventDriverTimers timers(); @property EventDriverEvents events(); @property EventDriverSignals signals(); + @property EventDriverSockets sockets(); + @property EventDriverFiles files(); @property EventDriverWatchers watchers(); + + /// Releases all resources associated with the driver + void dispose(); } interface EventDriverCore { @safe: /*@nogc:*/ nothrow: - /// Releases all resources associated with the driver - void dispose(); - /** The number of pending callbacks. When this number drops to zero, the event loop can safely be quit. It is diff --git a/source/eventcore/drivers/epoll.d b/source/eventcore/drivers/epoll.d index 1f24acd..5babbe8 100644 --- a/source/eventcore/drivers/epoll.d +++ b/source/eventcore/drivers/epoll.d @@ -17,7 +17,9 @@ import core.sys.posix.sys.time : timeval; import core.sys.linux.epoll; -final class EpollEventDriver : PosixEventDriver { +final class EpollEventLoop : PosixEventLoop { +@safe: nothrow: + private { int m_epoll; epoll_event[] m_events; @@ -29,15 +31,6 @@ final class EpollEventDriver : PosixEventDriver { m_events.length = 100; } - nothrow @safe { - override @property EpollEventDriver core() { return this; } - override @property EpollEventDriver sockets() { return this; } - override @property EpollEventDriver timers() { return this; } - override @property EpollEventDriver events() { return this; } - override @property EpollEventDriver signals() { return this; } - override @property EpollEventDriver watchers() { return this; } - } - override bool doProcessEvents(Duration timeout) @trusted { import std.algorithm : min; @@ -66,7 +59,6 @@ final class EpollEventDriver : PosixEventDriver { override void dispose() { import core.sys.posix.unistd : close; - super.dispose(); close(m_epoll); } diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 16ed970..8b2eb82 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -37,48 +37,87 @@ private long currStdTime() return Clock.currStdTime; } -abstract class PosixEventDriver : EventDriver, - EventDriverCore, EventDriverSockets, EventDriverTimers, - EventDriverEvents, EventDriverSignals, EventDriverWatchers -{ +final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { @safe: /*@nogc:*/ nothrow: private { - ChoppedVector!FDSlot m_fds; - size_t m_waiterCount = 0; - bool m_exit = false; - FD m_wakeupEvent; - ThreadedFileEventDriver!PosixEventDriver m_files; + alias CoreDriver = PosixEventDriverCore!(Loop, LoopTimeoutTimerDriver); + alias EventsDriver = PosixEventDriverEvents!Loop; + alias SignalsDriver = PosixEventDriverSignals!Loop; + alias TimerDriver = LoopTimeoutTimerDriver; + alias SocketsDriver = PosixEventDriverSockets!Loop; + alias FileDriver = ThreadedFileEventDriver!EventsDriver; + alias WatcherDriver = PosixEventDriverWatchers!Loop; + + Loop m_loop; + CoreDriver m_core; + EventsDriver m_events; + SignalsDriver m_signals; + LoopTimeoutTimerDriver m_timers; + SocketsDriver m_sockets; + FileDriver m_files; + WatcherDriver m_watchers; } - protected this() + this() { - m_wakeupEvent = eventfd(0, EFD_NONBLOCK); - initFD(m_wakeupEvent); - registerFD(m_wakeupEvent, EventMask.read); - //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD - m_files = new ThreadedFileEventDriver!PosixEventDriver(this); + m_loop = new Loop; + m_events = new EventsDriver(m_loop); + m_signals = new SignalsDriver(m_loop); + m_timers = new TimerDriver; + m_core = new CoreDriver(m_loop, m_timers); + m_sockets = new SocketsDriver(m_loop); + m_files = new FileDriver(m_events); + m_watchers = new WatcherDriver(m_loop); } // force overriding these in the (final) sub classes to avoid virtual calls - abstract override @property PosixEventDriver core(); - final override @property ThreadedFileEventDriver!PosixEventDriver files() { return m_files; } - abstract override @property PosixEventDriver sockets(); - abstract override @property PosixEventDriver timers(); - abstract override @property PosixEventDriver events(); - abstract override @property PosixEventDriver signals(); - abstract override @property PosixEventDriver watchers(); + final override @property CoreDriver core() { return m_core; } + final override @property EventsDriver events() { return m_events; } + final override @property SignalsDriver signals() { return m_signals; } + final override @property TimerDriver timers() { return m_timers; } + final override @property SocketsDriver sockets() { return m_sockets; } + final override @property FileDriver files() { return m_files; } + final override @property WatcherDriver watchers() { return m_watchers; } - mixin DefaultTimerImpl!(); + final override void dispose() + { + m_files.dispose(); + m_loop.dispose(); + } +} - protected int maxFD() const { return cast(int)m_fds.length; } - @property size_t waiterCount() const { return m_waiterCount; } +final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers) : EventDriverCore { +@safe: nothrow: + import core.time : Duration; + + protected alias ExtraEventsCallback = bool delegate(long); + + private { + Loop m_loop; + Timers m_timers; + bool m_exit = false; + FD m_wakeupEvent; + } + + protected this(Loop loop, Timers timers) + { + m_loop = loop; + m_timers = timers; + + m_wakeupEvent = eventfd(0, EFD_NONBLOCK); + m_loop.initFD(m_wakeupEvent); + m_loop.registerFD(m_wakeupEvent, EventMask.read); + //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD + } + + @property size_t waiterCount() const { return m_loop.m_waiterCount; } final override ExitReason processEvents(Duration timeout) { import std.algorithm : min; - import core.time : seconds; + import core.time : hnsecs, seconds; if (m_exit) { m_exit = false; @@ -90,16 +129,16 @@ abstract class PosixEventDriver : EventDriver, bool got_events; if (timeout <= 0.seconds) { - got_events = doProcessEvents(0.seconds); - processTimers(currStdTime); + got_events = m_loop.doProcessEvents(0.seconds); + m_timers.process(currStdTime); } else { long now = currStdTime; do { - auto nextto = min(getNextTimeout(now), timeout); - got_events = doProcessEvents(nextto); + auto nextto = min(m_timers.getNextTimeout(now), timeout); + got_events = m_loop.doProcessEvents(nextto); long prev_step = now; now = currStdTime; - got_events |= processTimers(now); + got_events |= m_timers.process(now); if (timeout != Duration.max) timeout -= (now - prev_step).hnsecs; } while (timeout > 0.seconds && !m_exit && !got_events); @@ -126,12 +165,27 @@ abstract class PosixEventDriver : EventDriver, m_exit = false; } - protected abstract bool doProcessEvents(Duration dur); - - abstract void dispose() - { - m_files.dispose(); + final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + FDSlot* fds = &m_loop.m_fds[descriptor]; + assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= FDSlot.userData.length, "Requested user data is too large."); + if (size > FDSlot.userData.length) assert(false); + if (!fds.userDataDestructor) { + initialize(fds.userData.ptr); + fds.userDataDestructor = destroy; + } + return m_loop.m_fds[descriptor].userData.ptr; } +} + + +final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets { +@safe: /*@nogc:*/ nothrow: + private Loop m_loop; + + this(Loop loop) { m_loop = loop; } final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) { @@ -153,9 +207,8 @@ abstract class PosixEventDriver : EventDriver, return sock; } - registerFD(sock, EventMask.read|EventMask.write|EventMask.status); - - initFD(sock); + m_loop.initFD(sock); + m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } (); if (ret == 0) { @@ -163,13 +216,13 @@ abstract class PosixEventDriver : EventDriver, } else { auto err = getSocketError(); if (err == EINPROGRESS) { - with (m_fds[sock]) { + with (m_loop.m_fds[sock]) { connectCallback = on_connect; } - startNotify!(EventType.write)(sock, &onConnect); + m_loop.startNotify!(EventType.write)(sock, &onConnect); } else { - clearFD(sock); - unregisterFD(sock); + m_loop.clearFD(sock); + m_loop.unregisterFD(sock); invalidateSocket(); on_connect(sock, ConnectStatus.unknownError); return sock; @@ -181,14 +234,14 @@ abstract class PosixEventDriver : EventDriver, private void onConnect(FD sock) { - setNotifyCallback!(EventType.write)(sock, null); - m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); + m_loop.setNotifyCallback!(EventType.write)(sock, null); + m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); } private void onConnectError(FD sock) { // FIXME: determine the correct kind of error! - m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); + m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); } final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) @@ -216,7 +269,7 @@ abstract class PosixEventDriver : EventDriver, if (sock == StreamListenSocketFD.invalid) return sock; - initFD(sock); + m_loop.initFD(sock); if (on_accept) waitForConnections(sock, on_accept); @@ -226,9 +279,9 @@ abstract class PosixEventDriver : EventDriver, final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { log("wait for conn"); - registerFD(sock, EventMask.read); - m_fds[sock].acceptCallback = on_accept; - startNotify!(EventType.read)(sock, &onAccept); + m_loop.registerFD(sock, EventMask.read); + m_loop.m_fds[sock].acceptCallback = on_accept; + m_loop.startNotify!(EventType.read)(sock, &onAccept); onAccept(sock); } @@ -244,10 +297,10 @@ abstract class PosixEventDriver : EventDriver, setSocketNonBlocking(cast(SocketFD)sockfd); auto fd = cast(StreamSocketFD)sockfd; - registerFD(fd, EventMask.read|EventMask.write|EventMask.status); - initFD(fd); + m_loop.initFD(fd); + m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); //print("accept %d", sockfd); - m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); + m_loop.m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); } } @@ -299,33 +352,33 @@ abstract class PosixEventDriver : EventDriver, } } - with (m_fds[socket]) { + with (m_loop.m_fds[socket]) { readCallback = on_read_finish; readMode = mode; bytesRead = ret > 0 ? ret : 0; readBuffer = buffer; } - setNotifyCallback!(EventType.read)(socket, &onSocketRead); + m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketRead); } override void cancelRead(StreamSocketFD socket) { - assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); - setNotifyCallback!(EventType.read)(socket, null); - with (m_fds[socket]) { + assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + m_loop.setNotifyCallback!(EventType.read)(socket, null); + with (m_loop.m_fds[socket]) { readBuffer = null; } } private void onSocketRead(FD fd) { - auto slot = () @trusted { return &m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto socket = cast(StreamSocketFD)fd; void finalize()(IOStatus status) { - setNotifyCallback!(EventType.read)(socket, null); + m_loop.setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, slot.bytesRead); } @@ -394,26 +447,26 @@ abstract class PosixEventDriver : EventDriver, } } - with (m_fds[socket]) { + with (m_loop.m_fds[socket]) { writeCallback = on_write_finish; writeMode = mode; bytesWritten = ret > 0 ? ret : 0; writeBuffer = buffer; } - setNotifyCallback!(EventType.write)(socket, &onSocketWrite); + m_loop.setNotifyCallback!(EventType.write)(socket, &onSocketWrite); } override void cancelWrite(StreamSocketFD socket) { - assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); - setNotifyCallback!(EventType.write)(socket, null); - m_fds[socket].writeBuffer = null; + assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); + m_loop.setNotifyCallback!(EventType.write)(socket, null); + m_loop.m_fds[socket].writeBuffer = null; } private void onSocketWrite(FD fd) { - auto slot = () @trusted { return &m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto socket = cast(StreamSocketFD)fd; sizediff_t ret; @@ -422,14 +475,14 @@ abstract class PosixEventDriver : EventDriver, if (ret < 0) { auto err = getSocketError(); if (err != EAGAIN) { - setNotifyCallback!(EventType.write)(socket, null); + m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(socket, IOStatus.error, slot.bytesRead); return; } } if (ret == 0) { - setNotifyCallback!(EventType.write)(socket, null); + m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten); return; } @@ -438,7 +491,7 @@ abstract class PosixEventDriver : EventDriver, slot.bytesWritten += ret; slot.writeBuffer = slot.writeBuffer[ret .. $]; if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { - setNotifyCallback!(EventType.write)(socket, null); + m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); return; } @@ -471,24 +524,24 @@ abstract class PosixEventDriver : EventDriver, return; } - with (m_fds[socket]) { + with (m_loop.m_fds[socket]) { readCallback = on_data_available; readMode = IOMode.once; bytesRead = 0; readBuffer = null; } - setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable); + m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable); } private void onSocketDataAvailable(FD fd) { - auto slot = () @trusted { return &m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto socket = cast(StreamSocketFD)fd; void finalize()(IOStatus status) { - setNotifyCallback!(EventType.read)(socket, null); + m_loop.setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, 0); } @@ -522,9 +575,8 @@ abstract class PosixEventDriver : EventDriver, return DatagramSocketFD.init; } - registerFD(sock, EventMask.read|EventMask.write|EventMask.status); - - initFD(sock); + m_loop.initFD(sock); + m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); return sock; } @@ -551,14 +603,14 @@ abstract class PosixEventDriver : EventDriver, if (mode == IOMode.immediate) { on_receive_finish(socket, IOStatus.wouldBlock, 0, null); } else { - with (m_fds[socket]) { + with (m_loop.m_fds[socket]) { readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } (); readMode = mode; bytesRead = 0; readBuffer = buffer; } - setNotifyCallback!(EventType.read)(socket, &onDgramRead); + m_loop.setNotifyCallback!(EventType.read)(socket, &onDgramRead); } return; } @@ -568,14 +620,14 @@ abstract class PosixEventDriver : EventDriver, void cancelReceive(DatagramSocketFD socket) { - assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); - setNotifyCallback!(EventType.read)(socket, null); - m_fds[socket].readBuffer = null; + assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + m_loop.setNotifyCallback!(EventType.read)(socket, null); + m_loop.m_fds[socket].readBuffer = null; } private void onDgramRead(FD fd) @trusted { // DMD 2.072.0-b2: scope considered unsafe - auto slot = () @trusted { return &m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto socket = cast(DatagramSocketFD)fd; sizediff_t ret; @@ -586,13 +638,13 @@ abstract class PosixEventDriver : EventDriver, if (ret < 0) { auto err = getSocketError(); if (err != EAGAIN) { - setNotifyCallback!(EventType.read)(socket, null); + m_loop.setNotifyCallback!(EventType.read)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null); return; } } - setNotifyCallback!(EventType.read)(socket, null); + m_loop.setNotifyCallback!(EventType.read)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addr); } @@ -603,7 +655,7 @@ abstract class PosixEventDriver : EventDriver, sizediff_t ret; if (target_address) { () @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } (); - m_fds[socket].targetAddr = target_address; + m_loop.m_fds[socket].targetAddr = target_address; } else { () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); } @@ -619,14 +671,14 @@ abstract class PosixEventDriver : EventDriver, if (mode == IOMode.immediate) { on_send_finish(socket, IOStatus.wouldBlock, 0, null); } else { - with (m_fds[socket]) { + with (m_loop.m_fds[socket]) { writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } (); writeMode = mode; bytesWritten = 0; writeBuffer = buffer; } - setNotifyCallback!(EventType.write)(socket, &onDgramWrite); + m_loop.setNotifyCallback!(EventType.write)(socket, &onDgramWrite); } return; } @@ -636,14 +688,14 @@ abstract class PosixEventDriver : EventDriver, void cancelSend(DatagramSocketFD socket) { - assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); - setNotifyCallback!(EventType.write)(socket, null); - m_fds[socket].writeBuffer = null; + assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); + m_loop.setNotifyCallback!(EventType.write)(socket, null); + m_loop.m_fds[socket].writeBuffer = null; } private void onDgramWrite(FD fd) { - auto slot = () @trusted { return &m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto socket = cast(DatagramSocketFD)fd; sizediff_t ret; @@ -656,74 +708,91 @@ abstract class PosixEventDriver : EventDriver, if (ret < 0) { auto err = getSocketError(); if (err != EAGAIN) { - setNotifyCallback!(EventType.write)(socket, null); + m_loop.setNotifyCallback!(EventType.write)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); return; } } - setNotifyCallback!(EventType.write)(socket, null); + m_loop.setNotifyCallback!(EventType.write)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null); } final override void addRef(SocketFD fd) { - auto pfd = () @trusted { return &m_fds[fd]; } (); + auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD."); - m_fds[fd].refCount++; + m_loop.m_fds[fd].refCount++; } final override void releaseRef(SocketFD fd) { - auto pfd = () @trusted { return &m_fds[fd]; } (); + auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_fds[fd].refCount == 0) { - unregisterFD(fd); - clearFD(fd); + if (--m_loop.m_fds[fd].refCount == 0) { + m_loop.unregisterFD(fd); + m_loop.clearFD(fd); closeSocket(fd); } } + private SocketFD createSocket(AddressFamily family, int type) + { + int sock; + () @trusted { sock = socket(family, type, 0); } (); + if (sock == -1) return SocketFD.invalid; + setSocketNonBlocking(cast(SocketFD)sock); + return cast(SocketFD)sock; + } +} + + +final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { +@safe: /*@nogc:*/ nothrow: + private Loop m_loop; + + this(Loop loop) { m_loop = loop; } + final override EventID create() { auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); - initFD(id); - m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation - registerFD(id, EventMask.read); - startNotify!(EventType.read)(id, &onEvent); + m_loop.initFD(id); + m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation + m_loop.registerFD(id, EventMask.read); + m_loop.startNotify!(EventType.read)(id, &onEvent); return id; } final override void trigger(EventID event, bool notify_all = true) { - assert(event < m_fds.length, "Invalid event ID passed to triggerEvent."); + assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent."); if (notify_all) { //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); - foreach (w; m_fds[event].waiters.consume) { + foreach (w; m_loop.m_fds[event].waiters.consume) { //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); w(event); } } else { - if (!m_fds[event].waiters.empty) - m_fds[event].waiters.consumeOne(); + if (!m_loop.m_fds[event].waiters.empty) + m_loop.m_fds[event].waiters.consumeOne(); } } final override void trigger(EventID event, bool notify_all = true) shared @trusted { import core.atomic : atomicStore; - auto thisus = cast(PosixEventDriver)this; - assert(event < thisus.m_fds.length, "Invalid event ID passed to shared triggerEvent."); + auto thisus = cast(PosixEventDriverEvents)this; + assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); long one = 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true); + if (notify_all) atomicStore(thisus.m_loop.m_fds[event].triggerAll, true); () @trusted { .write(event, &one, one.sizeof); } (); } final override void wait(EventID event, EventCallback on_event) { - assert(event < m_fds.length, "Invalid event ID passed to waitForEvent."); - return m_fds[event].waiters.put(on_event); + assert(event < m_loop.m_fds.length, "Invalid event ID passed to waitForEvent."); + return m_loop.m_fds[event].waiters.put(on_event); } final override void cancelWait(EventID event, EventCallback on_event) @@ -731,7 +800,7 @@ abstract class PosixEventDriver : EventDriver, import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; - m_fds[event].waiters.removePending(on_event); + m_loop.m_fds[event].waiters.removePending(on_event); } private void onEvent(FD event) @@ -739,26 +808,32 @@ abstract class PosixEventDriver : EventDriver, ulong cnt; () @trusted { .read(event, &cnt, cnt.sizeof); } (); import core.atomic : cas; - auto all = cas(&m_fds[event].triggerAll, true, false); + auto all = cas(&m_loop.m_fds[event].triggerAll, true, false); trigger(cast(EventID)event, all); } final override void addRef(EventID descriptor) { - assert(m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); - m_fds[descriptor].refCount++; + assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); + m_loop.m_fds[descriptor].refCount++; } final override void releaseRef(EventID descriptor) { - assert(m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_fds[descriptor].refCount == 0) { - unregisterFD(descriptor); - clearFD(descriptor); + assert(m_loop.m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD."); + if (--m_loop.m_fds[descriptor].refCount == 0) { + m_loop.unregisterFD(descriptor); + m_loop.clearFD(descriptor); close(descriptor); } } - +} + +final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals { +@safe: /*@nogc:*/ nothrow: + private Loop m_loop; + + this(Loop loop) { m_loop = loop; } final override void wait(int sig, SignalCallback on_signal) { @@ -769,6 +844,13 @@ abstract class PosixEventDriver : EventDriver, { assert(false, "TODO!"); } +} + +final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { +@safe: /*@nogc:*/ nothrow: + private Loop m_loop; + + this(Loop loop) { m_loop = loop; } final override WatcherID watchDirectory(string path, bool recursive) { @@ -794,21 +876,23 @@ abstract class PosixEventDriver : EventDriver, { assert(false, "TODO!"); } +} - final override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - FDSlot* fds = &m_fds[descriptor]; - assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, - "Requesting user data with differing type (destructor)."); - assert(size <= FDSlot.userData.length, "Requested user data is too large."); - if (size > FDSlot.userData.length) assert(false); - if (!fds.userDataDestructor) { - initialize(fds.userData.ptr); - fds.userDataDestructor = destroy; - } - return m_fds[descriptor].userData.ptr; + +package class PosixEventLoop { +@safe: nothrow: + import core.time : Duration; + + package { + ChoppedVector!FDSlot m_fds; + size_t m_waiterCount = 0; } + protected @property int maxFD() const { return cast(int)m_fds.length; } + + protected abstract void dispose(); + + protected abstract bool doProcessEvents(Duration dur); /// Registers the FD for general notification reception. protected abstract void registerFD(FD fd, EventMask mask); @@ -817,6 +901,13 @@ abstract class PosixEventDriver : EventDriver, /// Updates the event mask to use for listening for notifications. protected abstract void updateFD(FD fd, EventMask mask); + final protected void notify(EventType evt)(FD fd) + { + //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); + if (m_fds[fd].callback[evt]) + m_fds[fd].callback[evt](fd); + } + final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) { // TODO: optimize! @@ -825,14 +916,7 @@ abstract class PosixEventDriver : EventDriver, del(cast(FD)i); } - final protected void notify(EventType evt)(FD fd) - { - //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); - if (m_fds[fd].callback[evt]) - m_fds[fd].callback[evt](fd); - } - - private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) + package void startNotify(EventType evt)(FD fd, FDSlotCallback callback) { //log("start notify %s %s", evt, fd); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); @@ -840,7 +924,7 @@ abstract class PosixEventDriver : EventDriver, updateFD(fd, m_fds[fd].eventMask); } - private void stopNotify(EventType evt)(FD fd) + package void stopNotify(EventType evt)(FD fd) { //log("stop notify %s %s", evt, fd); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); @@ -848,7 +932,7 @@ abstract class PosixEventDriver : EventDriver, updateFD(fd, m_fds[fd].eventMask); } - private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) + package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) { assert((callback !is null) != (m_fds[fd].callback[evt] !is null), "Overwriting notification callback."); @@ -863,21 +947,12 @@ abstract class PosixEventDriver : EventDriver, m_fds[fd].callback[evt] = callback; } - private SocketFD createSocket(AddressFamily family, int type) - { - int sock; - () @trusted { sock = socket(family, type, 0); } (); - if (sock == -1) return SocketFD.invalid; - setSocketNonBlocking(cast(SocketFD)sock); - return cast(SocketFD)sock; - } - - private void initFD(FD fd) + package void initFD(FD fd) { m_fds[fd].refCount = 1; } - private void clearFD(FD fd) + package void clearFD(FD fd) { if (m_fds[fd].userDataDestructor) () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); @@ -889,6 +964,7 @@ abstract class PosixEventDriver : EventDriver, } } + alias FDEnumerateCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD); diff --git a/source/eventcore/drivers/select.d b/source/eventcore/drivers/select.d index 5c6e7b1..13f95db 100644 --- a/source/eventcore/drivers/select.d +++ b/source/eventcore/drivers/select.d @@ -23,14 +23,8 @@ version (Windows) { } -final class SelectEventDriver : PosixEventDriver { - override @property SelectEventDriver core() { return this; } - override @property SelectEventDriver sockets() { return this; } - override @property SelectEventDriver timers() { return this; } - override @property SelectEventDriver events() { return this; } - override @property SelectEventDriver signals() { return this; } - override @property SelectEventDriver watchers() { return this; } - +final class SelectEventLoop : PosixEventLoop { +@safe: nothrow: override bool doProcessEvents(Duration timeout) { //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); @@ -74,7 +68,6 @@ final class SelectEventDriver : PosixEventDriver { override void dispose() { - super.dispose(); } override void registerFD(FD fd, EventMask mask) diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index 2f4820f..4992f5d 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -6,7 +6,7 @@ module eventcore.drivers.timer; import eventcore.driver; -mixin template DefaultTimerImpl() { +final class LoopTimeoutTimerDriver : EventDriverTimers { import std.experimental.allocator.building_blocks.free_list; import std.experimental.allocator.building_blocks.region; import std.experimental.allocator.mallocator; @@ -29,12 +29,12 @@ mixin template DefaultTimerImpl() { ms_allocator.parent = Mallocator.instance; } - final protected Duration getNextTimeout(long stdtime) - { + final package Duration getNextTimeout(long stdtime) + @safe nothrow { return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max; } - final protected bool processTimers(long stdtime) + final package bool process(long stdtime) @trusted nothrow { assert(m_firedTimers.length == 0); if (m_timerQueue.empty) return false; @@ -48,7 +48,7 @@ mixin template DefaultTimerImpl() { while (tm.timeout <= stdtime); auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); try m_timerQueue.insertBefore(tail.release, tm); - catch (Exception e) { print("Failed to insert timer: %s", e.msg); } + catch (Exception e) { assert(false, e.msg); } } else tm.pending = false; m_firedTimers ~= tm; } @@ -94,7 +94,7 @@ mixin template DefaultTimerImpl() { auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); try m_timerQueue.insertBefore(largerRange.release, tm); - catch (Exception e) { print("Failed to insert timer: %s", e.msg); } + catch (Exception e) { assert(false, e.msg); } } final override void stop(TimerID timer)