diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index feef972..babcf08 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -45,7 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver this(Events events, Signals signals) { m_events = events; - m_event = events.create(); + m_event = events.createInternal(); m_events.wait(m_event, &onDNSSignal); } @@ -70,6 +70,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver try t.executeInNewThread();//taskPool.put(t); catch (Exception e) return DNSLookupID.invalid; debug (EventCoreLogDNS) print("lookup handle: %s", handle); + m_events.loop.m_waiterCount++; return handle; } @@ -89,6 +90,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver override void cancelLookup(DNSLookupID handle) { m_lookups[handle].callback = null; + m_events.loop.m_waiterCount--; } private void onDNSSignal(EventID event) @@ -113,6 +115,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver l.result = null; l.retcode = 0; if (i == m_maxHandle) m_maxHandle = lastmax; + m_events.loop.m_waiterCount--; passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai); } else lastmax = i; } @@ -152,7 +155,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive { m_signals = signals; m_dnsSignal = () @trusted { return SIGRTMIN; } (); - m_sighandle = signals.listen(m_dnsSignal, &onDNSSignal); + m_sighandle = signals.listenInternal(m_dnsSignal, &onDNSSignal); } void dispose() @@ -177,6 +180,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive return DNSLookupID.invalid; m_lookups[handle].callback = on_lookup_finished; + m_events.loop.m_waiterCount++; return handle; } @@ -185,6 +189,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive { gai_cancel(&m_lookups[handle].ctx); m_lookups[handle].callback = null; + m_events.loop.m_waiterCount--; } private void onDNSSignal(SignalListenID, SignalStatus status, int signal) @@ -204,6 +209,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive auto ai = l.ctx.ar_result; l.callback = null; l.ctx.ar_result = null; + m_events.loop.m_waiterCount--; passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai); } } diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index bd881c5..73c2bd6 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -115,10 +115,10 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_loop = loop; m_timers = timers; m_events = events; - m_wakeupEvent = events.create(); + m_wakeupEvent = events.createInternal(); } - @property size_t waiterCount() const { return m_loop.m_waiterCount; } + @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } final override ExitReason processEvents(Duration timeout) { @@ -241,25 +241,34 @@ package class PosixEventLoop { assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null), "Overwriting notification callback."); // ensure that the FD doesn't get closed before the callback gets called. - if (callback !is null) { - m_waiterCount++; - m_fds[fd.value].common.refCount++; - } else { - m_fds[fd.value].common.refCount--; - m_waiterCount--; + with (m_fds[fd.value]) { + if (callback !is null) { + if (!(common.flags & FDFlags.internal)) m_waiterCount++; + common.refCount++; + } else { + common.refCount--; + if (!(common.flags & FDFlags.internal)) m_waiterCount--; + } + common.callback[evt] = callback; } - m_fds[fd.value].common.callback[evt] = callback; } - package void initFD(FD fd) + package void initFD(FD fd, FDFlags flags) { - m_fds[fd.value].common.refCount = 1; + with (m_fds[fd.value]) { + common.refCount = 1; + common.flags = flags; + } } package void clearFD(FD fd) { if (m_fds[fd.value].common.userDataDestructor) () @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } (); + if (!(m_fds[fd.value].common.flags & FDFlags.internal)) + foreach (cb; m_fds[fd.value].common.callback) + if (cb !is null) + m_waiterCount--; m_fds[fd.value] = m_fds.FullField.init; } } @@ -272,6 +281,7 @@ alias FDSlotCallback = void delegate(FD); private struct FDSlot { FDSlotCallback[EventType.max+1] callback; uint refCount; + FDFlags flags; DataInitializer userDataDestructor; ubyte[16*size_t.sizeof] userData; @@ -285,6 +295,11 @@ private struct FDSlot { } } +enum FDFlags { + none = 0, + internal = 1<<0, +} + enum EventType { read, write, diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 02f95c8..25ed35e 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -32,23 +32,30 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS m_sockets = sockets; } + package @property Loop loop() { return m_loop; } + final override EventID create() + { + return createInternal(false); + } + + package(eventcore) EventID createInternal(bool is_internal = true) { version (linux) { auto eid = eventfd(0, EFD_NONBLOCK); if (eid == -1) return EventID.invalid; auto id = cast(EventID)eid; - m_loop.initFD(id); - m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation + m_loop.initFD(id, FDFlags.internal); + m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); return id; } else { auto addr = new InternetAddress(0x7F000001, 0); - auto s = m_sockets.createDatagramSocket(addr, addr); + auto s = m_sockets.createDatagramSocketInternal(addr, addr, true); if (s == DatagramSocketFD.invalid) return EventID.invalid; m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation + m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation return cast(EventID)s; } } @@ -60,12 +67,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); foreach (w; slot.waiters.consume) { //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; w(event); } } else { if (!slot.waiters.empty) { - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; slot.waiters.consumeOne()(event); } } @@ -85,7 +92,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void wait(EventID event, EventCallback on_event) { - m_loop.m_waiterCount++; + if (!isInternal(event)) m_loop.m_waiterCount++; getSlot(event).waiters.put(on_event); } @@ -94,7 +101,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; - m_loop.m_waiterCount--; + if (!isInternal(event)) m_loop.m_waiterCount--; getSlot(event).waiters.removePending(on_event); } @@ -171,10 +178,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS { return m_loop.m_fds[id].common.refCount; } + + private bool isInternal(EventID id) + { + return getSlot(id).isInternal; + } } package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; shared bool triggerAll; + bool isInternal; } diff --git a/source/eventcore/drivers/posix/signals.d b/source/eventcore/drivers/posix/signals.d index f0464b9..aec99ce 100644 --- a/source/eventcore/drivers/posix/signals.d +++ b/source/eventcore/drivers/posix/signals.d @@ -19,6 +19,11 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna this(Loop loop) { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) + { + return listenInternal(sig, on_signal, false); + } + + package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true) { auto fd = () @trusted { sigset_t sset; @@ -32,7 +37,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna } (); - m_loop.initFD(cast(FD)fd); + m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none); m_loop.m_fds[fd].specific = SignalSlot(on_signal); m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); @@ -52,7 +57,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna { FD fd = cast(FD)descriptor; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference m_loop.unregisterFD(fd, EventMask.read); m_loop.clearFD(fd); close(cast(int)fd); @@ -89,6 +94,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals this(Loop loop) { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) + { + return listenInternal(sig, on_signal, false); + } + + package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true) { assert(false); } diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 2e416c3..9c7b6ce 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -62,7 +62,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return sock; } - m_loop.initFD(sock); + m_loop.initFD(sock, FDFlags.none); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[sock].specific = StreamSocketSlot.init; m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); @@ -98,7 +98,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (m_loop.m_fds[fd].common.refCount) // FD already in use? return StreamSocketFD.invalid; setSocketNonBlocking(fd); - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[fd].specific = StreamSocketSlot.init; return fd; @@ -168,7 +168,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (sock == StreamListenSocketFD.invalid) return sock; - m_loop.initFD(sock); + m_loop.initFD(sock, FDFlags.none); m_loop.m_fds[sock].specific = StreamListenSocketSlot.init; if (on_accept) waitForConnections(sock, on_accept); @@ -196,7 +196,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets setSocketNonBlocking(cast(SocketFD)sockfd); auto fd = cast(StreamSocketFD)sockfd; - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.m_fds[fd].specific = StreamSocketSlot.init; m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); @@ -502,6 +502,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address) + { + return createDatagramSocketInternal(bind_address, target_address, false); + } + + package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address, scope Address target_address, bool is_internal = true) { auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM); if (sockfd == -1) return DatagramSocketFD.invalid; @@ -533,7 +538,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - m_loop.initFD(sock); + m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none); m_loop.m_fds[sock].specific = DgramSocketSlot.init; m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); @@ -546,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (m_loop.m_fds[fd].common.refCount) // FD already in use? return DatagramSocketFD.init; setSocketNonBlocking(fd); - m_loop.initFD(fd); + m_loop.initFD(fd, FDFlags.none); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[fd].specific = DgramSocketSlot.init; return fd; @@ -710,8 +715,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool releaseRef(SocketFD fd) { + import taggedalgebraic : hasType; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + // listening sockets have an incremented the reference count because of setNotifyCallback + int base_refcount = m_loop.m_fds[fd].specific.hasType!StreamListenSocketSlot ? 1 : 0; + if (--m_loop.m_fds[fd].common.refCount == base_refcount) { m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.clearFD(fd); closeSocket(cast(sock_t)fd); diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index fea70a4..385294c 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -39,7 +39,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch } } - m_loop.initFD(FD(handle)); + m_loop.initFD(FD(handle), FDFlags.none); m_loop.registerFD(FD(handle), EventMask.read); m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); m_loop.m_fds[handle].specific = WatcherSlot(callback); @@ -59,7 +59,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch { FD fd = cast(FD)descriptor; assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].common.refCount == 0) { + if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count m_loop.unregisterFD(fd, EventMask.read); m_loop.clearFD(fd); m_watches.remove(descriptor); @@ -103,10 +103,10 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch ch.directory = m_watches[id][ev.wd]; ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.name = name; - addRef(id); - auto cb = m_loop.m_fds[ id].watcher.callback; + addRef(id); // assure that the id doesn't get invalidated until after the callback + auto cb = m_loop.m_fds[id].watcher.callback; cb(id, ch); - if (!releaseRef(id)) break; + if (!releaseRef(id)) return; rem = rem[inotify_event.sizeof + ev.len .. $]; } diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index aa16d4a..926aceb 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -99,6 +99,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil SmallIntegerSet!FileFD m_activeReads; SmallIntegerSet!FileFD m_activeWrites; EventID m_readyEvent; + bool m_waiting; Events m_events; } @@ -108,7 +109,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil { m_events = events; m_readyEvent = events.create(); - m_events.wait(m_readyEvent, &onReady); } void dispose() @@ -191,7 +191,9 @@ log("start task"); m_fileThreadPool.isDaemon = true; } m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + startWaiting(); } catch (Exception e) { + m_activeWrites.remove(file); on_write_finish(file, IOStatus.error, 0); return; } @@ -201,6 +203,7 @@ log("start task"); { auto f = &m_files[file].write; m_activeWrites.remove(file); + m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); assert(res, "Cancelling write when no write is in progress."); } @@ -217,7 +220,9 @@ log("start task"); m_fileThreadPool.isDaemon = true; } m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + startWaiting(); } catch (Exception e) { + m_activeReads.remove(file); on_read_finish(file, IOStatus.error, 0); return; } @@ -227,6 +232,7 @@ log("start task"); { auto f = &m_files[file].read; m_activeReads.remove(file); + m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); assert(res, "Cancelling read when no write is in progress."); } @@ -252,7 +258,7 @@ log("start task"); /// private static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) { -log("ready event"); +log("task fun"); IOInfo* f = mixin("&fd.m_files[file]."~op); log("wait for cancel"); @@ -318,7 +324,17 @@ log("ready event"); m_files[f].write.flush(f); } - m_events.wait(m_readyEvent, &onReady); + m_waiting = false; + startWaiting(); + } + + private void startWaiting() + { + if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) { + log("wait for ready"); + m_events.wait(m_readyEvent, &onReady); + m_waiting = true; + } } } diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index e21437f..f0d7b1a 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -30,6 +30,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { ms_allocator.parent = Mallocator.instance; } + package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; } + final package Duration getNextTimeout(long stdtime) @safe nothrow { if (m_timerQueue.length == 0) return Duration.max; diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d index 23b2952..04e20dd 100644 --- a/source/eventcore/internal/utils.d +++ b/source/eventcore/internal/utils.d @@ -180,8 +180,11 @@ struct SmallIntegerSet(V : size_t) { private { uint[][4] m_bits; + size_t m_count; } + @property bool empty() const { return m_count == 0; } + void insert(V i) { foreach (j; 0 .. m_bits.length) { @@ -189,6 +192,7 @@ struct SmallIntegerSet(V : size_t) i /= 32; if (i >= m_bits[j].length) m_bits[j].length = nextPOT(i+1); + if (j == 0 && !(m_bits[j][i] & b)) m_count++; m_bits[j][i] |= b; } } @@ -199,6 +203,7 @@ struct SmallIntegerSet(V : size_t) uint b = 1u << (i%32); i /= 32; if (!m_bits[j][i]) break; + if (j == 0 && m_bits[j][i] & b) m_count--; m_bits[j][i] &= ~b; if (m_bits[j][i]) break; } @@ -237,10 +242,13 @@ unittest { SmallIntegerSet!uint set; bool[uint] controlset; + + assert(set.empty); foreach (i; ints) { set.insert(i); controlset[i] = true; } + assert(!set.empty); foreach (jidx, j; ints) { size_t cnt = 0; @@ -256,6 +264,7 @@ unittest { set.remove(j); controlset.remove(j); } + assert(set.empty); foreach (i; set) assert(false); } diff --git a/tests/0-dirwatcher.d b/tests/0-dirwatcher.d index e6e599b..a5468c9 100644 --- a/tests/0-dirwatcher.d +++ b/tests/0-dirwatcher.d @@ -39,8 +39,8 @@ void main() assert(change.kind == FileChangeKind.removed); assert(change.directory == "."); assert(change.name == testFilename); + eventDriver.watchers.releaseRef(id); s_done = true; - eventDriver.core.exit(); break; } }); @@ -63,7 +63,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; diff --git a/tests/0-dns.d b/tests/0-dns.d index 08b5fe6..7f3a43b 100644 --- a/tests/0-dns.d +++ b/tests/0-dns.d @@ -17,17 +17,16 @@ void main() assert(status == DNSStatus.ok); assert(addrs.length >= 1); foreach (a; addrs) { - try writefln("addr %s (%s)", a.toAddrString(), a.toPortString()); + try writefln("addr %s (%s)", a.toAddrString(), a.toPortString()); catch (Exception e) assert(false, e.msg); } s_done = true; - eventDriver.core.exit(); }); ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-event.d b/tests/0-event.d index b0f9e2c..5d66633 100644 --- a/tests/0-event.d +++ b/tests/0-event.d @@ -27,7 +27,6 @@ void test(bool notify_all) assert(!s_done); s_done = true; eventDriver.timers.cancelWait(tm); - eventDriver.core.exit(); }); }); @@ -51,7 +50,7 @@ void test(bool notify_all) ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-file.d b/tests/0-file.d index b533d8e..58b33f5 100644 --- a/tests/0-file.d +++ b/tests/0-file.d @@ -18,11 +18,6 @@ void main() auto f = eventDriver.files.open("test.txt", FileOpenMode.createTrunc); assert(eventDriver.files.getSize(f) == 0); auto data = cast(const(ubyte)[])"Hello, World!"; - auto tm = eventDriver.timers.create(); - eventDriver.timers.set(tm, 500.msecs, 0.msecs); - eventDriver.timers.wait(tm, (tm) { - assert(false, "File operation stalled."); - }); eventDriver.files.write(f, 0, data[0 .. 7], IOMode.all, (f, status, nbytes) { assert(status == IOStatus.ok); @@ -48,7 +43,6 @@ void main() } (); eventDriver.files.releaseRef(f); s_done = true; - eventDriver.core.exit(); }); }); }); @@ -57,7 +51,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-signal.d b/tests/0-signal.d index 273349e..22252e8 100644 --- a/tests/0-signal.d +++ b/tests/0-signal.d @@ -21,8 +21,8 @@ void main() assert(!s_done); assert(status == SignalStatus.ok); assert(sig == () @trusted { return SIGUSR1; } ()); + eventDriver.signals.releaseRef(id); s_done = true; - eventDriver.core.exit(); }); auto tm = eventDriver.timers.create(); @@ -34,7 +34,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; diff --git a/tests/0-tcp-readwait.d b/tests/0-tcp-readwait.d index e599753..3b8bfae 100644 --- a/tests/0-tcp-readwait.d +++ b/tests/0-tcp-readwait.d @@ -43,9 +43,6 @@ void main() destroy(server); destroy(client); s_done = true; - - // FIXME: this shouldn't ne necessary: - eventDriver.core.exit(); })(s_rbuf, IOMode.immediate); })(s_rbuf[0 .. 0], IOMode.once); }); @@ -67,7 +64,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; diff --git a/tests/0-tcp.d b/tests/0-tcp.d index 2af5885..92708f4 100644 --- a/tests/0-tcp.d +++ b/tests/0-tcp.d @@ -43,9 +43,6 @@ void main() destroy(server); destroy(client); s_done = true; - - // FIXME: this shouldn't ne necessary: - eventDriver.core.exit(); })(s_rbuf, IOMode.once); })(s_rbuf, IOMode.once); }); @@ -62,7 +59,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-timer.d b/tests/0-timer.d index 264b68c..d75997f 100644 --- a/tests/0-timer.d +++ b/tests/0-timer.d @@ -34,10 +34,11 @@ void main() s_cnt++; assert(dur > 100.msecs * s_cnt); assert(dur < 100.msecs * s_cnt + 20.msecs); + assert(s_cnt <= 3); if (s_cnt == 3) { s_done = true; - eventDriver.core.exit(); + eventDriver.timers.stop(timer); } else eventDriver.timers.wait(tm, &secondTier); } catch (Exception e) { assert(false, e.msg); @@ -52,7 +53,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-udp.d b/tests/0-udp.d index 2a49f9e..0369237 100644 --- a/tests/0-udp.d +++ b/tests/0-udp.d @@ -55,9 +55,6 @@ void main() destroy(s_connectedSocket); s_done = true; log("done."); - - // FIXME: this shouldn't be necessary: - eventDriver.core.exit(); })(s_rbuf, IOMode.immediate); }); })(s_rbuf, IOMode.once); @@ -70,7 +67,7 @@ void main() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); } diff --git a/tests/0-usds.d b/tests/0-usds.d index b03e434..700ea6f 100644 --- a/tests/0-usds.d +++ b/tests/0-usds.d @@ -118,9 +118,6 @@ void testStream() destroy(server); destroy(client); s_done = true; - - // FIXME: this shouldn't ne necessary: - eventDriver.core.exit(); })(s_rbuf, IOMode.once); })(s_rbuf, IOMode.once); }); @@ -137,7 +134,7 @@ void testStream() ExitReason er; do er = eventDriver.core.processEvents(Duration.max); while (er == ExitReason.idle); - //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; } diff --git a/tests/0-waitercount.d b/tests/0-waitercount.d new file mode 100644 index 0000000..578102a --- /dev/null +++ b/tests/0-waitercount.d @@ -0,0 +1,33 @@ +/++ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ +module test; + +import eventcore.core; +import core.stdc.signal; +import core.time : Duration, msecs; +import core.thread : Thread; + + +void test(Duration timeout) +{ + while (true) { + auto reason = eventDriver.core.processEvents(timeout); + final switch (reason) with (ExitReason) { + case exited: assert(false, "Manual exit without call to exit()!?"); + case timeout: assert(false, "Event loop timed out, although no waiters exist!?"); + case idle: break; + case outOfWaiters: return; + } + } +} + +void main() +{ + assert(eventDriver.core.waiterCount == 0, "Initial waiter count not 0!"); + test(100.msecs); + assert(eventDriver.core.waiterCount == 0, "Waiter count after outOfWaiters not 0!"); + test(Duration.max); + assert(eventDriver.core.waiterCount == 0); +}