diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index ed9059c..6d42ea1 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -352,13 +352,31 @@ interface EventDriverSockets { /** Retrieves a reference to a user-defined value associated with a descriptor. */ - @property final ref T userData(T, FD)(FD descriptor) - @trusted { + @property final ref T userData(T, FD)(FD descriptor) @trusted + if (!hasNoGCLifetime!T) + { import std.conv : emplace; static void init(void* ptr) { emplace(cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); } return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); } + /// ditto + @property final ref T userData(T, FD)(FD descriptor) @trusted @nogc + if (hasNoGCLifetime!T) + { + import std.conv : emplace; + static void init(void* ptr) @nogc { emplace(cast(T*)ptr); } + static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); } + + scope getter = { + return cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + }; + + static if (__traits(compiles, () nothrow @trusted { getter(); })) + return *(cast(T* delegate() @nogc nothrow)getter)(); + else + return *(cast(T* delegate() @nogc)getter)(); + } /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; @@ -368,6 +386,14 @@ interface EventDriverSockets { protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; } +enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); }); +unittest { + static struct S1 {} + static struct S2 { ~this() { new int; } } + static assert(hasNoGCLifetime!S1); + static assert(!hasNoGCLifetime!S2); +} + /** Performs asynchronous DNS queries. */ diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index f4067f8..1ec3e7d 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -43,7 +43,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver } this(Events events, Signals signals) - { + @nogc { m_events = events; setupEvent(); } @@ -139,7 +139,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver } private void setupEvent() - { + @nogc { if (m_event == EventID.invalid) { m_event = m_events.createInternal(); m_events.wait(m_event, &onDNSSignal); diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 65e75df..f6c5621 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -64,16 +64,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { } this() - { - m_loop = new Loop; - m_sockets = new SocketsDriver(m_loop); - m_events = new EventsDriver(m_loop, m_sockets); - m_signals = new SignalsDriver(m_loop); - m_timers = new TimerDriver; - m_core = new CoreDriver(m_loop, m_timers, m_events); - m_dns = new DNSDriver(m_events, m_signals); - m_files = new FileDriver(m_events); - m_watchers = new WatcherDriver(m_events); + @nogc @trusted { + m_loop = mallocT!Loop; + m_sockets = mallocT!SocketsDriver(m_loop); + m_events = mallocT!EventsDriver(m_loop, m_sockets); + m_signals = mallocT!SignalsDriver(m_loop); + m_timers = mallocT!TimerDriver; + m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); + m_dns = mallocT!DNSDriver(m_events, m_signals); + m_files = mallocT!FileDriver(m_events); + m_watchers = mallocT!WatcherDriver(m_events); } // force overriding these in the (final) sub classes to avoid virtual calls @@ -96,6 +96,19 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_core.dispose(); m_loop.dispose(); m_loop = null; + + try () @trusted { + freeT(m_watchers); + freeT(m_files); + freeT(m_dns); + freeT(m_core); + freeT(m_timers); + freeT(m_signals); + freeT(m_events); + freeT(m_sockets); + freeT(m_loop); + } (); + catch (Exception e) assert(false, e.msg); } } @@ -121,29 +134,32 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; } - protected this(Loop loop, Timers timers, Events events) - { + this(Loop loop, Timers timers, Events events) + @nogc { m_loop = loop; m_timers = timers; m_events = events; m_wakeupEvent = events.createInternal(); static if (__VERSION__ >= 2074) - m_threadCallbackMutex = new shared Mutex; + m_threadCallbackMutex = mallocT!(shared(Mutex)); else { - () @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); + () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } (); } - m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); + m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t))); m_threadCallbacks.reserve(1000); } - protected final void dispose() + final void dispose() { executeThreadCallbacks(); m_events.releaseRef(m_wakeupEvent); - atomicStore(m_threadCallbackMutex, null); m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized! + try { + () @trusted { freeT(m_threadCallbackMutex); } (); + () @trusted { freeT(m_threadCallbacks); } (); + } catch (Exception e) assert(false, e.msg); } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @@ -274,11 +290,11 @@ package class PosixEventLoop { protected abstract bool doProcessEvents(Duration dur); /// Registers the FD for general notification reception. - protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true); + protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc; /// Unregisters the FD for general notification reception. - protected abstract void unregisterFD(FD fd, EventMask mask); + protected abstract void unregisterFD(FD fd, EventMask mask) @nogc; /// Updates the event mask to use for listening for notifications. - protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true); + protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc; final protected void notify(EventType evt)(FD fd) { @@ -342,7 +358,7 @@ package class PosixEventLoop { } package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { + @system @nogc { FDSlot* fds = &m_fds[descriptor].common; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); diff --git a/source/eventcore/drivers/posix/epoll.d b/source/eventcore/drivers/posix/epoll.d index cb11929..e33f7d2 100644 --- a/source/eventcore/drivers/posix/epoll.d +++ b/source/eventcore/drivers/posix/epoll.d @@ -5,7 +5,7 @@ numbers of concurrently open sockets. */ module eventcore.drivers.posix.epoll; -@safe: /*@nogc:*/ nothrow: +@safe @nogc nothrow: version (linux): @@ -22,17 +22,16 @@ static if (!is(typeof(SOCK_CLOEXEC))) enum SOCK_CLOEXEC = 0x80000; final class EpollEventLoop : PosixEventLoop { -@safe: nothrow: +@safe nothrow: private { int m_epoll; - epoll_event[] m_events; + epoll_event[100] m_events; } this() - { + @nogc { m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } (); - m_events.length = 100; } override bool doProcessEvents(Duration timeout) @@ -60,7 +59,7 @@ final class EpollEventLoop : PosixEventLoop { } override void dispose() - { + @nogc { import core.sys.posix.unistd : close; close(m_epoll); } diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 6cff6bb..ba51c7b 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -4,7 +4,8 @@ module eventcore.drivers.posix.events; import eventcore.driver; import eventcore.drivers.posix.driver; import eventcore.internal.consumablequeue : ConsumableQueue; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : nogc_assert, mallocT, freeT; + version (linux) { nothrow @nogc extern (C) int eventfd(uint initval, int flags); @@ -25,21 +26,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS Loop m_loop; Sockets m_sockets; ubyte[ulong.sizeof] m_buf; - version (linux) {} - else { - // TODO: avoid the overhead of a mutex backed map here - import core.sync.mutex : Mutex; - Mutex m_eventsMutex; - EventID[DatagramSocketFD] m_events; - } } this(Loop loop, Sockets sockets) - { + @nogc { m_loop = loop; m_sockets = sockets; - version (linux) {} - else m_eventsMutex = new Mutex; } package @property Loop loop() { return m_loop; } @@ -50,14 +42,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } package(eventcore) EventID createInternal(bool is_internal = true) - { + @nogc { version (linux) { auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (eid == -1) return EventID.invalid; auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(new ConsumableQueue!EventCallback, false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -83,7 +75,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } else { // fake missing socketpair support on Windows import std.socket : InternetAddress; - auto addr = new InternetAddress(0x7F000001, 0); + scope addr = new InternetAddress(0x7F000001, 0); auto s = m_sockets.createDatagramSocketInternal(addr, null, true); if (s == DatagramSocketFD.invalid) return EventID.invalid; fd[0] = cast(sock_t)s; @@ -106,18 +98,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } } - m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); // use the second socket as the event ID and as the sending end for // other threads auto id = cast(EventID)fd[1]; - try { - synchronized (m_eventsMutex) - m_events[s] = id; - } catch (Exception e) assert(false, e.msg); + try m_sockets.userData!EventID(s) = id; + catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); assert(getRC(id) == 1); return id; } @@ -142,7 +132,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override void trigger(EventID event, bool notify_all) - shared @trusted { + shared @trusted @nogc { import core.atomic : atomicStore; auto thisus = cast(PosixEventDriverEvents)this; assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); @@ -154,7 +144,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override void wait(EventID event, EventCallback on_event) - { + @nogc { if (!isInternal(event)) m_loop.m_waiterCount++; getSlot(event).waiters.put(on_event); } @@ -183,13 +173,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS version (linux) {} else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) - { - m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - EventID evt; + @nogc { + m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { - synchronized (m_eventsMutex) - evt = m_events[s]; - onEvent(evt); + EventID evt = m_sockets.userData!EventID(s); + scope doit = { onEvent(evt); }; // cast to nogc + () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } } @@ -201,13 +190,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } final override bool releaseRef(EventID descriptor) - { + @nogc { nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); if (--getRC(descriptor) == 0) { if (!isInternal(descriptor)) m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; () @trusted nothrow { - try .destroy(getSlot(descriptor).waiters); + try freeT(getSlot(descriptor).waiters); catch (Exception e) nogc_assert(false, e.msg); } (); version (linux) { @@ -216,10 +205,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto rs = getSlot(descriptor).recvSocket; m_sockets.cancelReceive(rs); m_sockets.releaseRef(rs); - try { - synchronized (m_eventsMutex) - m_events.remove(rs); - } catch (Exception e) nogc_assert(false, e.msg); } m_loop.clearFD!EventSlot(descriptor); version (Posix) close(cast(int)descriptor); @@ -235,9 +220,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } private EventSlot* getSlot(EventID id) - { + @nogc { nogc_assert(id < m_loop.m_fds.length, "Invalid event ID."); - return () @trusted { return &m_loop.m_fds[id].event(); } (); + return () @trusted @nogc { return &m_loop.m_fds[id].event(); } (); } private ref uint getRC(EventID id) @@ -246,7 +231,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } private bool isInternal(EventID id) - { + @nogc { return getSlot(id).isInternal; } } diff --git a/source/eventcore/drivers/posix/kqueue.d b/source/eventcore/drivers/posix/kqueue.d index e4c3ac6..959f75d 100644 --- a/source/eventcore/drivers/posix/kqueue.d +++ b/source/eventcore/drivers/posix/kqueue.d @@ -34,14 +34,14 @@ alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop; final class KqueueEventLoop : PosixEventLoop { private { int m_queue; - kevent_t[] m_changes; - kevent_t[] m_events; + size_t m_changeCount = 0; + kevent_t[100] m_changes; + kevent_t[100] m_events; } this() - @safe nothrow { + @safe nothrow @nogc { m_queue = () @trusted { return kqueue(); } (); - m_events.length = 100; assert(m_queue >= 0, "Failed to create kqueue."); } @@ -57,9 +57,8 @@ final class KqueueEventLoop : PosixEventLoop { ts.tv_sec = cast(time_t)secs; ts.tv_nsec = cast(uint)hnsecs * 100; - auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changes.length, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts); - m_changes.length = 0; - m_changes.assumeSafeAppend(); + auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts); + m_changeCount = 0; //print("kevent returned %s", ret); @@ -97,11 +96,11 @@ final class KqueueEventLoop : PosixEventLoop { if (edge_triggered) ev.flags |= EV_CLEAR; if (mask & EventMask.read) { ev.filter = EVFILT_READ; - m_changes ~= ev; + putChange(ev); } if (mask & EventMask.write) { ev.filter = EVFILT_WRITE; - m_changes ~= ev; + putChange(ev); } //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; } @@ -111,7 +110,7 @@ final class KqueueEventLoop : PosixEventLoop { kevent_t ev; ev.ident = fd; ev.flags = EV_DELETE; - m_changes ~= ev; + putChange(ev); } override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @@ -124,16 +123,26 @@ final class KqueueEventLoop : PosixEventLoop { ev.filter = EVFILT_READ; ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE; if (edge_triggered) ev.flags |= EV_CLEAR; - m_changes ~= ev; + putChange(ev); } if (changes & EventMask.write) { ev.filter = EVFILT_WRITE; ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE; if (edge_triggered) ev.flags |= EV_CLEAR; - m_changes ~= ev; + putChange(ev); } //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; } + + private void putChange(ref kevent_t ev) + @safe nothrow @nogc { + m_changes[m_changeCount++] = ev; + if (m_changeCount == m_changes.length) { + auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) (); + assert(ret == 0); + m_changeCount = 0; + } + } } diff --git a/source/eventcore/drivers/posix/signals.d b/source/eventcore/drivers/posix/signals.d index 8297ec9..fdd45ae 100644 --- a/source/eventcore/drivers/posix/signals.d +++ b/source/eventcore/drivers/posix/signals.d @@ -17,7 +17,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna private Loop m_loop; - this(Loop loop) { m_loop = loop; } + this(Loop loop) @nogc { m_loop = loop; } override SignalListenID listen(int sig, SignalCallback on_signal) { diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 65c5157..c647975 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -654,7 +654,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false) - { + @nogc { auto fd = DatagramSocketFD(socket); if (m_loop.m_fds[fd].common.refCount) // FD already in use? return DatagramSocketFD.init; @@ -742,8 +742,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets on_receive_finish(socket, IOStatus.ok, ret, src_addrc); } + package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish) + @trusted @nogc { + scope void delegate() @safe nothrow do_it = { + receive(socket, buffer, mode, on_receive_finish); + }; + (cast(void delegate() @safe nothrow @nogc)do_it)(); + } + void cancelReceive(DatagramSocketFD socket) - { + @nogc { assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.m_fds[socket].datagramSocket.readBuffer = null; @@ -855,7 +863,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } final override bool releaseRef(SocketFD fd) - { + @nogc { import taggedalgebraic : hasType; auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 0dd1f1a..4af45a0 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -245,7 +245,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat } this(Events events) - { + @nogc { m_events = events; }