From 90a60f7981f7be3820cf110a3331fb65f4e94657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 21 Oct 2018 20:15:12 +0200 Subject: [PATCH 01/12] Add nogc typed alloc/free functions and make assert_nogc actually nogc. --- dub.sdl | 2 +- source/eventcore/internal/utils.d | 98 +++++++++++++++++++++++++++---- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/dub.sdl b/dub.sdl index d9ea537..be18279 100644 --- a/dub.sdl +++ b/dub.sdl @@ -8,7 +8,7 @@ targetType "library" libs "anl" "resolv" platform="linux" libs "ws2_32" "user32" platform="windows-dmd" -dependency "taggedalgebraic" version="~>0.10.4" +dependency "taggedalgebraic" version="~>0.10.12" configuration "epoll" { platforms "linux" diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d index c845648..2613522 100644 --- a/source/eventcore/internal/utils.d +++ b/source/eventcore/internal/utils.d @@ -1,5 +1,7 @@ module eventcore.internal.utils; +import core.memory : GC; +import std.traits : hasIndirections; import taggedalgebraic; @@ -8,17 +10,81 @@ void print(ARGS...)(string str, ARGS args) import std.format : formattedWrite; StdoutRange r; scope cb = () { - scope (failure) assert(false); - (&r).formattedWrite(str, args); + try (&r).formattedWrite(str, args); + catch (Exception e) assert(false, e.msg); }; (cast(void delegate() @nogc @safe nothrow)cb)(); r.put('\n'); } +T mallocT(T, ARGS...)(ARGS args) +@trusted @nogc { + import core.stdc.stdlib : malloc; + import std.conv : emplace; + + enum size = __traits(classInstanceSize, T); + auto ret = cast(T)malloc(size); + static if (hasIndirections!T) + GC.addRange(cast(void*)ret, __traits(classInstanceSize, T)); + scope doit = { emplace!T((cast(void*)ret)[0 .. size], args); }; + static if (__traits(compiles, () nothrow { typeof(doit).init(); })) // NOTE: doing the typeof thing here, because LDC 1.7.0 otherwise thinks doit gets escaped here + (cast(void delegate() @nogc nothrow)doit)(); + else + (cast(void delegate() @nogc)doit)(); + return ret; +} + +void freeT(T)(ref T inst) @nogc + if (is(T == class)) +{ + import core.stdc.stdlib : free; + + noGCDestroy(inst); + static if (hasIndirections!T) + GC.removeRange(cast(void*)inst); + free(cast(void*)inst); + inst = null; +} + +T[] mallocNT(T)(size_t cnt) +@trusted { + import core.stdc.stdlib : malloc; + import std.conv : emplace; + + auto ret = (cast(T*)malloc(T.sizeof * cnt))[0 .. cnt]; + static if (hasIndirections!T) + GC.addRange(cast(void*)ret, T.sizeof * cnt); + foreach (ref v; ret) + static if (!is(T == class)) + emplace!T(&v); + else v = null; + return ret; +} + +void freeNT(T)(ref T[] arr) +{ + import core.stdc.stdlib : free; + + foreach (ref v; arr) + static if (!is(T == class)) + destroy(v); + static if (hasIndirections!T) + GC.removeRange(arr.ptr); + free(arr.ptr); + arr = null; +} + +private void noGCDestroy(T)(ref T t) +@trusted { + // FIXME: only do this if the destructor chain is actually nogc + scope doit = { destroy(t); }; + (cast(void delegate() @nogc)doit)(); +} + private extern(C) Throwable.TraceInfo _d_traceContext(void* ptr = null); void nogc_assert(bool cond, string message, string file = __FILE__, int line = __LINE__) -@trusted nothrow { +@trusted nothrow @nogc { import core.stdc.stdlib : abort; import std.stdio : stderr; @@ -28,12 +94,15 @@ void nogc_assert(bool cond, string message, string file = __FILE__, int line = _ assert(false); } - stderr.writefln("Assertion failure @%s(%s): %s", file, line, message); - stderr.writeln("------------------------"); - if (auto info = _d_traceContext(null)) { - foreach (s; info) - stderr.writeln(s); - } else stderr.writeln("no stack trace available"); + scope doit = { + stderr.writefln("Assertion failure @%s(%s): %s", file, line, message); + stderr.writeln("------------------------"); + if (auto info = _d_traceContext(null)) { + foreach (s; info) + stderr.writeln(s); + } else stderr.writeln("no stack trace available"); + }; + (cast(void delegate() @nogc)doit)(); // write and _d_traceContext are not nogc } } @@ -53,14 +122,11 @@ struct StdoutRange { } struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { - import core.memory : GC; - static assert(nextPOT(CHUNK_SIZE) == CHUNK_SIZE, "CHUNK_SIZE must be a power of two for performance reasons."); @safe: nothrow: import core.stdc.stdlib : calloc, free, malloc, realloc; - import std.traits : hasIndirections; alias chunkSize = CHUNK_SIZE; @@ -183,7 +249,7 @@ struct AlgebraicChoppedVector(TCommon, TSpecific...) import std.format : format; string ret; foreach (i, U; TSpecific) - ret ~= "@property ref TSpecific[%s] %s() nothrow @safe { return this.specific.get!(TSpecific[%s]); }\n" + ret ~= "@property ref TSpecific[%s] %s() nothrow @safe @nogc { return this.specific.get!(TSpecific[%s]); }\n" .format(i, U.Handle.name, i); return ret; } @@ -207,10 +273,13 @@ struct SmallIntegerSet(V : size_t) size_t m_count; } + @disable this(this); + @property bool empty() const { return m_count == 0; } void insert(V i) { + assert(i >= 0); foreach (j; 0 .. m_bits.length) { uint b = 1u << (i%32); i /= 32; @@ -223,6 +292,9 @@ struct SmallIntegerSet(V : size_t) void remove(V i) { + assert(i >= 0); + if (i >= m_bits[0].length * 32) return; + foreach (j; 0 .. m_bits.length) { uint b = 1u << (i%32); i /= 32; From 4a605640bcb85eb21228a818b31c4b4eda268b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 21 Oct 2018 20:15:37 +0200 Subject: [PATCH 02/12] Make ConsumableQueue nogc. --- source/eventcore/internal/consumablequeue.d | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 63aec28..6fd6a04 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -1,5 +1,8 @@ module eventcore.internal.consumablequeue; +import eventcore.internal.utils : mallocNT, freeNT; + + /** FIFO queue with support for chunk-wise consumption. */ final class ConsumableQueue(T) @@ -18,6 +21,12 @@ final class ConsumableQueue(T) size_t m_pendingCount; } + ~this() + @trusted @nogc nothrow { + if (m_storage !is null) + freeNT(m_storage); + } + @property size_t length() const { return m_pendingCount; } @property bool empty() const { return length == 0; } @@ -43,11 +52,15 @@ final class ConsumableQueue(T) while (new_capacity < min_capacity) new_capacity *= 2; auto new_capacity_mask = new_capacity - 1; - auto new_storage = new Slot[new_capacity]; + auto new_storage = mallocNT!Slot(new_capacity); foreach (i; 0 .. m_consumedCount + m_pendingCount) new_storage[(m_first + i) & new_capacity_mask] = m_storage[(m_first + i) & m_capacityMask]; - m_storage = new_storage; + () @trusted { + if (m_storage !is null) + freeNT(m_storage); + m_storage = new_storage; + } (); m_capacityMask = new_capacity_mask; } From d66f2571796bcf75c5ff49186ee85ba01cb5e918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 22 Oct 2018 21:18:37 +0200 Subject: [PATCH 03/12] Avoid GC allocations for LoopTimeoutTimerDriver.m_firedTimers. --- source/eventcore/drivers/timer.d | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index 926400b..0f47f46 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -4,8 +4,9 @@ module eventcore.drivers.timer; import eventcore.driver; +import eventcore.internal.consumablequeue; import eventcore.internal.dlist; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : mallocT, freeT, nogc_assert; final class LoopTimeoutTimerDriver : EventDriverTimers { @@ -24,7 +25,7 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { TimerSlot*[TimerID] m_timers; StackDList!TimerSlot m_timerQueue; TimerID m_lastTimerID; - TimerSlot*[] m_firedTimers; + ConsumableQueue!(TimerSlot*) m_firedTimers; } static this() @@ -32,6 +33,17 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { ms_allocator.parent = Mallocator.instance; } + this() + @nogc @safe nothrow { + m_firedTimers = mallocT!(ConsumableQueue!(TimerSlot*)); + } + + ~this() + @nogc @trusted nothrow { + try freeT(m_firedTimers); + catch (Exception e) assert(false, e.msg); + } + package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; } final package Duration getNextTimeout(long stdtime) @@ -53,27 +65,24 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { do tm.timeout += tm.repeatDuration; while (tm.timeout <= stdtime); } else tm.pending = false; - m_firedTimers ~= tm; + m_firedTimers.put(tm); } - foreach (tm; m_firedTimers) { + auto processed_timers = m_firedTimers.consume(); + + foreach (tm; processed_timers) { m_timerQueue.remove(tm); if (tm.repeatDuration > 0) enqueueTimer(tm); } - foreach (tm; m_firedTimers) { + foreach (tm; processed_timers) { auto cb = tm.callback; tm.callback = null; if (cb) cb(tm.id); } - bool any_fired = m_firedTimers.length > 0; - - m_firedTimers.length = 0; - m_firedTimers.assumeSafeAppend(); - - return any_fired; + return processed_timers.length > 0; } final override TimerID create() From a4eaafce9a243443a709037ef4b539e957396e61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 22 Oct 2018 21:23:19 +0200 Subject: [PATCH 04/12] Avoid GC allocation in StaticTaskPool. --- source/eventcore/drivers/threadedfile.d | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index e2fcfb2..13dbe1a 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -421,10 +421,10 @@ private struct StaticTaskPool { if (!m_refCount++) { try { - m_pool = new TaskPool(4); + m_pool = mallocT!TaskPool(4); m_pool.isDaemon = true; } catch (Exception e) { - assert(false, "Failed to create file thread pool: "~e.msg); + assert(false, e.msg); } } @@ -447,8 +447,10 @@ private struct StaticTaskPool { if (fin_pool) { log("finishing thread pool"); - try fin_pool.finish(); - catch (Exception e) { + try { + fin_pool.finish(true); + freeT(fin_pool); + } catch (Exception e) { //log("Failed to shut down file I/O thread pool."); } } From e7e4a0f5f5cc0bbec9cd69424c4fd2d7d1e6165c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 21 Oct 2018 20:16:26 +0200 Subject: [PATCH 05/12] Make the Posix driver initialization nogc. --- source/eventcore/driver.d | 30 +++++++++++- source/eventcore/drivers/posix/dns.d | 4 +- source/eventcore/drivers/posix/driver.d | 58 ++++++++++++++-------- source/eventcore/drivers/posix/epoll.d | 11 ++--- source/eventcore/drivers/posix/events.d | 59 +++++++++-------------- source/eventcore/drivers/posix/kqueue.d | 33 ++++++++----- source/eventcore/drivers/posix/signals.d | 2 +- source/eventcore/drivers/posix/sockets.d | 14 ++++-- source/eventcore/drivers/posix/watchers.d | 2 +- 9 files changed, 128 insertions(+), 85 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index ed9059c..6d42ea1 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -352,13 +352,31 @@ interface EventDriverSockets { /** Retrieves a reference to a user-defined value associated with a descriptor. */ - @property final ref T userData(T, FD)(FD descriptor) - @trusted { + @property final ref T userData(T, FD)(FD descriptor) @trusted + if (!hasNoGCLifetime!T) + { import std.conv : emplace; static void init(void* ptr) { emplace(cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); } return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); } + /// ditto + @property final ref T userData(T, FD)(FD descriptor) @trusted @nogc + if (hasNoGCLifetime!T) + { + import std.conv : emplace; + static void init(void* ptr) @nogc { emplace(cast(T*)ptr); } + static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); } + + scope getter = { + return cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + }; + + static if (__traits(compiles, () nothrow @trusted { getter(); })) + return *(cast(T* delegate() @nogc nothrow)getter)(); + else + return *(cast(T* delegate() @nogc)getter)(); + } /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; @@ -368,6 +386,14 @@ interface EventDriverSockets { protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; } +enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); }); +unittest { + static struct S1 {} + static struct S2 { ~this() { new int; } } + static assert(hasNoGCLifetime!S1); + static assert(!hasNoGCLifetime!S2); +} + /** Performs asynchronous DNS queries. */ diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index f4067f8..1ec3e7d 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -43,7 +43,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver } this(Events events, Signals signals) - { + @nogc { m_events = events; setupEvent(); } @@ -139,7 +139,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver } private void setupEvent() - { + @nogc { if (m_event == EventID.invalid) { m_event = m_events.createInternal(); m_events.wait(m_event, &onDNSSignal); diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 65e75df..f6c5621 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -64,16 +64,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { } this() - { - m_loop = new Loop; - m_sockets = new SocketsDriver(m_loop); - m_events = new EventsDriver(m_loop, m_sockets); - m_signals = new SignalsDriver(m_loop); - m_timers = new TimerDriver; - m_core = new CoreDriver(m_loop, m_timers, m_events); - m_dns = new DNSDriver(m_events, m_signals); - m_files = new FileDriver(m_events); - m_watchers = new WatcherDriver(m_events); + @nogc @trusted { + m_loop = mallocT!Loop; + m_sockets = mallocT!SocketsDriver(m_loop); + m_events = mallocT!EventsDriver(m_loop, m_sockets); + m_signals = mallocT!SignalsDriver(m_loop); + m_timers = mallocT!TimerDriver; + m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); + m_dns = mallocT!DNSDriver(m_events, m_signals); + m_files = mallocT!FileDriver(m_events); + m_watchers = mallocT!WatcherDriver(m_events); } // force overriding these in the (final) sub classes to avoid virtual calls @@ -96,6 +96,19 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_core.dispose(); m_loop.dispose(); m_loop = null; + + try () @trusted { + freeT(m_watchers); + freeT(m_files); + freeT(m_dns); + freeT(m_core); + freeT(m_timers); + freeT(m_signals); + freeT(m_events); + freeT(m_sockets); + freeT(m_loop); + } (); + catch (Exception e) assert(false, e.msg); } } @@ -121,29 +134,32 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; } - protected this(Loop loop, Timers timers, Events events) - { + this(Loop loop, Timers timers, Events events) + @nogc { m_loop = loop; m_timers = timers; m_events = events; m_wakeupEvent = events.createInternal(); static if (__VERSION__ >= 2074) - m_threadCallbackMutex = new shared Mutex; + m_threadCallbackMutex = mallocT!(shared(Mutex)); else { - () @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); + () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } (); } - m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); + m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t))); m_threadCallbacks.reserve(1000); } - protected final void dispose() + final void dispose() { executeThreadCallbacks(); m_events.releaseRef(m_wakeupEvent); - atomicStore(m_threadCallbackMutex, null); m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized! + try { + () @trusted { freeT(m_threadCallbackMutex); } (); + () @trusted { freeT(m_threadCallbacks); } (); + } catch (Exception e) assert(false, e.msg); } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @@ -274,11 +290,11 @@ package class PosixEventLoop { protected abstract bool doProcessEvents(Duration dur); /// Registers the FD for general notification reception. - protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true); + protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc; /// Unregisters the FD for general notification reception. - protected abstract void unregisterFD(FD fd, EventMask mask); + protected abstract void unregisterFD(FD fd, EventMask mask) @nogc; /// Updates the event mask to use for listening for notifications. - protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true); + protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc; final protected void notify(EventType evt)(FD fd) { @@ -342,7 +358,7 @@ package class PosixEventLoop { } package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { + @system @nogc { FDSlot* fds = &m_fds[descriptor].common; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); diff --git a/source/eventcore/drivers/posix/epoll.d b/source/eventcore/drivers/posix/epoll.d index cb11929..e33f7d2 100644 --- a/source/eventcore/drivers/posix/epoll.d +++ b/source/eventcore/drivers/posix/epoll.d @@ -5,7 +5,7 @@ numbers of concurrently open sockets. */ module eventcore.drivers.posix.epoll; -@safe: /*@nogc:*/ nothrow: +@safe @nogc nothrow: version (linux): @@ -22,17 +22,16 @@ static if (!is(typeof(SOCK_CLOEXEC))) enum SOCK_CLOEXEC = 0x80000; final class EpollEventLoop : PosixEventLoop { -@safe: nothrow: +@safe nothrow: private { int m_epoll; - epoll_event[] m_events; + epoll_event[100] m_events; } this() - { + @nogc { m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } (); - m_events.length = 100; } override bool doProcessEvents(Duration timeout) @@ -60,7 +59,7 @@ final class EpollEventLoop : PosixEventLoop { } override void dispose() - { + @nogc { import core.sys.posix.unistd : close; close(m_epoll); } diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 6cff6bb..ba51c7b 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -4,7 +4,8 @@ module eventcore.drivers.posix.events; import eventcore.driver; import eventcore.drivers.posix.driver; import eventcore.internal.consumablequeue : ConsumableQueue; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : nogc_assert, mallocT, freeT; + version (linux) { nothrow @nogc extern (C) int eventfd(uint initval, int flags); @@ -25,21 +26,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS Loop m_loop; Sockets m_sockets; ubyte[ulong.sizeof] m_buf; - version (linux) {} - else { - // TODO: avoid the overhead of a mutex backed map here - import core.sync.mutex : Mutex; - Mutex m_eventsMutex; - EventID[DatagramSocketFD] m_events; - } } this(Loop loop, Sockets sockets) - { + @nogc { m_loop = loop; m_sockets = sockets; - version (linux) {} - else m_eventsMutex = new Mutex; } package @property Loop loop() { return m_loop; } @@ -50,14 +42,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } package(eventcore) EventID createInternal(bool is_internal = true) - { + @nogc { version (linux) { auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (eid == -1) return EventID.invalid; auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(new ConsumableQueue!EventCallback, false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -83,7 +75,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } else { // fake missing socketpair support on Windows import std.socket : InternetAddress; - auto addr = new InternetAddress(0x7F000001, 0); + scope addr = new InternetAddress(0x7F000001, 0); auto s = m_sockets.createDatagramSocketInternal(addr, null, true); if (s == DatagramSocketFD.invalid) return EventID.invalid; fd[0] = cast(sock_t)s; @@ -106,18 +98,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } } - m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); // use the second socket as the event ID and as the sending end for // other threads auto id = cast(EventID)fd[1]; - try { - synchronized (m_eventsMutex) - m_events[s] = id; - } catch (Exception e) assert(false, e.msg); + try m_sockets.userData!EventID(s) = id; + catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); assert(getRC(id) == 1); return id; } @@ -142,7 +132,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override void trigger(EventID event, bool notify_all) - shared @trusted { + shared @trusted @nogc { import core.atomic : atomicStore; auto thisus = cast(PosixEventDriverEvents)this; assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); @@ -154,7 +144,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override void wait(EventID event, EventCallback on_event) - { + @nogc { if (!isInternal(event)) m_loop.m_waiterCount++; getSlot(event).waiters.put(on_event); } @@ -183,13 +173,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS version (linux) {} else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) - { - m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - EventID evt; + @nogc { + m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { - synchronized (m_eventsMutex) - evt = m_events[s]; - onEvent(evt); + EventID evt = m_sockets.userData!EventID(s); + scope doit = { onEvent(evt); }; // cast to nogc + () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } } @@ -201,13 +190,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override bool releaseRef(EventID descriptor) - { + @nogc { nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); if (--getRC(descriptor) == 0) { if (!isInternal(descriptor)) m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; () @trusted nothrow { - try .destroy(getSlot(descriptor).waiters); + try freeT(getSlot(descriptor).waiters); catch (Exception e) nogc_assert(false, e.msg); } (); version (linux) { @@ -216,10 +205,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto rs = getSlot(descriptor).recvSocket; m_sockets.cancelReceive(rs); m_sockets.releaseRef(rs); - try { - synchronized (m_eventsMutex) - m_events.remove(rs); - } catch (Exception e) nogc_assert(false, e.msg); } m_loop.clearFD!EventSlot(descriptor); version (Posix) close(cast(int)descriptor); @@ -235,9 +220,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } private EventSlot* getSlot(EventID id) - { + @nogc { nogc_assert(id < m_loop.m_fds.length, "Invalid event ID."); - return () @trusted { return &m_loop.m_fds[id].event(); } (); + return () @trusted @nogc { return &m_loop.m_fds[id].event(); } (); } private ref uint getRC(EventID id) @@ -246,7 +231,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } private bool isInternal(EventID id) - { + @nogc { return getSlot(id).isInternal; } } diff --git a/source/eventcore/drivers/posix/kqueue.d b/source/eventcore/drivers/posix/kqueue.d index e4c3ac6..959f75d 100644 --- a/source/eventcore/drivers/posix/kqueue.d +++ b/source/eventcore/drivers/posix/kqueue.d @@ -34,14 +34,14 @@ alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop; final class KqueueEventLoop : PosixEventLoop { private { int m_queue; - kevent_t[] m_changes; - kevent_t[] m_events; + size_t m_changeCount = 0; + kevent_t[100] m_changes; + kevent_t[100] m_events; } this() - @safe nothrow { + @safe nothrow @nogc { m_queue = () @trusted { return kqueue(); } (); - m_events.length = 100; assert(m_queue >= 0, "Failed to create kqueue."); } @@ -57,9 +57,8 @@ final class KqueueEventLoop : PosixEventLoop { ts.tv_sec = cast(time_t)secs; ts.tv_nsec = cast(uint)hnsecs * 100; - auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changes.length, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts); - m_changes.length = 0; - m_changes.assumeSafeAppend(); + auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts); + m_changeCount = 0; //print("kevent returned %s", ret); @@ -97,11 +96,11 @@ final class KqueueEventLoop : PosixEventLoop { if (edge_triggered) ev.flags |= EV_CLEAR; if (mask & EventMask.read) { ev.filter = EVFILT_READ; - m_changes ~= ev; + putChange(ev); } if (mask & EventMask.write) { ev.filter = EVFILT_WRITE; - m_changes ~= ev; + putChange(ev); } //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; } @@ -111,7 +110,7 @@ final class KqueueEventLoop : PosixEventLoop { kevent_t ev; ev.ident = fd; ev.flags = EV_DELETE; - m_changes ~= ev; + putChange(ev); } override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @@ -124,16 +123,26 @@ final class KqueueEventLoop : PosixEventLoop { ev.filter = EVFILT_READ; ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE; if (edge_triggered) ev.flags |= EV_CLEAR; - m_changes ~= ev; + putChange(ev); } if (changes & EventMask.write) { ev.filter = EVFILT_WRITE; ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE; if (edge_triggered) ev.flags |= EV_CLEAR; - m_changes ~= ev; + putChange(ev); } //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; } + + private void putChange(ref kevent_t ev) + @safe nothrow @nogc { + m_changes[m_changeCount++] = ev; + if (m_changeCount == m_changes.length) { + auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) (); + assert(ret == 0); + m_changeCount = 0; + } + } } diff --git a/source/eventcore/drivers/posix/signals.d b/source/eventcore/drivers/posix/signals.d index 8297ec9..fdd45ae 100644 --- a/source/eventcore/drivers/posix/signals.d +++ b/source/eventcore/drivers/posix/signals.d @@ -17,7 +17,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna private Loop m_loop; - this(Loop loop) { m_loop = loop; } + this(Loop loop) @nogc { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) { diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 65c5157..c647975 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -654,7 +654,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false) - { + @nogc { auto fd = DatagramSocketFD(socket); if (m_loop.m_fds[fd].common.refCount) // FD already in use? return DatagramSocketFD.init; @@ -742,8 +742,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets on_receive_finish(socket, IOStatus.ok, ret, src_addrc); } + package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish) + @trusted @nogc { + scope void delegate() @safe nothrow do_it = { + receive(socket, buffer, mode, on_receive_finish); + }; + (cast(void delegate() @safe nothrow @nogc)do_it)(); + } + void cancelReceive(DatagramSocketFD socket) - { + @nogc { assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.m_fds[socket].datagramSocket.readBuffer = null; @@ -855,7 +863,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } final override bool releaseRef(SocketFD fd) - { + @nogc { import taggedalgebraic : hasType; auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 0dd1f1a..4af45a0 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -245,7 +245,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat } this(Events events) - { + @nogc { m_events = events; } From 284d4f43c3a5dc0ade56547ae41af08ce9682756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 22 Oct 2018 22:05:37 +0200 Subject: [PATCH 06/12] Make the WinAPI driver initialization nogc. --- source/eventcore/drivers/winapi/core.d | 48 ++++++++++++++-------- source/eventcore/drivers/winapi/driver.d | 37 ++++++++++++----- source/eventcore/drivers/winapi/events.d | 8 ++-- source/eventcore/drivers/winapi/files.d | 2 +- source/eventcore/drivers/winapi/sockets.d | 20 ++++----- source/eventcore/drivers/winapi/watchers.d | 2 +- 6 files changed, 72 insertions(+), 45 deletions(-) diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index e10c8ad..1c55fa8 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -5,7 +5,7 @@ version (Windows): import eventcore.driver; import eventcore.drivers.timer; import eventcore.internal.consumablequeue; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : mallocT, freeT, nogc_assert; import eventcore.internal.win32; import core.sync.mutex : Mutex; import core.time : Duration; @@ -21,8 +21,9 @@ final class WinAPIEventDriverCore : EventDriverCore { size_t m_waiterCount; DWORD m_tid; LoopTimeoutTimerDriver m_timers; - HANDLE[] m_registeredEvents; - void delegate() @safe nothrow[HANDLE] m_eventCallbacks; + HANDLE[MAXIMUM_WAIT_OBJECTS] m_registeredEvents; + void delegate() @safe nothrow[MAXIMUM_WAIT_OBJECTS] m_registeredEventCallbacks; + DWORD m_registeredEventCount = 0; HANDLE m_fileCompletionEvent; ConsumableQueue!IOEvent m_ioEvents; @@ -35,27 +36,36 @@ final class WinAPIEventDriverCore : EventDriverCore { } this(LoopTimeoutTimerDriver timers) - { + @nogc { m_timers = timers; m_tid = () @trusted { return GetCurrentThreadId(); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); registerEvent(m_fileCompletionEvent); - m_ioEvents = new ConsumableQueue!IOEvent; + m_ioEvents = mallocT!(ConsumableQueue!IOEvent); static if (__VERSION__ >= 2074) - m_threadCallbackMutex = new shared Mutex; + m_threadCallbackMutex = mallocT!(shared(Mutex)); else { - () @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); + () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } (); } - m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); + m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t))); m_threadCallbacks.reserve(1000); } + void dispose() + @trusted { + try { + freeT(m_threadCallbacks); + freeT(m_threadCallbackMutex); + freeT(m_ioEvents); + } catch (Exception e) assert(false, e.msg); + } + override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; } - package void addWaiter() { m_waiterCount++; } + package void addWaiter() @nogc { m_waiterCount++; } package void removeWaiter() - { + @nogc { assert(m_waiterCount > 0, "Decrementing waiter count below zero."); m_waiterCount--; } @@ -159,7 +169,7 @@ final class WinAPIEventDriverCore : EventDriverCore { bool got_event; DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max(max_wait.total!"msecs", 0), DWORD.max); - auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, + auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(m_registeredEventCount, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); while (!m_ioEvents.empty) { @@ -168,9 +178,9 @@ final class WinAPIEventDriverCore : EventDriverCore { } if (ret == WAIT_IO_COMPLETION) got_event = true; - else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { - if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) { - (*pc)(); + else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEventCount) { + if (auto cb = m_registeredEventCallbacks[ret - WAIT_OBJECT_0]) { + cb(); got_event = true; } } @@ -209,9 +219,11 @@ final class WinAPIEventDriverCore : EventDriverCore { package void registerEvent(HANDLE event, void delegate() @safe nothrow callback = null) - { - m_registeredEvents ~= event; - if (callback) m_eventCallbacks[event] = callback; + @nogc { + assert(m_registeredEventCount < MAXIMUM_WAIT_OBJECTS, "Too many registered events."); + m_registeredEvents[m_registeredEventCount] = event; + if (callback) m_registeredEventCallbacks[m_registeredEventCount] = callback; + m_registeredEventCount++; } package SlotType* setupSlot(SlotType)(HANDLE h) @@ -231,7 +243,7 @@ final class WinAPIEventDriverCore : EventDriverCore { } package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...) - { +@nogc { import std.algorithm.searching : canFind; m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); } diff --git a/source/eventcore/drivers/winapi/driver.d b/source/eventcore/drivers/winapi/driver.d index 7ef3281..d8bd0c3 100644 --- a/source/eventcore/drivers/winapi/driver.d +++ b/source/eventcore/drivers/winapi/driver.d @@ -18,6 +18,7 @@ import eventcore.drivers.winapi.files; import eventcore.drivers.winapi.signals; import eventcore.drivers.winapi.sockets; import eventcore.drivers.winapi.watchers; +import eventcore.internal.utils : mallocT, freeT; import core.sys.windows.windows; static assert(HANDLE.sizeof <= FD.BaseType.sizeof); @@ -39,23 +40,25 @@ final class WinAPIEventDriver : EventDriver { static WinAPIEventDriver threadInstance; this() - @safe { + @safe nothrow @nogc { assert(threadInstance is null); threadInstance = this; import std.exception : enforce; WSADATA wd; - enforce(() @trusted { return WSAStartup(0x0202, &wd); } () == 0, "Failed to initialize WinSock"); - m_signals = new WinAPIEventDriverSignals(); - m_timers = new LoopTimeoutTimerDriver(); - m_core = new WinAPIEventDriverCore(m_timers); - m_events = new WinAPIEventDriverEvents(m_core); - m_files = new WinAPIEventDriverFiles(m_core); - m_sockets = new WinAPIEventDriverSockets(m_core); - m_dns = new WinAPIEventDriverDNS(); - m_watchers = new WinAPIEventDriverWatchers(m_core); + auto res = () @trusted { return WSAStartup(0x0202, &wd); } (); + assert(res == 0, "Failed to initialize WinSock"); + + m_signals = mallocT!WinAPIEventDriverSignals(); + m_timers = mallocT!LoopTimeoutTimerDriver(); + m_core = mallocT!WinAPIEventDriverCore(m_timers); + m_events = mallocT!WinAPIEventDriverEvents(m_core); + m_files = mallocT!WinAPIEventDriverFiles(m_core); + m_sockets = mallocT!WinAPIEventDriverSockets(m_core); + m_dns = mallocT!WinAPIEventDriverDNS(); + m_watchers = mallocT!WinAPIEventDriverWatchers(m_core); } @safe: /*@nogc:*/ nothrow: @@ -75,8 +78,20 @@ final class WinAPIEventDriver : EventDriver { { if (!m_events) return; m_events.dispose(); - m_events = null; + m_core.dispose(); assert(threadInstance !is null); threadInstance = null; + + try () @trusted { + freeT(m_watchers); + freeT(m_dns); + freeT(m_sockets); + freeT(m_files); + freeT(m_events); + freeT(m_core); + freeT(m_timers); + freeT(m_signals); + } (); + catch (Exception e) assert(false, e.msg); } } diff --git a/source/eventcore/drivers/winapi/events.d b/source/eventcore/drivers/winapi/events.d index c76b224..0134c0c 100644 --- a/source/eventcore/drivers/winapi/events.d +++ b/source/eventcore/drivers/winapi/events.d @@ -6,7 +6,7 @@ import eventcore.driver; import eventcore.drivers.winapi.core; import eventcore.internal.win32; import eventcore.internal.consumablequeue; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : mallocT, freeT, nogc_assert; final class WinAPIEventDriverEvents : EventDriverEvents { @@ -31,10 +31,10 @@ final class WinAPIEventDriverEvents : EventDriverEvents { } this(WinAPIEventDriverCore core) - { + @nogc { m_core = core; m_event = () @trusted { return CreateEvent(null, false, false, null); } (); - m_pending = new ConsumableQueue!Trigger; // FIXME: avoid GC allocation + m_pending = mallocT!(ConsumableQueue!Trigger); // FIXME: avoid GC allocation InitializeCriticalSection(&m_mutex); m_core.registerEvent(m_event, &triggerPending); } @@ -42,7 +42,7 @@ final class WinAPIEventDriverEvents : EventDriverEvents { void dispose() @trusted { scope (failure) assert(false); - destroy(m_pending); + freeT(m_pending); } override EventID create() diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index 5dac5f8..32bae89 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -17,7 +17,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } this(WinAPIEventDriverCore core) - { + @nogc { m_core = core; } diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 7ef71fb..1263926 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -25,7 +25,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } this(WinAPIEventDriverCore core) - @trusted { + @trusted @nogc { m_tid = GetCurrentThreadId(); m_core = core; @@ -406,7 +406,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } override void cancelRead(StreamSocketFD socket) - @trusted { + @trusted @nogc { if (!m_sockets[socket].streamSocket.read.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped); m_sockets[socket].streamSocket.read.callback = null; @@ -414,7 +414,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } override void cancelWrite(StreamSocketFD socket) - @trusted { + @trusted @nogc { if (!m_sockets[socket].streamSocket.write.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped); m_sockets[socket].streamSocket.write.callback = null; @@ -549,7 +549,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } override void cancelReceive(DatagramSocketFD socket) - @trusted { + @trusted @nogc { if (!m_sockets[socket].datagramSocket.read.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped); m_sockets[socket].datagramSocket.read.callback = null; @@ -643,7 +643,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } override void cancelSend(DatagramSocketFD socket) - @trusted { + @trusted @nogc { if (!m_sockets[socket].datagramSocket.write.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped); m_sockets[socket].datagramSocket.write.callback = null; @@ -719,7 +719,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } override bool releaseRef(SocketFD fd) - { + @nogc { import taggedalgebraic : hasType; auto slot = () @trusted { return &m_sockets[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); @@ -787,7 +787,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } private void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { + @system @nogc { SocketSlot* fds = &m_sockets[descriptor].common; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); @@ -808,7 +808,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } package void clearSocketSlot(FD fd) - { + @nogc { auto slot = () @trusted { return &m_sockets[fd]; } (); if (slot.common.userDataDestructor) () @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } (); @@ -889,8 +889,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } } -void setupWindowClass() nothrow -@trusted { +void setupWindowClass() +@trusted nothrow @nogc { static __gshared registered = false; if (registered) return; diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index ea6bdb6..bb7c162 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -16,7 +16,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { } this(WinAPIEventDriverCore core) - { + @nogc { m_core = core; } From 73abd867b4fc6f7f957bceab1410d1fa0150820e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 21 Oct 2018 20:16:57 +0200 Subject: [PATCH 07/12] Initialize the driver instance lazily. --- source/eventcore/core.d | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/source/eventcore/core.d b/source/eventcore/core.d index d469d66..8659998 100644 --- a/source/eventcore/core.d +++ b/source/eventcore/core.d @@ -7,6 +7,7 @@ import eventcore.drivers.posix.epoll; import eventcore.drivers.posix.kqueue; import eventcore.drivers.libasync; import eventcore.drivers.winapi.driver; +import eventcore.internal.utils : mallocT, freeT; version (EventcoreEpollDriver) alias NativeEventDriver = EpollEventDriver; else version (EventcoreKqueueDriver) alias NativeEventDriver = KqueueEventDriver; @@ -19,8 +20,11 @@ else alias NativeEventDriver = EventDriver; @safe @nogc nothrow { static if (is(NativeEventDriver == EventDriver)) assert(s_driver !is null, "setupEventDriver() was not called for this thread."); - else - assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); + else { + if (!s_driver) { + s_driver = mallocT!NativeEventDriver(); + } + } return s_driver; } @@ -30,7 +34,6 @@ static if (!is(NativeEventDriver == EventDriver)) { if (!s_isMainThread) { if (!s_initCount++) { assert(s_driver is null); - s_driver = new NativeEventDriver; } } } @@ -38,15 +41,18 @@ static if (!is(NativeEventDriver == EventDriver)) { static ~this() { if (!s_isMainThread) { - if (!--s_initCount) - s_driver.dispose(); + if (!--s_initCount) { + if (s_driver) { + s_driver.dispose(); + freeT(s_driver); + } + } } } shared static this() { if (!s_initCount++) { - s_driver = new NativeEventDriver; s_isMainThread = true; } } @@ -54,7 +60,10 @@ static if (!is(NativeEventDriver == EventDriver)) { shared static ~this() { if (!--s_initCount) { - s_driver.dispose(); + if (s_driver) { + s_driver.dispose(); + freeT(s_driver); + } } } } else { From 0b73eda8d5df9f87fa22c92494794abd5fe4a85d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 22 Oct 2018 11:05:30 +0200 Subject: [PATCH 08/12] Deprecate the non-nogc userData overload. This allows more of the implementation to become nogc. --- source/eventcore/driver.d | 40 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 6d42ea1..aec1967 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -352,38 +352,34 @@ interface EventDriverSockets { /** Retrieves a reference to a user-defined value associated with a descriptor. */ - @property final ref T userData(T, FD)(FD descriptor) @trusted - if (!hasNoGCLifetime!T) - { - import std.conv : emplace; - static void init(void* ptr) { emplace(cast(T*)ptr); } - static void destr(void* ptr) { destroy(*cast(T*)ptr); } - return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); - } - /// ditto @property final ref T userData(T, FD)(FD descriptor) @trusted @nogc if (hasNoGCLifetime!T) { import std.conv : emplace; static void init(void* ptr) @nogc { emplace(cast(T*)ptr); } static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); } - - scope getter = { - return cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); - }; - - static if (__traits(compiles, () nothrow @trusted { getter(); })) - return *(cast(T* delegate() @nogc nothrow)getter)(); - else - return *(cast(T* delegate() @nogc)getter)(); + return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + } + /// ditto + deprecated("Only @nogc constructible and destructible user data allowed.") + @property final ref T userData(T, FD)(FD descriptor) @trusted + if (!hasNoGCLifetime!T) + { + import std.conv : emplace; + static void init(void* ptr) { emplace(cast(T*)ptr); } + static void destr(void* ptr) { destroy(*cast(T*)ptr); } + static if (__traits(compiles, () nothrow { init(null); destr(null); })) + alias F = void function(void*) @nogc nothrow; + else alias F = void function(void*) @nogc; + return *cast(T*)rawUserData(descriptor, T.sizeof, cast(F)&init, cast(F)&destr); } /// Low-level user data access. Use `getUserData` instead. - protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; + protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc; /// ditto - protected void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; + protected void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc; /// ditto - protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; + protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc; } enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); }); @@ -668,7 +664,7 @@ alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias TimerCallback = void delegate(TimerID); alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change); -@system alias DataInitializer = void function(void*); +@system alias DataInitializer = void function(void*) @nogc; enum ExitReason { timeout, From 6e839de7e22b4ca28aa0ca035797beb8e771b6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 24 Oct 2018 10:44:38 +0200 Subject: [PATCH 09/12] Refactor PollEventDriverWatchers. - Better encapsulates the mutex protection inside PollingThread - Uses mallocT/freeT to allocate snapshot nodes --- source/eventcore/drivers/posix/watchers.d | 72 +++++++++++++---------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 4af45a0..c747299 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -3,7 +3,7 @@ module eventcore.drivers.posix.watchers; import eventcore.driver; import eventcore.drivers.posix.driver; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : mallocT, freeT, nogc_assert; final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers @@ -313,42 +313,40 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat private void onEvent(EventID evt) { - import std.algorithm.mutation : swap; - auto pt = evt in m_pollers; if (!pt) return; m_events.wait(evt, &onEvent); - FileChange[] changes; - try synchronized (pt.m_changesMutex) - swap(changes, pt.m_changes); - catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); - - foreach (ref ch; changes) + foreach (ref ch; pt.readChanges()) pt.m_callback(cast(WatcherID)evt, ch); } private final class PollingThread : Thread { - int refCount = 1; - EventID changesEvent; - private { shared(Events) m_eventsDriver; Mutex m_changesMutex; - /*shared*/ FileChange[] m_changes; + /*shared*/ FileChange[] m_changes; // protected by m_changesMutex + EventID m_changesEvent; // protected by m_changesMutex immutable string m_basePath; immutable bool m_recursive; immutable FileChangesCallback m_callback; - size_t m_entryCount; - struct Entry { - Entry* parent; + final static class Entry { + Entry parent; string name; ulong size; long lastChange; + this(Entry parent, string name, ulong size, long lastChange) + { + this.parent = parent; + this.name = name; + this.size = size; + this.lastChange = lastChange; + } + string path() { import std.path : buildPath; @@ -361,11 +359,13 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat } struct Key { - Entry* parent; + Entry parent; string name; } - Entry*[Key] m_entries; + // used only within the polling thread + Entry[Key] m_entries; + size_t m_entryCount; } this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) @@ -374,7 +374,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat m_changesMutex = new Mutex; m_eventsDriver = event_driver; - changesEvent = event; + m_changesEvent = event; m_basePath = path; m_recursive = recursive; m_callback = callback; @@ -387,10 +387,21 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat void dispose() nothrow { try synchronized (m_changesMutex) { - changesEvent = EventID.invalid; + m_changesEvent = EventID.invalid; } catch (Exception e) assert(false, e.msg); } + FileChange[] readChanges() + nothrow { + import std.algorithm.mutation : swap; + + FileChange[] changes; + try synchronized (m_changesMutex) + swap(changes, m_changes); + catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); + return changes; + } + private void run() nothrow @trusted { import core.time : msecs; @@ -401,7 +412,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs; while (true) { try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) return; + if (m_changesEvent == EventID.invalid) return; } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); auto remaining = timeout - Clock.currTime(UTC()); if (remaining <= 0.msecs) break; @@ -411,9 +422,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat scan(true); try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) return; + if (m_changesEvent == EventID.invalid) return; if (m_changes.length) - m_eventsDriver.trigger(changesEvent, false); + m_eventsDriver.trigger(m_changesEvent, false); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Throwable th) { import core.stdc.stdio : fprintf, stderr; @@ -435,8 +446,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat @trusted nothrow { import std.algorithm.mutation : swap; - Entry*[Key] new_entries; - Entry*[] added; + Entry[Key] new_entries; + Entry[] added; size_t ec = 0; scan(null, generate_changes, new_entries, added, ec); @@ -445,12 +456,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (generate_changes) addChange(FileChangeKind.removed, e.key, e.value.isDir); + try freeT(e.value); + catch (Exception e) assert(false, e.msg); } - - static if (__VERSION__ >= 2079) { - import core.memory : __delete; - __delete(e.value); - } else mixin("delete e.value;"); } foreach (e; added) @@ -460,7 +468,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat m_entryCount = ec; } - private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec) + private void scan(Entry parent, bool generate_changes, ref Entry[Key] new_entries, ref Entry[] added, ref size_t ec) @trusted nothrow { import std.file : SpanMode, dirEntries; import std.path : buildPath, baseName; @@ -486,7 +494,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat ec++; m_entries.remove(key); } else { - auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); + auto e = mallocT!Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); new_entries[key] = e; ec++; if (generate_changes) added ~= e; From 406367b6c62ee53d98f82d1db751b2deee22b8b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 24 Oct 2018 12:08:38 +0200 Subject: [PATCH 10/12] Fix epoll compile error on older frontends. --- source/eventcore/drivers/posix/epoll.d | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/source/eventcore/drivers/posix/epoll.d b/source/eventcore/drivers/posix/epoll.d index e33f7d2..6a700b0 100644 --- a/source/eventcore/drivers/posix/epoll.d +++ b/source/eventcore/drivers/posix/epoll.d @@ -31,7 +31,9 @@ final class EpollEventLoop : PosixEventLoop { this() @nogc { - m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } (); + assumeSafeNoGC({ + m_epoll = epoll_create1(SOCK_CLOEXEC); + }); } override bool doProcessEvents(Duration timeout) @@ -73,13 +75,17 @@ final class EpollEventLoop : PosixEventLoop { if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; ev.data.fd = cast(int)fd; - () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); } (); + assumeSafeNoGC({ + epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); + }); } override void unregisterFD(FD fd, EventMask mask) { debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd); - () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } (); + assumeSafeNoGC({ + epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); + }); } override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true) @@ -92,7 +98,9 @@ final class EpollEventLoop : PosixEventLoop { if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; ev.data.fd = cast(int)fd; - () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev); } (); + assumeSafeNoGC({ + epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev); + }); } } @@ -102,3 +110,8 @@ private timeval toTimeVal(Duration dur) dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec); return tvdur; } + +private void assumeSafeNoGC(scope void delegate() nothrow doit) +@trusted { + (cast(void delegate() nothrow @nogc)doit)(); +} From 4d8d08b27ded7a66c047cf90d649c89e819ae30d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 24 Oct 2018 21:05:45 +0200 Subject: [PATCH 11/12] Fix destruction code in ChoppedVector. --- source/eventcore/internal/utils.d | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d index 2613522..03223cf 100644 --- a/source/eventcore/internal/utils.d +++ b/source/eventcore/internal/utils.d @@ -39,6 +39,8 @@ void freeT(T)(ref T inst) @nogc { import core.stdc.stdlib : free; + if (!inst) return; + noGCDestroy(inst); static if (hasIndirections!T) GC.removeRange(cast(void*)inst); @@ -151,12 +153,13 @@ struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { @nogc { () @trusted { foreach (i; 0 .. m_chunkCount) { - destroy(m_chunks[i]); + destroy(*m_chunks[i]); static if (hasIndirections!T) GC.removeRange(m_chunks[i]); free(m_chunks[i]); } free(m_chunks.ptr); + m_chunks = null; } (); m_chunkCount = 0; m_length = 0; From c553df66e108424ec347a5eccfd0c7cef597265b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 24 Oct 2018 21:06:31 +0200 Subject: [PATCH 12/12] Fix freeing of the loop in PosixEventDriver --- source/eventcore/drivers/posix/driver.d | 1 - 1 file changed, 1 deletion(-) diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index f6c5621..973b3db 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -95,7 +95,6 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_dns.dispose(); m_core.dispose(); m_loop.dispose(); - m_loop = null; try () @trusted { freeT(m_watchers);