Merge pull request #86 from vibe-d/lazy_init

Lazy driver initialization
This commit is contained in:
Sönke Ludwig 2018-10-25 12:07:20 +02:00 committed by GitHub
commit 5deeced2b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 405 additions and 211 deletions

View file

@ -8,7 +8,7 @@ targetType "library"
libs "anl" "resolv" platform="linux" libs "anl" "resolv" platform="linux"
libs "ws2_32" "user32" platform="windows-dmd" libs "ws2_32" "user32" platform="windows-dmd"
dependency "taggedalgebraic" version="~>0.10.4" dependency "taggedalgebraic" version="~>0.10.12"
configuration "epoll" { configuration "epoll" {
platforms "linux" platforms "linux"

View file

@ -7,6 +7,7 @@ import eventcore.drivers.posix.epoll;
import eventcore.drivers.posix.kqueue; import eventcore.drivers.posix.kqueue;
import eventcore.drivers.libasync; import eventcore.drivers.libasync;
import eventcore.drivers.winapi.driver; import eventcore.drivers.winapi.driver;
import eventcore.internal.utils : mallocT, freeT;
version (EventcoreEpollDriver) alias NativeEventDriver = EpollEventDriver; version (EventcoreEpollDriver) alias NativeEventDriver = EpollEventDriver;
else version (EventcoreKqueueDriver) alias NativeEventDriver = KqueueEventDriver; else version (EventcoreKqueueDriver) alias NativeEventDriver = KqueueEventDriver;
@ -19,8 +20,11 @@ else alias NativeEventDriver = EventDriver;
@safe @nogc nothrow { @safe @nogc nothrow {
static if (is(NativeEventDriver == EventDriver)) static if (is(NativeEventDriver == EventDriver))
assert(s_driver !is null, "setupEventDriver() was not called for this thread."); assert(s_driver !is null, "setupEventDriver() was not called for this thread.");
else else {
assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); if (!s_driver) {
s_driver = mallocT!NativeEventDriver();
}
}
return s_driver; return s_driver;
} }
@ -30,7 +34,6 @@ static if (!is(NativeEventDriver == EventDriver)) {
if (!s_isMainThread) { if (!s_isMainThread) {
if (!s_initCount++) { if (!s_initCount++) {
assert(s_driver is null); assert(s_driver is null);
s_driver = new NativeEventDriver;
} }
} }
} }
@ -38,15 +41,18 @@ static if (!is(NativeEventDriver == EventDriver)) {
static ~this() static ~this()
{ {
if (!s_isMainThread) { if (!s_isMainThread) {
if (!--s_initCount) if (!--s_initCount) {
if (s_driver) {
s_driver.dispose(); s_driver.dispose();
freeT(s_driver);
}
}
} }
} }
shared static this() shared static this()
{ {
if (!s_initCount++) { if (!s_initCount++) {
s_driver = new NativeEventDriver;
s_isMainThread = true; s_isMainThread = true;
} }
} }
@ -54,7 +60,10 @@ static if (!is(NativeEventDriver == EventDriver)) {
shared static ~this() shared static ~this()
{ {
if (!--s_initCount) { if (!--s_initCount) {
if (s_driver) {
s_driver.dispose(); s_driver.dispose();
freeT(s_driver);
}
} }
} }
} else { } else {

View file

@ -352,20 +352,42 @@ interface EventDriverSockets {
/** Retrieves a reference to a user-defined value associated with a descriptor. /** Retrieves a reference to a user-defined value associated with a descriptor.
*/ */
@property final ref T userData(T, FD)(FD descriptor) @property final ref T userData(T, FD)(FD descriptor) @trusted @nogc
@trusted { if (hasNoGCLifetime!T)
{
import std.conv : emplace;
static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// ditto
deprecated("Only @nogc constructible and destructible user data allowed.")
@property final ref T userData(T, FD)(FD descriptor) @trusted
if (!hasNoGCLifetime!T)
{
import std.conv : emplace; import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); } static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); static if (__traits(compiles, () nothrow { init(null); destr(null); }))
alias F = void function(void*) @nogc nothrow;
else alias F = void function(void*) @nogc;
return *cast(T*)rawUserData(descriptor, T.sizeof, cast(F)&init, cast(F)&destr);
} }
/// Low-level user data access. Use `getUserData` instead. /// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
/// ditto /// ditto
protected void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
/// ditto /// ditto
protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
}
enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); });
unittest {
static struct S1 {}
static struct S2 { ~this() { new int; } }
static assert(hasNoGCLifetime!S1);
static assert(!hasNoGCLifetime!S2);
} }
@ -642,7 +664,7 @@ alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change); alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change);
@system alias DataInitializer = void function(void*); @system alias DataInitializer = void function(void*) @nogc;
enum ExitReason { enum ExitReason {
timeout, timeout,

View file

@ -43,7 +43,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
} }
this(Events events, Signals signals) this(Events events, Signals signals)
{ @nogc {
m_events = events; m_events = events;
setupEvent(); setupEvent();
} }
@ -139,7 +139,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
} }
private void setupEvent() private void setupEvent()
{ @nogc {
if (m_event == EventID.invalid) { if (m_event == EventID.invalid) {
m_event = m_events.createInternal(); m_event = m_events.createInternal();
m_events.wait(m_event, &onDNSSignal); m_events.wait(m_event, &onDNSSignal);

View file

@ -64,16 +64,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
} }
this() this()
{ @nogc @trusted {
m_loop = new Loop; m_loop = mallocT!Loop;
m_sockets = new SocketsDriver(m_loop); m_sockets = mallocT!SocketsDriver(m_loop);
m_events = new EventsDriver(m_loop, m_sockets); m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = new SignalsDriver(m_loop); m_signals = mallocT!SignalsDriver(m_loop);
m_timers = new TimerDriver; m_timers = mallocT!TimerDriver;
m_core = new CoreDriver(m_loop, m_timers, m_events); m_core = mallocT!CoreDriver(m_loop, m_timers, m_events);
m_dns = new DNSDriver(m_events, m_signals); m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events); m_files = mallocT!FileDriver(m_events);
m_watchers = new WatcherDriver(m_events); m_watchers = mallocT!WatcherDriver(m_events);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls
@ -95,7 +95,19 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_dns.dispose(); m_dns.dispose();
m_core.dispose(); m_core.dispose();
m_loop.dispose(); m_loop.dispose();
m_loop = null;
try () @trusted {
freeT(m_watchers);
freeT(m_files);
freeT(m_dns);
freeT(m_core);
freeT(m_timers);
freeT(m_signals);
freeT(m_events);
freeT(m_sockets);
freeT(m_loop);
} ();
catch (Exception e) assert(false, e.msg);
} }
} }
@ -121,29 +133,32 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
} }
protected this(Loop loop, Timers timers, Events events) this(Loop loop, Timers timers, Events events)
{ @nogc {
m_loop = loop; m_loop = loop;
m_timers = timers; m_timers = timers;
m_events = events; m_events = events;
m_wakeupEvent = events.createInternal(); m_wakeupEvent = events.createInternal();
static if (__VERSION__ >= 2074) static if (__VERSION__ >= 2074)
m_threadCallbackMutex = new shared Mutex; m_threadCallbackMutex = mallocT!(shared(Mutex));
else { else {
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } ();
} }
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)));
m_threadCallbacks.reserve(1000); m_threadCallbacks.reserve(1000);
} }
protected final void dispose() final void dispose()
{ {
executeThreadCallbacks(); executeThreadCallbacks();
m_events.releaseRef(m_wakeupEvent); m_events.releaseRef(m_wakeupEvent);
atomicStore(m_threadCallbackMutex, null);
m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized! m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
try {
() @trusted { freeT(m_threadCallbackMutex); } ();
() @trusted { freeT(m_threadCallbacks); } ();
} catch (Exception e) assert(false, e.msg);
} }
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
@ -274,11 +289,11 @@ package class PosixEventLoop {
protected abstract bool doProcessEvents(Duration dur); protected abstract bool doProcessEvents(Duration dur);
/// Registers the FD for general notification reception. /// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true); protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc;
/// Unregisters the FD for general notification reception. /// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd, EventMask mask); protected abstract void unregisterFD(FD fd, EventMask mask) @nogc;
/// Updates the event mask to use for listening for notifications. /// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true); protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc;
final protected void notify(EventType evt)(FD fd) final protected void notify(EventType evt)(FD fd)
{ {
@ -342,7 +357,7 @@ package class PosixEventLoop {
} }
package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system @nogc {
FDSlot* fds = &m_fds[descriptor].common; FDSlot* fds = &m_fds[descriptor].common;
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");

View file

@ -5,7 +5,7 @@
numbers of concurrently open sockets. numbers of concurrently open sockets.
*/ */
module eventcore.drivers.posix.epoll; module eventcore.drivers.posix.epoll;
@safe: /*@nogc:*/ nothrow: @safe @nogc nothrow:
version (linux): version (linux):
@ -22,17 +22,18 @@ static if (!is(typeof(SOCK_CLOEXEC)))
enum SOCK_CLOEXEC = 0x80000; enum SOCK_CLOEXEC = 0x80000;
final class EpollEventLoop : PosixEventLoop { final class EpollEventLoop : PosixEventLoop {
@safe: nothrow: @safe nothrow:
private { private {
int m_epoll; int m_epoll;
epoll_event[] m_events; epoll_event[100] m_events;
} }
this() this()
{ @nogc {
m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } (); assumeSafeNoGC({
m_events.length = 100; m_epoll = epoll_create1(SOCK_CLOEXEC);
});
} }
override bool doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)
@ -60,7 +61,7 @@ final class EpollEventLoop : PosixEventLoop {
} }
override void dispose() override void dispose()
{ @nogc {
import core.sys.posix.unistd : close; import core.sys.posix.unistd : close;
close(m_epoll); close(m_epoll);
} }
@ -74,13 +75,17 @@ final class EpollEventLoop : PosixEventLoop {
if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP;
ev.data.fd = cast(int)fd; ev.data.fd = cast(int)fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev); } (); assumeSafeNoGC({
epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev);
});
} }
override void unregisterFD(FD fd, EventMask mask) override void unregisterFD(FD fd, EventMask mask)
{ {
debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd); debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd);
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null); } (); assumeSafeNoGC({
epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null);
});
} }
override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true) override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true)
@ -93,7 +98,9 @@ final class EpollEventLoop : PosixEventLoop {
if (mask & EventMask.write) ev.events |= EPOLLOUT; if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP; if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP;
ev.data.fd = cast(int)fd; ev.data.fd = cast(int)fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev); } (); assumeSafeNoGC({
epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev);
});
} }
} }
@ -103,3 +110,8 @@ private timeval toTimeVal(Duration dur)
dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec); dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
return tvdur; return tvdur;
} }
private void assumeSafeNoGC(scope void delegate() nothrow doit)
@trusted {
(cast(void delegate() nothrow @nogc)doit)();
}

View file

@ -4,7 +4,8 @@ module eventcore.drivers.posix.events;
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.posix.driver; import eventcore.drivers.posix.driver;
import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : nogc_assert, mallocT, freeT;
version (linux) { version (linux) {
nothrow @nogc extern (C) int eventfd(uint initval, int flags); nothrow @nogc extern (C) int eventfd(uint initval, int flags);
@ -25,21 +26,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
Loop m_loop; Loop m_loop;
Sockets m_sockets; Sockets m_sockets;
ubyte[ulong.sizeof] m_buf; ubyte[ulong.sizeof] m_buf;
version (linux) {}
else {
// TODO: avoid the overhead of a mutex backed map here
import core.sync.mutex : Mutex;
Mutex m_eventsMutex;
EventID[DatagramSocketFD] m_events;
}
} }
this(Loop loop, Sockets sockets) this(Loop loop, Sockets sockets)
{ @nogc {
m_loop = loop; m_loop = loop;
m_sockets = sockets; m_sockets = sockets;
version (linux) {}
else m_eventsMutex = new Mutex;
} }
package @property Loop loop() { return m_loop; } package @property Loop loop() { return m_loop; }
@ -50,14 +42,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
package(eventcore) EventID createInternal(bool is_internal = true) package(eventcore) EventID createInternal(bool is_internal = true)
{ @nogc {
version (linux) { version (linux) {
auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (eid == -1) return EventID.invalid; if (eid == -1) return EventID.invalid;
auto id = cast(EventID)eid; auto id = cast(EventID)eid;
// FIXME: avoid dynamic memory allocation for the queue // FIXME: avoid dynamic memory allocation for the queue
m_loop.initFD(id, FDFlags.internal, m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal)); EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal));
m_loop.registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
@ -83,7 +75,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} else { } else {
// fake missing socketpair support on Windows // fake missing socketpair support on Windows
import std.socket : InternetAddress; import std.socket : InternetAddress;
auto addr = new InternetAddress(0x7F000001, 0); scope addr = new InternetAddress(0x7F000001, 0);
auto s = m_sockets.createDatagramSocketInternal(addr, null, true); auto s = m_sockets.createDatagramSocketInternal(addr, null, true);
if (s == DatagramSocketFD.invalid) return EventID.invalid; if (s == DatagramSocketFD.invalid) return EventID.invalid;
fd[0] = cast(sock_t)s; fd[0] = cast(sock_t)s;
@ -106,18 +98,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
} }
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
// use the second socket as the event ID and as the sending end for // use the second socket as the event ID and as the sending end for
// other threads // other threads
auto id = cast(EventID)fd[1]; auto id = cast(EventID)fd[1];
try { try m_sockets.userData!EventID(s) = id;
synchronized (m_eventsMutex) catch (Exception e) assert(false, e.msg);
m_events[s] = id;
} catch (Exception e) assert(false, e.msg);
// FIXME: avoid dynamic memory allocation for the queue // FIXME: avoid dynamic memory allocation for the queue
m_loop.initFD(id, FDFlags.internal, m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s)); EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s));
assert(getRC(id) == 1); assert(getRC(id) == 1);
return id; return id;
} }
@ -142,7 +132,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
final override void trigger(EventID event, bool notify_all) final override void trigger(EventID event, bool notify_all)
shared @trusted { shared @trusted @nogc {
import core.atomic : atomicStore; import core.atomic : atomicStore;
auto thisus = cast(PosixEventDriverEvents)this; auto thisus = cast(PosixEventDriverEvents)this;
assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
@ -154,7 +144,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
final override void wait(EventID event, EventCallback on_event) final override void wait(EventID event, EventCallback on_event)
{ @nogc {
if (!isInternal(event)) m_loop.m_waiterCount++; if (!isInternal(event)) m_loop.m_waiterCount++;
getSlot(event).waiters.put(on_event); getSlot(event).waiters.put(on_event);
} }
@ -183,13 +173,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
version (linux) {} version (linux) {}
else { else {
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
{ @nogc {
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
EventID evt;
try { try {
synchronized (m_eventsMutex) EventID evt = m_sockets.userData!EventID(s);
evt = m_events[s]; scope doit = { onEvent(evt); }; // cast to nogc
onEvent(evt); () @trusted { (cast(void delegate() @nogc)doit)(); } ();
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
} }
@ -201,13 +190,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
final override bool releaseRef(EventID descriptor) final override bool releaseRef(EventID descriptor)
{ @nogc {
nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
if (--getRC(descriptor) == 0) { if (--getRC(descriptor) == 0) {
if (!isInternal(descriptor)) if (!isInternal(descriptor))
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
() @trusted nothrow { () @trusted nothrow {
try .destroy(getSlot(descriptor).waiters); try freeT(getSlot(descriptor).waiters);
catch (Exception e) nogc_assert(false, e.msg); catch (Exception e) nogc_assert(false, e.msg);
} (); } ();
version (linux) { version (linux) {
@ -216,10 +205,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
auto rs = getSlot(descriptor).recvSocket; auto rs = getSlot(descriptor).recvSocket;
m_sockets.cancelReceive(rs); m_sockets.cancelReceive(rs);
m_sockets.releaseRef(rs); m_sockets.releaseRef(rs);
try {
synchronized (m_eventsMutex)
m_events.remove(rs);
} catch (Exception e) nogc_assert(false, e.msg);
} }
m_loop.clearFD!EventSlot(descriptor); m_loop.clearFD!EventSlot(descriptor);
version (Posix) close(cast(int)descriptor); version (Posix) close(cast(int)descriptor);
@ -235,9 +220,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
private EventSlot* getSlot(EventID id) private EventSlot* getSlot(EventID id)
{ @nogc {
nogc_assert(id < m_loop.m_fds.length, "Invalid event ID."); nogc_assert(id < m_loop.m_fds.length, "Invalid event ID.");
return () @trusted { return &m_loop.m_fds[id].event(); } (); return () @trusted @nogc { return &m_loop.m_fds[id].event(); } ();
} }
private ref uint getRC(EventID id) private ref uint getRC(EventID id)
@ -246,7 +231,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} }
private bool isInternal(EventID id) private bool isInternal(EventID id)
{ @nogc {
return getSlot(id).isInternal; return getSlot(id).isInternal;
} }
} }

View file

@ -34,14 +34,14 @@ alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop;
final class KqueueEventLoop : PosixEventLoop { final class KqueueEventLoop : PosixEventLoop {
private { private {
int m_queue; int m_queue;
kevent_t[] m_changes; size_t m_changeCount = 0;
kevent_t[] m_events; kevent_t[100] m_changes;
kevent_t[100] m_events;
} }
this() this()
@safe nothrow { @safe nothrow @nogc {
m_queue = () @trusted { return kqueue(); } (); m_queue = () @trusted { return kqueue(); } ();
m_events.length = 100;
assert(m_queue >= 0, "Failed to create kqueue."); assert(m_queue >= 0, "Failed to create kqueue.");
} }
@ -57,9 +57,8 @@ final class KqueueEventLoop : PosixEventLoop {
ts.tv_sec = cast(time_t)secs; ts.tv_sec = cast(time_t)secs;
ts.tv_nsec = cast(uint)hnsecs * 100; ts.tv_nsec = cast(uint)hnsecs * 100;
auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changes.length, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts); auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts);
m_changes.length = 0; m_changeCount = 0;
m_changes.assumeSafeAppend();
//print("kevent returned %s", ret); //print("kevent returned %s", ret);
@ -97,11 +96,11 @@ final class KqueueEventLoop : PosixEventLoop {
if (edge_triggered) ev.flags |= EV_CLEAR; if (edge_triggered) ev.flags |= EV_CLEAR;
if (mask & EventMask.read) { if (mask & EventMask.read) {
ev.filter = EVFILT_READ; ev.filter = EVFILT_READ;
m_changes ~= ev; putChange(ev);
} }
if (mask & EventMask.write) { if (mask & EventMask.write) {
ev.filter = EVFILT_WRITE; ev.filter = EVFILT_WRITE;
m_changes ~= ev; putChange(ev);
} }
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
} }
@ -111,7 +110,7 @@ final class KqueueEventLoop : PosixEventLoop {
kevent_t ev; kevent_t ev;
ev.ident = fd; ev.ident = fd;
ev.flags = EV_DELETE; ev.flags = EV_DELETE;
m_changes ~= ev; putChange(ev);
} }
override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true)
@ -124,16 +123,26 @@ final class KqueueEventLoop : PosixEventLoop {
ev.filter = EVFILT_READ; ev.filter = EVFILT_READ;
ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE; ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR; if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev; putChange(ev);
} }
if (changes & EventMask.write) { if (changes & EventMask.write) {
ev.filter = EVFILT_WRITE; ev.filter = EVFILT_WRITE;
ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE; ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR; if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev; putChange(ev);
} }
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; //if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
} }
private void putChange(ref kevent_t ev)
@safe nothrow @nogc {
m_changes[m_changeCount++] = ev;
if (m_changeCount == m_changes.length) {
auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) ();
assert(ret == 0);
m_changeCount = 0;
}
}
} }

View file

@ -17,7 +17,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
private Loop m_loop; private Loop m_loop;
this(Loop loop) { m_loop = loop; } this(Loop loop) @nogc { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal) override SignalListenID listen(int sig, SignalCallback on_signal)
{ {

View file

@ -654,7 +654,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false) package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false)
{ @nogc {
auto fd = DatagramSocketFD(socket); auto fd = DatagramSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use? if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init; return DatagramSocketFD.init;
@ -742,8 +742,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
on_receive_finish(socket, IOStatus.ok, ret, src_addrc); on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
} }
package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish)
@trusted @nogc {
scope void delegate() @safe nothrow do_it = {
receive(socket, buffer, mode, on_receive_finish);
};
(cast(void delegate() @safe nothrow @nogc)do_it)();
}
void cancelReceive(DatagramSocketFD socket) void cancelReceive(DatagramSocketFD socket)
{ @nogc {
assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress."); assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
m_loop.m_fds[socket].datagramSocket.readBuffer = null; m_loop.m_fds[socket].datagramSocket.readBuffer = null;
@ -855,7 +863,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
final override bool releaseRef(SocketFD fd) final override bool releaseRef(SocketFD fd)
{ @nogc {
import taggedalgebraic : hasType; import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");

View file

@ -3,7 +3,7 @@ module eventcore.drivers.posix.watchers;
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.posix.driver; import eventcore.drivers.posix.driver;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : mallocT, freeT, nogc_assert;
final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers
@ -245,7 +245,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
} }
this(Events events) this(Events events)
{ @nogc {
m_events = events; m_events = events;
} }
@ -313,42 +313,40 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
private void onEvent(EventID evt) private void onEvent(EventID evt)
{ {
import std.algorithm.mutation : swap;
auto pt = evt in m_pollers; auto pt = evt in m_pollers;
if (!pt) return; if (!pt) return;
m_events.wait(evt, &onEvent); m_events.wait(evt, &onEvent);
FileChange[] changes; foreach (ref ch; pt.readChanges())
try synchronized (pt.m_changesMutex)
swap(changes, pt.m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
foreach (ref ch; changes)
pt.m_callback(cast(WatcherID)evt, ch); pt.m_callback(cast(WatcherID)evt, ch);
} }
private final class PollingThread : Thread { private final class PollingThread : Thread {
int refCount = 1;
EventID changesEvent;
private { private {
shared(Events) m_eventsDriver; shared(Events) m_eventsDriver;
Mutex m_changesMutex; Mutex m_changesMutex;
/*shared*/ FileChange[] m_changes; /*shared*/ FileChange[] m_changes; // protected by m_changesMutex
EventID m_changesEvent; // protected by m_changesMutex
immutable string m_basePath; immutable string m_basePath;
immutable bool m_recursive; immutable bool m_recursive;
immutable FileChangesCallback m_callback; immutable FileChangesCallback m_callback;
size_t m_entryCount;
struct Entry { final static class Entry {
Entry* parent; Entry parent;
string name; string name;
ulong size; ulong size;
long lastChange; long lastChange;
this(Entry parent, string name, ulong size, long lastChange)
{
this.parent = parent;
this.name = name;
this.size = size;
this.lastChange = lastChange;
}
string path() string path()
{ {
import std.path : buildPath; import std.path : buildPath;
@ -361,11 +359,13 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
} }
struct Key { struct Key {
Entry* parent; Entry parent;
string name; string name;
} }
Entry*[Key] m_entries; // used only within the polling thread
Entry[Key] m_entries;
size_t m_entryCount;
} }
this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback)
@ -374,7 +374,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
m_changesMutex = new Mutex; m_changesMutex = new Mutex;
m_eventsDriver = event_driver; m_eventsDriver = event_driver;
changesEvent = event; m_changesEvent = event;
m_basePath = path; m_basePath = path;
m_recursive = recursive; m_recursive = recursive;
m_callback = callback; m_callback = callback;
@ -387,10 +387,21 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
void dispose() void dispose()
nothrow { nothrow {
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
changesEvent = EventID.invalid; m_changesEvent = EventID.invalid;
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
FileChange[] readChanges()
nothrow {
import std.algorithm.mutation : swap;
FileChange[] changes;
try synchronized (m_changesMutex)
swap(changes, m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
return changes;
}
private void run() private void run()
nothrow @trusted { nothrow @trusted {
import core.time : msecs; import core.time : msecs;
@ -401,7 +412,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs; auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs;
while (true) { while (true) {
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) return; if (m_changesEvent == EventID.invalid) return;
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
auto remaining = timeout - Clock.currTime(UTC()); auto remaining = timeout - Clock.currTime(UTC());
if (remaining <= 0.msecs) break; if (remaining <= 0.msecs) break;
@ -411,9 +422,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
scan(true); scan(true);
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) return; if (m_changesEvent == EventID.invalid) return;
if (m_changes.length) if (m_changes.length)
m_eventsDriver.trigger(changesEvent, false); m_eventsDriver.trigger(m_changesEvent, false);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
} catch (Throwable th) { } catch (Throwable th) {
import core.stdc.stdio : fprintf, stderr; import core.stdc.stdio : fprintf, stderr;
@ -435,8 +446,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
@trusted nothrow { @trusted nothrow {
import std.algorithm.mutation : swap; import std.algorithm.mutation : swap;
Entry*[Key] new_entries; Entry[Key] new_entries;
Entry*[] added; Entry[] added;
size_t ec = 0; size_t ec = 0;
scan(null, generate_changes, new_entries, added, ec); scan(null, generate_changes, new_entries, added, ec);
@ -445,12 +456,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
if (generate_changes) if (generate_changes)
addChange(FileChangeKind.removed, e.key, e.value.isDir); addChange(FileChangeKind.removed, e.key, e.value.isDir);
try freeT(e.value);
catch (Exception e) assert(false, e.msg);
} }
static if (__VERSION__ >= 2079) {
import core.memory : __delete;
__delete(e.value);
} else mixin("delete e.value;");
} }
foreach (e; added) foreach (e; added)
@ -460,7 +468,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
m_entryCount = ec; m_entryCount = ec;
} }
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec) private void scan(Entry parent, bool generate_changes, ref Entry[Key] new_entries, ref Entry[] added, ref size_t ec)
@trusted nothrow { @trusted nothrow {
import std.file : SpanMode, dirEntries; import std.file : SpanMode, dirEntries;
import std.path : buildPath, baseName; import std.path : buildPath, baseName;
@ -486,7 +494,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
ec++; ec++;
m_entries.remove(key); m_entries.remove(key);
} else { } else {
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); auto e = mallocT!Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
new_entries[key] = e; new_entries[key] = e;
ec++; ec++;
if (generate_changes) added ~= e; if (generate_changes) added ~= e;

View file

@ -421,10 +421,10 @@ private struct StaticTaskPool {
if (!m_refCount++) { if (!m_refCount++) {
try { try {
m_pool = new TaskPool(4); m_pool = mallocT!TaskPool(4);
m_pool.isDaemon = true; m_pool.isDaemon = true;
} catch (Exception e) { } catch (Exception e) {
assert(false, "Failed to create file thread pool: "~e.msg); assert(false, e.msg);
} }
} }
@ -447,8 +447,10 @@ private struct StaticTaskPool {
if (fin_pool) { if (fin_pool) {
log("finishing thread pool"); log("finishing thread pool");
try fin_pool.finish(); try {
catch (Exception e) { fin_pool.finish(true);
freeT(fin_pool);
} catch (Exception e) {
//log("Failed to shut down file I/O thread pool."); //log("Failed to shut down file I/O thread pool.");
} }
} }

View file

@ -4,8 +4,9 @@
module eventcore.drivers.timer; module eventcore.drivers.timer;
import eventcore.driver; import eventcore.driver;
import eventcore.internal.consumablequeue;
import eventcore.internal.dlist; import eventcore.internal.dlist;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : mallocT, freeT, nogc_assert;
final class LoopTimeoutTimerDriver : EventDriverTimers { final class LoopTimeoutTimerDriver : EventDriverTimers {
@ -24,7 +25,7 @@ final class LoopTimeoutTimerDriver : EventDriverTimers {
TimerSlot*[TimerID] m_timers; TimerSlot*[TimerID] m_timers;
StackDList!TimerSlot m_timerQueue; StackDList!TimerSlot m_timerQueue;
TimerID m_lastTimerID; TimerID m_lastTimerID;
TimerSlot*[] m_firedTimers; ConsumableQueue!(TimerSlot*) m_firedTimers;
} }
static this() static this()
@ -32,6 +33,17 @@ final class LoopTimeoutTimerDriver : EventDriverTimers {
ms_allocator.parent = Mallocator.instance; ms_allocator.parent = Mallocator.instance;
} }
this()
@nogc @safe nothrow {
m_firedTimers = mallocT!(ConsumableQueue!(TimerSlot*));
}
~this()
@nogc @trusted nothrow {
try freeT(m_firedTimers);
catch (Exception e) assert(false, e.msg);
}
package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; } package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; }
final package Duration getNextTimeout(long stdtime) final package Duration getNextTimeout(long stdtime)
@ -53,27 +65,24 @@ final class LoopTimeoutTimerDriver : EventDriverTimers {
do tm.timeout += tm.repeatDuration; do tm.timeout += tm.repeatDuration;
while (tm.timeout <= stdtime); while (tm.timeout <= stdtime);
} else tm.pending = false; } else tm.pending = false;
m_firedTimers ~= tm; m_firedTimers.put(tm);
} }
foreach (tm; m_firedTimers) { auto processed_timers = m_firedTimers.consume();
foreach (tm; processed_timers) {
m_timerQueue.remove(tm); m_timerQueue.remove(tm);
if (tm.repeatDuration > 0) if (tm.repeatDuration > 0)
enqueueTimer(tm); enqueueTimer(tm);
} }
foreach (tm; m_firedTimers) { foreach (tm; processed_timers) {
auto cb = tm.callback; auto cb = tm.callback;
tm.callback = null; tm.callback = null;
if (cb) cb(tm.id); if (cb) cb(tm.id);
} }
bool any_fired = m_firedTimers.length > 0; return processed_timers.length > 0;
m_firedTimers.length = 0;
m_firedTimers.assumeSafeAppend();
return any_fired;
} }
final override TimerID create() final override TimerID create()

View file

@ -5,7 +5,7 @@ version (Windows):
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.timer; import eventcore.drivers.timer;
import eventcore.internal.consumablequeue; import eventcore.internal.consumablequeue;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : mallocT, freeT, nogc_assert;
import eventcore.internal.win32; import eventcore.internal.win32;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
import core.time : Duration; import core.time : Duration;
@ -21,8 +21,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
size_t m_waiterCount; size_t m_waiterCount;
DWORD m_tid; DWORD m_tid;
LoopTimeoutTimerDriver m_timers; LoopTimeoutTimerDriver m_timers;
HANDLE[] m_registeredEvents; HANDLE[MAXIMUM_WAIT_OBJECTS] m_registeredEvents;
void delegate() @safe nothrow[HANDLE] m_eventCallbacks; void delegate() @safe nothrow[MAXIMUM_WAIT_OBJECTS] m_registeredEventCallbacks;
DWORD m_registeredEventCount = 0;
HANDLE m_fileCompletionEvent; HANDLE m_fileCompletionEvent;
ConsumableQueue!IOEvent m_ioEvents; ConsumableQueue!IOEvent m_ioEvents;
@ -35,27 +36,36 @@ final class WinAPIEventDriverCore : EventDriverCore {
} }
this(LoopTimeoutTimerDriver timers) this(LoopTimeoutTimerDriver timers)
{ @nogc {
m_timers = timers; m_timers = timers;
m_tid = () @trusted { return GetCurrentThreadId(); } (); m_tid = () @trusted { return GetCurrentThreadId(); } ();
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
registerEvent(m_fileCompletionEvent); registerEvent(m_fileCompletionEvent);
m_ioEvents = new ConsumableQueue!IOEvent; m_ioEvents = mallocT!(ConsumableQueue!IOEvent);
static if (__VERSION__ >= 2074) static if (__VERSION__ >= 2074)
m_threadCallbackMutex = new shared Mutex; m_threadCallbackMutex = mallocT!(shared(Mutex));
else { else {
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } ();
} }
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)));
m_threadCallbacks.reserve(1000); m_threadCallbacks.reserve(1000);
} }
void dispose()
@trusted {
try {
freeT(m_threadCallbacks);
freeT(m_threadCallbackMutex);
freeT(m_ioEvents);
} catch (Exception e) assert(false, e.msg);
}
override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; } override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; }
package void addWaiter() { m_waiterCount++; } package void addWaiter() @nogc { m_waiterCount++; }
package void removeWaiter() package void removeWaiter()
{ @nogc {
assert(m_waiterCount > 0, "Decrementing waiter count below zero."); assert(m_waiterCount > 0, "Decrementing waiter count below zero.");
m_waiterCount--; m_waiterCount--;
} }
@ -159,7 +169,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
bool got_event; bool got_event;
DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max(max_wait.total!"msecs", 0), DWORD.max); DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max(max_wait.total!"msecs", 0), DWORD.max);
auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(m_registeredEventCount, m_registeredEvents.ptr,
timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } ();
while (!m_ioEvents.empty) { while (!m_ioEvents.empty) {
@ -168,9 +178,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
} }
if (ret == WAIT_IO_COMPLETION) got_event = true; if (ret == WAIT_IO_COMPLETION) got_event = true;
else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEventCount) {
if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) { if (auto cb = m_registeredEventCallbacks[ret - WAIT_OBJECT_0]) {
(*pc)(); cb();
got_event = true; got_event = true;
} }
} }
@ -209,9 +219,11 @@ final class WinAPIEventDriverCore : EventDriverCore {
package void registerEvent(HANDLE event, void delegate() @safe nothrow callback = null) package void registerEvent(HANDLE event, void delegate() @safe nothrow callback = null)
{ @nogc {
m_registeredEvents ~= event; assert(m_registeredEventCount < MAXIMUM_WAIT_OBJECTS, "Too many registered events.");
if (callback) m_eventCallbacks[event] = callback; m_registeredEvents[m_registeredEventCount] = event;
if (callback) m_registeredEventCallbacks[m_registeredEventCount] = callback;
m_registeredEventCount++;
} }
package SlotType* setupSlot(SlotType)(HANDLE h) package SlotType* setupSlot(SlotType)(HANDLE h)
@ -231,7 +243,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
} }
package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...) package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...)
{ @nogc {
import std.algorithm.searching : canFind; import std.algorithm.searching : canFind;
m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped));
} }

View file

@ -18,6 +18,7 @@ import eventcore.drivers.winapi.files;
import eventcore.drivers.winapi.signals; import eventcore.drivers.winapi.signals;
import eventcore.drivers.winapi.sockets; import eventcore.drivers.winapi.sockets;
import eventcore.drivers.winapi.watchers; import eventcore.drivers.winapi.watchers;
import eventcore.internal.utils : mallocT, freeT;
import core.sys.windows.windows; import core.sys.windows.windows;
static assert(HANDLE.sizeof <= FD.BaseType.sizeof); static assert(HANDLE.sizeof <= FD.BaseType.sizeof);
@ -39,23 +40,25 @@ final class WinAPIEventDriver : EventDriver {
static WinAPIEventDriver threadInstance; static WinAPIEventDriver threadInstance;
this() this()
@safe { @safe nothrow @nogc {
assert(threadInstance is null); assert(threadInstance is null);
threadInstance = this; threadInstance = this;
import std.exception : enforce; import std.exception : enforce;
WSADATA wd; WSADATA wd;
enforce(() @trusted { return WSAStartup(0x0202, &wd); } () == 0, "Failed to initialize WinSock");
m_signals = new WinAPIEventDriverSignals(); auto res = () @trusted { return WSAStartup(0x0202, &wd); } ();
m_timers = new LoopTimeoutTimerDriver(); assert(res == 0, "Failed to initialize WinSock");
m_core = new WinAPIEventDriverCore(m_timers);
m_events = new WinAPIEventDriverEvents(m_core); m_signals = mallocT!WinAPIEventDriverSignals();
m_files = new WinAPIEventDriverFiles(m_core); m_timers = mallocT!LoopTimeoutTimerDriver();
m_sockets = new WinAPIEventDriverSockets(m_core); m_core = mallocT!WinAPIEventDriverCore(m_timers);
m_dns = new WinAPIEventDriverDNS(); m_events = mallocT!WinAPIEventDriverEvents(m_core);
m_watchers = new WinAPIEventDriverWatchers(m_core); m_files = mallocT!WinAPIEventDriverFiles(m_core);
m_sockets = mallocT!WinAPIEventDriverSockets(m_core);
m_dns = mallocT!WinAPIEventDriverDNS();
m_watchers = mallocT!WinAPIEventDriverWatchers(m_core);
} }
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
@ -75,8 +78,20 @@ final class WinAPIEventDriver : EventDriver {
{ {
if (!m_events) return; if (!m_events) return;
m_events.dispose(); m_events.dispose();
m_events = null; m_core.dispose();
assert(threadInstance !is null); assert(threadInstance !is null);
threadInstance = null; threadInstance = null;
try () @trusted {
freeT(m_watchers);
freeT(m_dns);
freeT(m_sockets);
freeT(m_files);
freeT(m_events);
freeT(m_core);
freeT(m_timers);
freeT(m_signals);
} ();
catch (Exception e) assert(false, e.msg);
} }
} }

View file

@ -6,7 +6,7 @@ import eventcore.driver;
import eventcore.drivers.winapi.core; import eventcore.drivers.winapi.core;
import eventcore.internal.win32; import eventcore.internal.win32;
import eventcore.internal.consumablequeue; import eventcore.internal.consumablequeue;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : mallocT, freeT, nogc_assert;
final class WinAPIEventDriverEvents : EventDriverEvents { final class WinAPIEventDriverEvents : EventDriverEvents {
@ -31,10 +31,10 @@ final class WinAPIEventDriverEvents : EventDriverEvents {
} }
this(WinAPIEventDriverCore core) this(WinAPIEventDriverCore core)
{ @nogc {
m_core = core; m_core = core;
m_event = () @trusted { return CreateEvent(null, false, false, null); } (); m_event = () @trusted { return CreateEvent(null, false, false, null); } ();
m_pending = new ConsumableQueue!Trigger; // FIXME: avoid GC allocation m_pending = mallocT!(ConsumableQueue!Trigger); // FIXME: avoid GC allocation
InitializeCriticalSection(&m_mutex); InitializeCriticalSection(&m_mutex);
m_core.registerEvent(m_event, &triggerPending); m_core.registerEvent(m_event, &triggerPending);
} }
@ -42,7 +42,7 @@ final class WinAPIEventDriverEvents : EventDriverEvents {
void dispose() void dispose()
@trusted { @trusted {
scope (failure) assert(false); scope (failure) assert(false);
destroy(m_pending); freeT(m_pending);
} }
override EventID create() override EventID create()

View file

@ -17,7 +17,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
} }
this(WinAPIEventDriverCore core) this(WinAPIEventDriverCore core)
{ @nogc {
m_core = core; m_core = core;
} }

View file

@ -25,7 +25,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
this(WinAPIEventDriverCore core) this(WinAPIEventDriverCore core)
@trusted { @trusted @nogc {
m_tid = GetCurrentThreadId(); m_tid = GetCurrentThreadId();
m_core = core; m_core = core;
@ -406,7 +406,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
override void cancelRead(StreamSocketFD socket) override void cancelRead(StreamSocketFD socket)
@trusted { @trusted @nogc {
if (!m_sockets[socket].streamSocket.read.callback) return; if (!m_sockets[socket].streamSocket.read.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped); CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped);
m_sockets[socket].streamSocket.read.callback = null; m_sockets[socket].streamSocket.read.callback = null;
@ -414,7 +414,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
override void cancelWrite(StreamSocketFD socket) override void cancelWrite(StreamSocketFD socket)
@trusted { @trusted @nogc {
if (!m_sockets[socket].streamSocket.write.callback) return; if (!m_sockets[socket].streamSocket.write.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped); CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped);
m_sockets[socket].streamSocket.write.callback = null; m_sockets[socket].streamSocket.write.callback = null;
@ -549,7 +549,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
override void cancelReceive(DatagramSocketFD socket) override void cancelReceive(DatagramSocketFD socket)
@trusted { @trusted @nogc {
if (!m_sockets[socket].datagramSocket.read.callback) return; if (!m_sockets[socket].datagramSocket.read.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped); CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped);
m_sockets[socket].datagramSocket.read.callback = null; m_sockets[socket].datagramSocket.read.callback = null;
@ -643,7 +643,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
override void cancelSend(DatagramSocketFD socket) override void cancelSend(DatagramSocketFD socket)
@trusted { @trusted @nogc {
if (!m_sockets[socket].datagramSocket.write.callback) return; if (!m_sockets[socket].datagramSocket.write.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped); CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped);
m_sockets[socket].datagramSocket.write.callback = null; m_sockets[socket].datagramSocket.write.callback = null;
@ -719,7 +719,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
override bool releaseRef(SocketFD fd) override bool releaseRef(SocketFD fd)
{ @nogc {
import taggedalgebraic : hasType; import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_sockets[fd]; } (); auto slot = () @trusted { return &m_sockets[fd]; } ();
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
@ -787,7 +787,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
private void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) private void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system @nogc {
SocketSlot* fds = &m_sockets[descriptor].common; SocketSlot* fds = &m_sockets[descriptor].common;
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");
@ -808,7 +808,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
package void clearSocketSlot(FD fd) package void clearSocketSlot(FD fd)
{ @nogc {
auto slot = () @trusted { return &m_sockets[fd]; } (); auto slot = () @trusted { return &m_sockets[fd]; } ();
if (slot.common.userDataDestructor) if (slot.common.userDataDestructor)
() @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } (); () @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } ();
@ -889,8 +889,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
} }
void setupWindowClass() nothrow void setupWindowClass()
@trusted { @trusted nothrow @nogc {
static __gshared registered = false; static __gshared registered = false;
if (registered) return; if (registered) return;

View file

@ -16,7 +16,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
} }
this(WinAPIEventDriverCore core) this(WinAPIEventDriverCore core)
{ @nogc {
m_core = core; m_core = core;
} }

View file

@ -1,5 +1,8 @@
module eventcore.internal.consumablequeue; module eventcore.internal.consumablequeue;
import eventcore.internal.utils : mallocNT, freeNT;
/** FIFO queue with support for chunk-wise consumption. /** FIFO queue with support for chunk-wise consumption.
*/ */
final class ConsumableQueue(T) final class ConsumableQueue(T)
@ -18,6 +21,12 @@ final class ConsumableQueue(T)
size_t m_pendingCount; size_t m_pendingCount;
} }
~this()
@trusted @nogc nothrow {
if (m_storage !is null)
freeNT(m_storage);
}
@property size_t length() const { return m_pendingCount; } @property size_t length() const { return m_pendingCount; }
@property bool empty() const { return length == 0; } @property bool empty() const { return length == 0; }
@ -43,11 +52,15 @@ final class ConsumableQueue(T)
while (new_capacity < min_capacity) new_capacity *= 2; while (new_capacity < min_capacity) new_capacity *= 2;
auto new_capacity_mask = new_capacity - 1; auto new_capacity_mask = new_capacity - 1;
auto new_storage = new Slot[new_capacity]; auto new_storage = mallocNT!Slot(new_capacity);
foreach (i; 0 .. m_consumedCount + m_pendingCount) foreach (i; 0 .. m_consumedCount + m_pendingCount)
new_storage[(m_first + i) & new_capacity_mask] = m_storage[(m_first + i) & m_capacityMask]; new_storage[(m_first + i) & new_capacity_mask] = m_storage[(m_first + i) & m_capacityMask];
() @trusted {
if (m_storage !is null)
freeNT(m_storage);
m_storage = new_storage; m_storage = new_storage;
} ();
m_capacityMask = new_capacity_mask; m_capacityMask = new_capacity_mask;
} }

View file

@ -1,5 +1,7 @@
module eventcore.internal.utils; module eventcore.internal.utils;
import core.memory : GC;
import std.traits : hasIndirections;
import taggedalgebraic; import taggedalgebraic;
@ -8,17 +10,83 @@ void print(ARGS...)(string str, ARGS args)
import std.format : formattedWrite; import std.format : formattedWrite;
StdoutRange r; StdoutRange r;
scope cb = () { scope cb = () {
scope (failure) assert(false); try (&r).formattedWrite(str, args);
(&r).formattedWrite(str, args); catch (Exception e) assert(false, e.msg);
}; };
(cast(void delegate() @nogc @safe nothrow)cb)(); (cast(void delegate() @nogc @safe nothrow)cb)();
r.put('\n'); r.put('\n');
} }
T mallocT(T, ARGS...)(ARGS args)
@trusted @nogc {
import core.stdc.stdlib : malloc;
import std.conv : emplace;
enum size = __traits(classInstanceSize, T);
auto ret = cast(T)malloc(size);
static if (hasIndirections!T)
GC.addRange(cast(void*)ret, __traits(classInstanceSize, T));
scope doit = { emplace!T((cast(void*)ret)[0 .. size], args); };
static if (__traits(compiles, () nothrow { typeof(doit).init(); })) // NOTE: doing the typeof thing here, because LDC 1.7.0 otherwise thinks doit gets escaped here
(cast(void delegate() @nogc nothrow)doit)();
else
(cast(void delegate() @nogc)doit)();
return ret;
}
void freeT(T)(ref T inst) @nogc
if (is(T == class))
{
import core.stdc.stdlib : free;
if (!inst) return;
noGCDestroy(inst);
static if (hasIndirections!T)
GC.removeRange(cast(void*)inst);
free(cast(void*)inst);
inst = null;
}
T[] mallocNT(T)(size_t cnt)
@trusted {
import core.stdc.stdlib : malloc;
import std.conv : emplace;
auto ret = (cast(T*)malloc(T.sizeof * cnt))[0 .. cnt];
static if (hasIndirections!T)
GC.addRange(cast(void*)ret, T.sizeof * cnt);
foreach (ref v; ret)
static if (!is(T == class))
emplace!T(&v);
else v = null;
return ret;
}
void freeNT(T)(ref T[] arr)
{
import core.stdc.stdlib : free;
foreach (ref v; arr)
static if (!is(T == class))
destroy(v);
static if (hasIndirections!T)
GC.removeRange(arr.ptr);
free(arr.ptr);
arr = null;
}
private void noGCDestroy(T)(ref T t)
@trusted {
// FIXME: only do this if the destructor chain is actually nogc
scope doit = { destroy(t); };
(cast(void delegate() @nogc)doit)();
}
private extern(C) Throwable.TraceInfo _d_traceContext(void* ptr = null); private extern(C) Throwable.TraceInfo _d_traceContext(void* ptr = null);
void nogc_assert(bool cond, string message, string file = __FILE__, int line = __LINE__) void nogc_assert(bool cond, string message, string file = __FILE__, int line = __LINE__)
@trusted nothrow { @trusted nothrow @nogc {
import core.stdc.stdlib : abort; import core.stdc.stdlib : abort;
import std.stdio : stderr; import std.stdio : stderr;
@ -28,12 +96,15 @@ void nogc_assert(bool cond, string message, string file = __FILE__, int line = _
assert(false); assert(false);
} }
scope doit = {
stderr.writefln("Assertion failure @%s(%s): %s", file, line, message); stderr.writefln("Assertion failure @%s(%s): %s", file, line, message);
stderr.writeln("------------------------"); stderr.writeln("------------------------");
if (auto info = _d_traceContext(null)) { if (auto info = _d_traceContext(null)) {
foreach (s; info) foreach (s; info)
stderr.writeln(s); stderr.writeln(s);
} else stderr.writeln("no stack trace available"); } else stderr.writeln("no stack trace available");
};
(cast(void delegate() @nogc)doit)(); // write and _d_traceContext are not nogc
} }
} }
@ -53,14 +124,11 @@ struct StdoutRange {
} }
struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) {
import core.memory : GC;
static assert(nextPOT(CHUNK_SIZE) == CHUNK_SIZE, static assert(nextPOT(CHUNK_SIZE) == CHUNK_SIZE,
"CHUNK_SIZE must be a power of two for performance reasons."); "CHUNK_SIZE must be a power of two for performance reasons.");
@safe: nothrow: @safe: nothrow:
import core.stdc.stdlib : calloc, free, malloc, realloc; import core.stdc.stdlib : calloc, free, malloc, realloc;
import std.traits : hasIndirections;
alias chunkSize = CHUNK_SIZE; alias chunkSize = CHUNK_SIZE;
@ -85,12 +153,13 @@ struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) {
@nogc { @nogc {
() @trusted { () @trusted {
foreach (i; 0 .. m_chunkCount) { foreach (i; 0 .. m_chunkCount) {
destroy(m_chunks[i]); destroy(*m_chunks[i]);
static if (hasIndirections!T) static if (hasIndirections!T)
GC.removeRange(m_chunks[i]); GC.removeRange(m_chunks[i]);
free(m_chunks[i]); free(m_chunks[i]);
} }
free(m_chunks.ptr); free(m_chunks.ptr);
m_chunks = null;
} (); } ();
m_chunkCount = 0; m_chunkCount = 0;
m_length = 0; m_length = 0;
@ -183,7 +252,7 @@ struct AlgebraicChoppedVector(TCommon, TSpecific...)
import std.format : format; import std.format : format;
string ret; string ret;
foreach (i, U; TSpecific) foreach (i, U; TSpecific)
ret ~= "@property ref TSpecific[%s] %s() nothrow @safe { return this.specific.get!(TSpecific[%s]); }\n" ret ~= "@property ref TSpecific[%s] %s() nothrow @safe @nogc { return this.specific.get!(TSpecific[%s]); }\n"
.format(i, U.Handle.name, i); .format(i, U.Handle.name, i);
return ret; return ret;
} }
@ -207,10 +276,13 @@ struct SmallIntegerSet(V : size_t)
size_t m_count; size_t m_count;
} }
@disable this(this);
@property bool empty() const { return m_count == 0; } @property bool empty() const { return m_count == 0; }
void insert(V i) void insert(V i)
{ {
assert(i >= 0);
foreach (j; 0 .. m_bits.length) { foreach (j; 0 .. m_bits.length) {
uint b = 1u << (i%32); uint b = 1u << (i%32);
i /= 32; i /= 32;
@ -223,6 +295,9 @@ struct SmallIntegerSet(V : size_t)
void remove(V i) void remove(V i)
{ {
assert(i >= 0);
if (i >= m_bits[0].length * 32) return;
foreach (j; 0 .. m_bits.length) { foreach (j; 0 .. m_bits.length) {
uint b = 1u << (i%32); uint b = 1u << (i%32);
i /= 32; i /= 32;