From 48d083b20fcc112f8496ca5040bc4ad48b9c8d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 13 Apr 2019 16:57:04 +0200 Subject: [PATCH] Fix possible race condition in PosixEventDriverEvents.trigger. Accessing the event slot should only be done from the owner thread, since the chunk index of the ChoppedVector could be updated at any time. Instead of a triggerAll field, this flag is now propagated through the underlying eventfd/socket pair. --- source/eventcore/drivers/posix/events.d | 35 ++++++++++--------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index ce2cf7b..41d3517 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -48,7 +48,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -106,7 +106,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s)); assert(getRC(id) == 1); return id; } @@ -133,13 +133,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void trigger(EventID event, bool notify_all) shared @trusted @nogc { import core.atomic : atomicStore; - auto thisus = cast(PosixEventDriverEvents)this; - assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); - long one = 1; + long count = notify_all ? long.max : 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (Posix) .write(cast(int)event, &one, one.sizeof); - else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); + version (Posix) .write(cast(int)event, &count, count.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -157,26 +154,23 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS getSlot(event).waiters.removePending(on_event); } - private void onEvent(FD fd) - @trusted { - EventID event = cast(EventID)fd; - version (linux) { + version (linux) { + private void onEvent(FD fd) + @trusted { + EventID event = cast(EventID)fd; ulong cnt; () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } (); + trigger(event, cnt > 0); } - import core.atomic : cas; - auto all = cas(&getSlot(event).triggerAll, true, false); - trigger(event, all); - } - - version (linux) {} - else { + } else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) @nogc { m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { EventID evt = m_sockets.userData!EventID(s); - scope doit = { onEvent(evt); }; // cast to nogc + scope doit = { + trigger(evt, (cast(long[])m_buf)[0] > 1); + }; // cast to nogc () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } @@ -238,7 +232,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; - shared bool triggerAll; bool isInternal; version (linux) {} else {