diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index feef972..babcf08 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -45,7 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver this(Events events, Signals signals) { m_events = events; - m_event = events.create(); + m_event = events.createInternal(); m_events.wait(m_event, &onDNSSignal); } @@ -70,6 +70,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver try t.executeInNewThread();//taskPool.put(t); catch (Exception e) return DNSLookupID.invalid; debug (EventCoreLogDNS) print("lookup handle: %s", handle); + m_events.loop.m_waiterCount++; return handle; } @@ -89,6 +90,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver override void cancelLookup(DNSLookupID handle) { m_lookups[handle].callback = null; + m_events.loop.m_waiterCount--; } private void onDNSSignal(EventID event) @@ -113,6 +115,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver l.result = null; l.retcode = 0; if (i == m_maxHandle) m_maxHandle = lastmax; + m_events.loop.m_waiterCount--; passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai); } else lastmax = i; } @@ -152,7 +155,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive { m_signals = signals; m_dnsSignal = () @trusted { return SIGRTMIN; } (); - m_sighandle = signals.listen(m_dnsSignal, &onDNSSignal); + m_sighandle = signals.listenInternal(m_dnsSignal, &onDNSSignal); } void dispose() @@ -177,6 +180,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive return DNSLookupID.invalid; m_lookups[handle].callback = on_lookup_finished; + m_events.loop.m_waiterCount++; return handle; } @@ -185,6 +189,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive { gai_cancel(&m_lookups[handle].ctx); m_lookups[handle].callback = null; + m_events.loop.m_waiterCount--; } private void onDNSSignal(SignalListenID, SignalStatus status, int signal) @@ -204,6 +209,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive auto ai = l.ctx.ar_result; l.callback = null; l.ctx.ar_result = null; + m_events.loop.m_waiterCount--; passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai); } } diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index bd881c5..73c2bd6 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -115,10 +115,10 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_loop = loop; m_timers = timers; m_events = events; - m_wakeupEvent = events.create(); + m_wakeupEvent = events.createInternal(); } - @property size_t waiterCount() const { return m_loop.m_waiterCount; } + @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } final override ExitReason processEvents(Duration timeout) { @@ -241,25 +241,34 @@ package class PosixEventLoop { assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null), "Overwriting notification callback."); // ensure that the FD doesn't get closed before the callback gets called. - if (callback !is null) { - m_waiterCount++; - m_fds[fd.value].common.refCount++; - } else { - m_fds[fd.value].common.refCount--; - m_waiterCount--; + with (m_fds[fd.value]) { + if (callback !is null) { + if (!(common.flags & FDFlags.internal)) m_waiterCount++; + common.refCount++; + } else { + common.refCount--; + if (!(common.flags & FDFlags.internal)) m_waiterCount--; + } + common.callback[evt] = callback; } - m_fds[fd.value].common.callback[evt] = callback; } - package void initFD(FD fd) + package void initFD(FD fd, FDFlags flags) { - m_fds[fd.value].common.refCount = 1; + with (m_fds[fd.value]) { + common.refCount = 1; + common.flags = flags; + } } package void clearFD(FD fd) { if (m_fds[fd.value].common.userDataDestructor) () @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } (); + if (!(m_fds[fd.value].common.flags & FDFlags.internal)) + foreach (cb; m_fds[fd.value].common.callback) + if (cb !is null) + m_waiterCount--; m_fds[fd.value] = m_fds.FullField.init; } } @@ -272,6 +281,7 @@ alias FDSlotCallback = void delegate(FD); private struct FDSlot { FDSlotCallback[EventType.max+1] callback; uint refCount; + FDFlags flags; DataInitializer userDataDestructor; ubyte[16*size_t.sizeof] userData; @@ -285,6 +295,11 @@ private struct FDSlot { } } +enum FDFlags { + none = 0, + internal = 1<<0, +} + enum EventType { read, write, diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 02f95c8..25ed35e 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -32,23 +32,30 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS m_sockets = sockets; } + package @property Loop loop() { return m_loop; } + final override EventID create() + { + return createInternal(false); + } + + package(eventcore) EventID createInternal(bool is_internal = true) { version (linux) { auto eid = eventfd(0, EFD_NONBLOCK); if (eid == -1) return EventID.invalid; auto id = cast(EventID)eid; - m_loop.initFD(id); - m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation + m_loop.initFD(id, FDFlags.internal); + m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); return id; } else { auto addr = new InternetAddress(0x7F000001, 0); - auto s = m_sockets.createDatagramSocket(addr, addr); + auto s = m_sockets.createDatagramSocketInternal(addr, addr, true); if (s == DatagramSocketFD.invalid) return EventID.invalid; m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation + m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation return cast(EventID)s; } } @@ -60,12 +67,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); foreach (w; slot.waiters.consume) { //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; w(event); } } else { if (!slot.waiters.empty) { - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; slot.waiters.consumeOne()(event); } } @@ -85,7 +92,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void wait(EventID event, EventCallback on_event) { - m_loop.m_waiterCount++; + if (!isInternal(event)) m_loop.m_waiterCount++; getSlot(event).waiters.put(on_event); } @@ -94,7 +101,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; getSlot(event).waiters.removePending(on_event); } @@ -171,10 +178,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS { return m_loop.m_fds[id].common.refCount; } + + private bool isInternal(EventID id) + { + return getSlot(id).isInternal; + } } package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; shared bool triggerAll; + bool isInternal; } diff --git a/source/eventcore/drivers/posix/signals.d b/source/eventcore/drivers/posix/signals.d index f0464b9..aec99ce 100644 --- a/source/eventcore/drivers/posix/signals.d +++ b/source/eventcore/drivers/posix/signals.d @@ -19,6 +19,11 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna this(Loop loop) { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) + { + return listenInternal(sig, on_signal, false); + } + + package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true) { auto fd = () @trusted { sigset_t sset; @@ -32,7 +37,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna } (); - m_loop.initFD(cast(FD)fd); + m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none); m_loop.m_fds[fd].specific = SignalSlot(on_signal); m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); @@ -52,7 +57,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna { FD fd = cast(FD)descriptor; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference m_loop.unregisterFD(fd, EventMask.read); m_loop.clearFD(fd); close(cast(int)fd); @@ -89,6 +94,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals this(Loop loop) { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) + { + return listenInternal(sig, on_signal, false); + } + + package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true) { assert(false); } diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 2e416c3..9c7b6ce 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -62,7 +62,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return sock; } - m_loop.initFD(sock); + m_loop.initFD(sock, FDFlags.none); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[sock].specific = StreamSocketSlot.init; m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); @@ -98,7 +98,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (m_loop.m_fds[fd].common.refCount) // FD already in use? return StreamSocketFD.invalid; setSocketNonBlocking(fd); - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[fd].specific = StreamSocketSlot.init; return fd; @@ -168,7 +168,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (sock == StreamListenSocketFD.invalid) return sock; - m_loop.initFD(sock); + m_loop.initFD(sock, FDFlags.none); m_loop.m_fds[sock].specific = StreamListenSocketSlot.init; if (on_accept) waitForConnections(sock, on_accept); @@ -196,7 +196,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets setSocketNonBlocking(cast(SocketFD)sockfd); auto fd = cast(StreamSocketFD)sockfd; - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.m_fds[fd].specific = StreamSocketSlot.init; m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); @@ -502,6 +502,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address) + { + return createDatagramSocketInternal(bind_address, target_address, false); + } + + package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address, scope Address target_address, bool is_internal = true) { auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM); if (sockfd == -1) return DatagramSocketFD.invalid; @@ -533,7 +538,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - m_loop.initFD(sock); + m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none); m_loop.m_fds[sock].specific = DgramSocketSlot.init; m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); @@ -546,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (m_loop.m_fds[fd].common.refCount) // FD already in use? return DatagramSocketFD.init; setSocketNonBlocking(fd); - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[fd].specific = DgramSocketSlot.init; return fd; @@ -710,8 +715,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool releaseRef(SocketFD fd) { + import taggedalgebraic : hasType; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + // listening sockets have an incremented the reference count because of setNotifyCallback + int base_refcount = m_loop.m_fds[fd].specific.hasType!StreamListenSocketSlot ? 1 : 0; + if (--m_loop.m_fds[fd].common.refCount == base_refcount) { m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.clearFD(fd); closeSocket(cast(sock_t)fd); diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index fea70a4..385294c 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -39,7 +39,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch } } - m_loop.initFD(FD(handle)); + m_loop.initFD(FD(handle), FDFlags.none); m_loop.registerFD(FD(handle), EventMask.read); m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); m_loop.m_fds[handle].specific = WatcherSlot(callback); @@ -59,7 +59,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch { FD fd = cast(FD)descriptor; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count m_loop.unregisterFD(fd, EventMask.read); m_loop.clearFD(fd); m_watches.remove(descriptor); @@ -103,10 +103,10 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch ch.directory = m_watches[id][ev.wd]; ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.name = name; - addRef(id); - auto cb = m_loop.m_fds[ id].watcher.callback; + addRef(id); // assure that the id doesn't get invalidated until after the callback + auto cb = m_loop.m_fds[id].watcher.callback; cb(id, ch); - if (!releaseRef(id)) break; + if (!releaseRef(id)) return; rem = rem[inotify_event.sizeof + ev.len .. $]; } diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index aa16d4a..926aceb 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -99,6 +99,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil SmallIntegerSet!FileFD m_activeReads; SmallIntegerSet!FileFD m_activeWrites; EventID m_readyEvent; + bool m_waiting; Events m_events; } @@ -108,7 +109,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil { m_events = events; m_readyEvent = events.create(); - m_events.wait(m_readyEvent, &onReady); } void dispose() @@ -191,7 +191,9 @@ log("start task"); m_fileThreadPool.isDaemon = true; } m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + startWaiting(); } catch (Exception e) { + m_activeWrites.remove(file); on_write_finish(file, IOStatus.error, 0); return; } @@ -201,6 +203,7 @@ log("start task"); { auto f = &m_files[file].write; m_activeWrites.remove(file); + m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); assert(res, "Cancelling write when no write is in progress."); } @@ -217,7 +220,9 @@ log("start task"); m_fileThreadPool.isDaemon = true; } m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + startWaiting(); } catch (Exception e) { + m_activeReads.remove(file); on_read_finish(file, IOStatus.error, 0); return; } @@ -227,6 +232,7 @@ log("start task"); { auto f = &m_files[file].read; m_activeReads.remove(file); + m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); assert(res, "Cancelling read when no write is in progress."); } @@ -252,7 +258,7 @@ log("start task"); /// private static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) { -log("ready event"); +log("task fun"); IOInfo* f = mixin("&fd.m_files[file]."~op); log("wait for cancel"); @@ -318,7 +324,17 @@ log("ready event"); m_files[f].write.flush(f); } - m_events.wait(m_readyEvent, &onReady); + m_waiting = false; + startWaiting(); + } + + private void startWaiting() + { + if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) { + log("wait for ready"); + m_events.wait(m_readyEvent, &onReady); + m_waiting = true; + } } } diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index e21437f..f0d7b1a 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -30,6 +30,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { ms_allocator.parent = Mallocator.instance; } + package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; } + final package Duration getNextTimeout(long stdtime) @safe nothrow { if (m_timerQueue.length == 0) return Duration.max;