diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index ce2cf7b..41d3517 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -48,7 +48,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -106,7 +106,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s)); assert(getRC(id) == 1); return id; } @@ -133,13 +133,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void trigger(EventID event, bool notify_all) shared @trusted @nogc { import core.atomic : atomicStore; - auto thisus = cast(PosixEventDriverEvents)this; - assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); - long one = 1; + long count = notify_all ? long.max : 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (Posix) .write(cast(int)event, &one, one.sizeof); - else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); + version (Posix) .write(cast(int)event, &count, count.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -157,26 +154,23 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS getSlot(event).waiters.removePending(on_event); } - private void onEvent(FD fd) - @trusted { - EventID event = cast(EventID)fd; - version (linux) { + version (linux) { + private void onEvent(FD fd) + @trusted { + EventID event = cast(EventID)fd; ulong cnt; () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } (); + trigger(event, cnt > 0); } - import core.atomic : cas; - auto all = cas(&getSlot(event).triggerAll, true, false); - trigger(event, all); - } - - version (linux) {} - else { + } else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) @nogc { m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { EventID evt = m_sockets.userData!EventID(s); - scope doit = { onEvent(evt); }; // cast to nogc + scope doit = { + trigger(evt, (cast(long[])m_buf)[0] > 1); + }; // cast to nogc () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } @@ -238,7 +232,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; - shared bool triggerAll; bool isInternal; version (linux) {} else {