diff --git a/README.md b/README.md index 39f73c1..a034416 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ UDP Sockets | yes | yes | no | no USDS | yes | yes | no | no DNS | yes | yes | no | no Timers | yes | yes | yes | no -Events | yes | yes | no | no +Events | yes | yes | yes | no Signals | yes² | yes² | no | no Files | yes | yes | no | no UI Integration | no | no | yes | no diff --git a/source/eventcore/drivers/winapi.d b/source/eventcore/drivers/winapi.d index 5e3e2a2..025f643 100644 --- a/source/eventcore/drivers/winapi.d +++ b/source/eventcore/drivers/winapi.d @@ -11,6 +11,8 @@ version (Windows): import eventcore.driver; import eventcore.drivers.timer; +import eventcore.internal.consumablequeue : ConsumableQueue; +import eventcore.internal.utils; import taggedalgebraic; import core.sys.windows.windows; import core.sys.windows.winsock2; @@ -46,10 +48,10 @@ final class WinAPIEventDriver : EventDriver { WSADATA wd; enforce(() @trusted { return WSAStartup(0x0202, &wd); } () == 0, "Failed to initialize WinSock"); - m_events = new WinAPIEventDriverEvents(); m_signals = new WinAPIEventDriverSignals(); m_timers = new LoopTimeoutTimerDriver(); m_core = new WinAPIEventDriverCore(m_timers); + m_events = new WinAPIEventDriverEvents(m_core); m_files = new WinAPIEventDriverFiles(); m_sockets = new WinAPIEventDriverSockets(); m_dns = new WinAPIEventDriverDNS(); @@ -70,6 +72,7 @@ final class WinAPIEventDriver : EventDriver { override void dispose() { + m_events.dispose(); assert(threadInstance !is null); threadInstance = null; } @@ -83,6 +86,7 @@ final class WinAPIEventDriverCore : EventDriverCore { DWORD m_tid; LoopTimeoutTimerDriver m_timers; HANDLE[] m_registeredEvents; + void delegate() @safe nothrow[HANDLE] m_eventCallbacks; HANDLE m_fileCompletionEvent; HandleSlot[HANDLE] m_handles; // FIXME: use allocator based hash map @@ -93,7 +97,7 @@ final class WinAPIEventDriverCore : EventDriverCore { m_timers = timers; m_tid = () @trusted { return GetCurrentThreadId(); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); - m_registeredEvents ~= m_fileCompletionEvent; + registerEvent(m_fileCompletionEvent); } override size_t waiterCount() { return m_waiterCount; } @@ -170,6 +174,10 @@ final class WinAPIEventDriverCore : EventDriverCore { auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); + if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { + if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) + (*pc)(); + } /*if (ret == WAIT_OBJECT_0) { got_event = true; Win32TCPConnection[] to_remove; @@ -202,6 +210,13 @@ final class WinAPIEventDriverCore : EventDriverCore { return got_event; } + + private void registerEvent(HANDLE event, void delegate() @safe nothrow callback = null) + { + m_registeredEvents ~= event; + if (callback) m_eventCallbacks[event] = callback; + } + private ref SlotType setupSlot(SlotType)(HANDLE h) { assert(h !in m_handles, "Handle already in use."); @@ -397,39 +412,129 @@ final class WinAPIEventDriverFiles : EventDriverFiles { final class WinAPIEventDriverEvents : EventDriverEvents { @safe: /*@nogc:*/ nothrow: + private { + static struct Trigger { + EventID id; + bool notifyAll; + } + + static struct EventSlot { + uint refCount; + ConsumableQueue!EventCallback waiters; + } + + WinAPIEventDriverCore m_core; + HANDLE m_event; + EventSlot[EventID] m_events; + CRITICAL_SECTION m_mutex; + ConsumableQueue!Trigger m_pending; + uint m_idCounter; + } + + this(WinAPIEventDriverCore core) + { + m_core = core; + m_event = () @trusted { return CreateEvent(null, false, false, null); } (); + m_pending = new ConsumableQueue!Trigger; // FIXME: avoid GC allocation + InitializeCriticalSection(&m_mutex); + m_core.registerEvent(m_event, &triggerPending); + } + + void dispose() + @trusted { + scope (failure) assert(false); + destroy(m_pending); + } + override EventID create() { - assert(false, "TODO!"); + auto id = EventID(m_idCounter++); + if (id == EventID.invalid) id = EventID(m_idCounter++); + m_events[id] = EventSlot(1, new ConsumableQueue!EventCallback); // FIXME: avoid GC allocation + return id; } override void trigger(EventID event, bool notify_all = true) { - assert(false, "TODO!"); + auto pe = event in m_events; + assert(pe !is null, "Invalid event ID passed to triggerEvent."); + if (notify_all) { + foreach (w; pe.waiters.consume) + w(event); + } else { + if (!pe.waiters.empty) + pe.waiters.consumeOne()(event); + } } override void trigger(EventID event, bool notify_all = true) shared { - assert(false, "TODO!"); + import core.atomic : atomicStore; + auto pe = event in m_events; + assert(pe !is null, "Invalid event ID passed to shared triggerEvent."); + + () @trusted { + auto thisus = cast(WinAPIEventDriverEvents)this; + EnterCriticalSection(&thisus.m_mutex); + thisus.m_pending.put(Trigger(event, notify_all)); + LeaveCriticalSection(&thisus.m_mutex); + SetEvent(thisus.m_event); + } (); } override void wait(EventID event, EventCallback on_event) { - assert(false, "TODO!"); + return m_events[event].waiters.put(on_event); } override void cancelWait(EventID event, EventCallback on_event) { - assert(false, "TODO!"); + import std.algorithm.searching : countUntil; + import std.algorithm.mutation : remove; + + m_events[event].waiters.removePending(on_event); } override void addRef(EventID descriptor) { - assert(false, "TODO!"); + assert(m_events[descriptor].refCount > 0); + m_events[descriptor].refCount++; } override bool releaseRef(EventID descriptor) { - assert(false, "TODO!"); + auto pe = descriptor in m_events; + assert(pe.refCount > 0); + if (--pe.refCount == 0) { + () @trusted nothrow { + scope (failure) assert(false); + destroy(pe.waiters); + CloseHandle(idToHandle(descriptor)); + } (); + m_events.remove(descriptor); + return false; + } + return true; + } + + private void triggerPending() + { + while (true) { + Trigger t; + { + () @trusted { EnterCriticalSection(&m_mutex); } (); + scope (exit) () @trusted { LeaveCriticalSection(&m_mutex); } (); + if (m_pending.empty) break; + t = m_pending.consumeOne; + } + + trigger(t.id, t.notifyAll); + } + } + + private static HANDLE idToHandle(EventID event) + @trusted { + return cast(HANDLE)cast(int)event; } } diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 8018ae4..a806eb9 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -4,8 +4,7 @@ module eventcore.internal.consumablequeue; */ final class ConsumableQueue(T) { - @safe: - nothrow: + @safe nothrow: private { struct Slot {