Implement events for the WinAPI driver.

This commit is contained in:
Sönke Ludwig 2017-01-21 23:19:39 +01:00
parent f7ec3da756
commit 2a5252977e
3 changed files with 116 additions and 12 deletions

View file

@ -29,7 +29,7 @@ UDP Sockets | yes | yes | no | no
USDS | yes | yes | no | no USDS | yes | yes | no | no
DNS | yes | yes | no | no DNS | yes | yes | no | no
Timers | yes | yes | yes | no Timers | yes | yes | yes | no
Events | yes | yes | no | no Events | yes | yes | yes | no
Signals | yes² | yes² | no | no Signals | yes² | yes² | no | no
Files | yes | yes | no | no Files | yes | yes | no | no
UI Integration | no | no | yes | no UI Integration | no | no | yes | no

View file

@ -11,6 +11,8 @@ version (Windows):
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.timer; import eventcore.drivers.timer;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils;
import taggedalgebraic; import taggedalgebraic;
import core.sys.windows.windows; import core.sys.windows.windows;
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
@ -46,10 +48,10 @@ final class WinAPIEventDriver : EventDriver {
WSADATA wd; WSADATA wd;
enforce(() @trusted { return WSAStartup(0x0202, &wd); } () == 0, "Failed to initialize WinSock"); enforce(() @trusted { return WSAStartup(0x0202, &wd); } () == 0, "Failed to initialize WinSock");
m_events = new WinAPIEventDriverEvents();
m_signals = new WinAPIEventDriverSignals(); m_signals = new WinAPIEventDriverSignals();
m_timers = new LoopTimeoutTimerDriver(); m_timers = new LoopTimeoutTimerDriver();
m_core = new WinAPIEventDriverCore(m_timers); m_core = new WinAPIEventDriverCore(m_timers);
m_events = new WinAPIEventDriverEvents(m_core);
m_files = new WinAPIEventDriverFiles(); m_files = new WinAPIEventDriverFiles();
m_sockets = new WinAPIEventDriverSockets(); m_sockets = new WinAPIEventDriverSockets();
m_dns = new WinAPIEventDriverDNS(); m_dns = new WinAPIEventDriverDNS();
@ -70,6 +72,7 @@ final class WinAPIEventDriver : EventDriver {
override void dispose() override void dispose()
{ {
m_events.dispose();
assert(threadInstance !is null); assert(threadInstance !is null);
threadInstance = null; threadInstance = null;
} }
@ -83,6 +86,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
DWORD m_tid; DWORD m_tid;
LoopTimeoutTimerDriver m_timers; LoopTimeoutTimerDriver m_timers;
HANDLE[] m_registeredEvents; HANDLE[] m_registeredEvents;
void delegate() @safe nothrow[HANDLE] m_eventCallbacks;
HANDLE m_fileCompletionEvent; HANDLE m_fileCompletionEvent;
HandleSlot[HANDLE] m_handles; // FIXME: use allocator based hash map HandleSlot[HANDLE] m_handles; // FIXME: use allocator based hash map
@ -93,7 +97,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_timers = timers; m_timers = timers;
m_tid = () @trusted { return GetCurrentThreadId(); } (); m_tid = () @trusted { return GetCurrentThreadId(); } ();
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
m_registeredEvents ~= m_fileCompletionEvent; registerEvent(m_fileCompletionEvent);
} }
override size_t waiterCount() { return m_waiterCount; } override size_t waiterCount() { return m_waiterCount; }
@ -170,6 +174,10 @@ final class WinAPIEventDriverCore : EventDriverCore {
auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr,
timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } ();
if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) {
if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks)
(*pc)();
}
/*if (ret == WAIT_OBJECT_0) { /*if (ret == WAIT_OBJECT_0) {
got_event = true; got_event = true;
Win32TCPConnection[] to_remove; Win32TCPConnection[] to_remove;
@ -202,6 +210,13 @@ final class WinAPIEventDriverCore : EventDriverCore {
return got_event; return got_event;
} }
private void registerEvent(HANDLE event, void delegate() @safe nothrow callback = null)
{
m_registeredEvents ~= event;
if (callback) m_eventCallbacks[event] = callback;
}
private ref SlotType setupSlot(SlotType)(HANDLE h) private ref SlotType setupSlot(SlotType)(HANDLE h)
{ {
assert(h !in m_handles, "Handle already in use."); assert(h !in m_handles, "Handle already in use.");
@ -397,39 +412,129 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
final class WinAPIEventDriverEvents : EventDriverEvents { final class WinAPIEventDriverEvents : EventDriverEvents {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private {
static struct Trigger {
EventID id;
bool notifyAll;
}
static struct EventSlot {
uint refCount;
ConsumableQueue!EventCallback waiters;
}
WinAPIEventDriverCore m_core;
HANDLE m_event;
EventSlot[EventID] m_events;
CRITICAL_SECTION m_mutex;
ConsumableQueue!Trigger m_pending;
uint m_idCounter;
}
this(WinAPIEventDriverCore core)
{
m_core = core;
m_event = () @trusted { return CreateEvent(null, false, false, null); } ();
m_pending = new ConsumableQueue!Trigger; // FIXME: avoid GC allocation
InitializeCriticalSection(&m_mutex);
m_core.registerEvent(m_event, &triggerPending);
}
void dispose()
@trusted {
scope (failure) assert(false);
destroy(m_pending);
}
override EventID create() override EventID create()
{ {
assert(false, "TODO!"); auto id = EventID(m_idCounter++);
if (id == EventID.invalid) id = EventID(m_idCounter++);
m_events[id] = EventSlot(1, new ConsumableQueue!EventCallback); // FIXME: avoid GC allocation
return id;
} }
override void trigger(EventID event, bool notify_all = true) override void trigger(EventID event, bool notify_all = true)
{ {
assert(false, "TODO!"); auto pe = event in m_events;
assert(pe !is null, "Invalid event ID passed to triggerEvent.");
if (notify_all) {
foreach (w; pe.waiters.consume)
w(event);
} else {
if (!pe.waiters.empty)
pe.waiters.consumeOne()(event);
}
} }
override void trigger(EventID event, bool notify_all = true) shared override void trigger(EventID event, bool notify_all = true) shared
{ {
assert(false, "TODO!"); import core.atomic : atomicStore;
auto pe = event in m_events;
assert(pe !is null, "Invalid event ID passed to shared triggerEvent.");
() @trusted {
auto thisus = cast(WinAPIEventDriverEvents)this;
EnterCriticalSection(&thisus.m_mutex);
thisus.m_pending.put(Trigger(event, notify_all));
LeaveCriticalSection(&thisus.m_mutex);
SetEvent(thisus.m_event);
} ();
} }
override void wait(EventID event, EventCallback on_event) override void wait(EventID event, EventCallback on_event)
{ {
assert(false, "TODO!"); return m_events[event].waiters.put(on_event);
} }
override void cancelWait(EventID event, EventCallback on_event) override void cancelWait(EventID event, EventCallback on_event)
{ {
assert(false, "TODO!"); import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove;
m_events[event].waiters.removePending(on_event);
} }
override void addRef(EventID descriptor) override void addRef(EventID descriptor)
{ {
assert(false, "TODO!"); assert(m_events[descriptor].refCount > 0);
m_events[descriptor].refCount++;
} }
override bool releaseRef(EventID descriptor) override bool releaseRef(EventID descriptor)
{ {
assert(false, "TODO!"); auto pe = descriptor in m_events;
assert(pe.refCount > 0);
if (--pe.refCount == 0) {
() @trusted nothrow {
scope (failure) assert(false);
destroy(pe.waiters);
CloseHandle(idToHandle(descriptor));
} ();
m_events.remove(descriptor);
return false;
}
return true;
}
private void triggerPending()
{
while (true) {
Trigger t;
{
() @trusted { EnterCriticalSection(&m_mutex); } ();
scope (exit) () @trusted { LeaveCriticalSection(&m_mutex); } ();
if (m_pending.empty) break;
t = m_pending.consumeOne;
}
trigger(t.id, t.notifyAll);
}
}
private static HANDLE idToHandle(EventID event)
@trusted {
return cast(HANDLE)cast(int)event;
} }
} }

View file

@ -4,8 +4,7 @@ module eventcore.internal.consumablequeue;
*/ */
final class ConsumableQueue(T) final class ConsumableQueue(T)
{ {
@safe: @safe nothrow:
nothrow:
private { private {
struct Slot { struct Slot {