diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index f844265..4b3e1e8 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -54,7 +54,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { private { alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver); - alias EventsDriver = PosixEventDriverEvents!Loop; + alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver); version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop; else alias SignalsDriver = DummyEventDriverSignals!Loop; alias TimerDriver = LoopTimeoutTimerDriver; @@ -80,11 +80,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { this() { m_loop = new Loop; - m_events = new EventsDriver(m_loop); + m_sockets = new SocketsDriver(m_loop); + m_events = new EventsDriver(m_loop, m_sockets); m_signals = new SignalsDriver(m_loop); m_timers = new TimerDriver; m_core = new CoreDriver(m_loop, m_timers, m_events); - m_sockets = new SocketsDriver(m_loop); m_dns = new DNSDriver(m_events, m_signals); m_files = new FileDriver(m_events); m_watchers = new WatcherDriver(m_loop); @@ -1134,20 +1134,22 @@ private void passToDNSCallback()(DNSLookupID id, scope DNSLookupCallback cb, DNS } catch (Exception e) assert(false, e.msg); } -final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { +final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents { @safe: /*@nogc:*/ nothrow: private { Loop m_loop; + Sockets m_sockets; version (Windows) { - static struct ES { - uint refCount; - EventSlot slot; - } - ES[EventID] m_events; + EventSlot[DatagramSocketFD] m_events; + ubyte[long.sizeof] m_buf; } } - this(Loop loop) { m_loop = loop; } + this(Loop loop, Sockets sockets) + { + m_loop = loop; + m_sockets = sockets; + } final override EventID create() { @@ -1159,25 +1161,27 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); return id; } else version (Windows) { - auto h = () @trusted { return CreateEvent(null, false, false, null); } (); - auto id = EventID(cast(int)h); - m_events[id] = ES(1, EventSlot(new ConsumableQueue!EventCallback)); // FIXME: avoid dynamic memory allocation - return id; + auto addr = new InternetAddress(0x7F000001, 0); + auto s = m_sockets.createDatagramSocket(addr, addr); + if (s == DatagramSocketFD.invalid) print("oops"); + m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation + return cast(EventID)s; } else assert(false, "OS not supported!"); } final override void trigger(EventID event, bool notify_all = true) { - assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent."); + auto slot = &getSlot(event); if (notify_all) { //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); - foreach (w; getSlot(event).waiters.consume) { + foreach (w; slot.waiters.consume) { //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); w(event); } } else { - if (!getSlot(event).waiters.empty) - getSlot(event).waiters.consumeOne()(event); + if (!slot.waiters.empty) + slot.waiters.consumeOne()(event); } } @@ -1189,7 +1193,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { long one = 1; //log("emitting for all threads"); if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - () @trusted { .write(event, &one, one.sizeof); } (); + version (Windows) + (cast(Sockets)m_sockets).send(cast(DatagramSocketFD)event, cast(ubyte[])m_buf, IOMode.once, null, &onSocketDataSent); + else + () @trusted { .write(event, &one, one.sizeof); } (); } final override void wait(EventID event, EventCallback on_event) @@ -1217,6 +1224,17 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { trigger(event, all); } + version (Windows) { + private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress) + { + } + private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) + { + onEvent(cast(EventID)s); + m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + } + } + final override void addRef(EventID descriptor) { assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD."); @@ -1226,16 +1244,22 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { final override bool releaseRef(EventID descriptor) { assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); - if (--getRC(descriptor) == 0) { + void destroy() { () @trusted nothrow { scope (failure) assert(false); - destroy(getSlot(descriptor).waiters); + .destroy(getSlot(descriptor).waiters); assert(getSlot(descriptor).waiters is null); } (); - version (Windows) { - () @trusted { CloseHandle(cast(HANDLE)cast(int)descriptor); } (); - m_events.remove(descriptor); - } else { + } + version (Windows) { + if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) { + destroy(); + m_events.remove(cast(DatagramSocketFD)descriptor); + return false; + } + } else { + if (--getRC(descriptor) == 0) { + destroy(); m_loop.unregisterFD(descriptor); m_loop.clearFD(descriptor); close(descriptor); @@ -1248,8 +1272,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { private ref EventSlot getSlot(EventID id) { version (Windows) { - assert(id in m_events, "Invalid event ID."); - return m_events[id].slot; + assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID."); + return m_events[cast(DatagramSocketFD)id]; } else { assert(id < m_loop.m_fds.length, "Invalid event ID."); return m_loop.m_fds[id].event(); @@ -1258,11 +1282,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { private ref uint getRC(EventID id) { - version (Windows) { - return m_events[id].refCount; - } else { - return m_loop.m_fds[id].common.refCount; - } + return m_loop.m_fds[id].common.refCount; } } diff --git a/tests/0-event.d b/tests/0-event.d index c8b412b..b0f9e2c 100644 --- a/tests/0-event.d +++ b/tests/0-event.d @@ -13,6 +13,8 @@ import core.thread : Thread; bool s_done; shared EventDriverEvents ss_evts; +// TODO: adjust to detect premature loop exit if only an event wait is active (no timer) + void test(bool notify_all) { auto id = eventDriver.events.create();