From 48d083b20fcc112f8496ca5040bc4ad48b9c8d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 13 Apr 2019 16:57:04 +0200 Subject: [PATCH 1/2] Fix possible race condition in PosixEventDriverEvents.trigger. Accessing the event slot should only be done from the owner thread, since the chunk index of the ChoppedVector could be updated at any time. Instead of a triggerAll field, this flag is now propagated through the underlying eventfd/socket pair. --- source/eventcore/drivers/posix/events.d | 35 ++++++++++--------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index ce2cf7b..41d3517 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -48,7 +48,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -106,7 +106,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s)); assert(getRC(id) == 1); return id; } @@ -133,13 +133,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void trigger(EventID event, bool notify_all) shared @trusted @nogc { import core.atomic : atomicStore; - auto thisus = cast(PosixEventDriverEvents)this; - assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); - long one = 1; + long count = notify_all ? long.max : 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (Posix) .write(cast(int)event, &one, one.sizeof); - else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); + version (Posix) .write(cast(int)event, &count, count.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -157,26 +154,23 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS getSlot(event).waiters.removePending(on_event); } - private void onEvent(FD fd) - @trusted { - EventID event = cast(EventID)fd; - version (linux) { + version (linux) { + private void onEvent(FD fd) + @trusted { + EventID event = cast(EventID)fd; ulong cnt; () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } (); + trigger(event, cnt > 0); } - import core.atomic : cas; - auto all = cas(&getSlot(event).triggerAll, true, false); - trigger(event, all); - } - - version (linux) {} - else { + } else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) @nogc { m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { EventID evt = m_sockets.userData!EventID(s); - scope doit = { onEvent(evt); }; // cast to nogc + scope doit = { + trigger(evt, (cast(long[])m_buf)[0] > 1); + }; // cast to nogc () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } @@ -238,7 +232,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; - shared bool triggerAll; bool isInternal; version (linux) {} else { From 78db7c957311bb1191c644a850070f6ed3bf9018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 13 Apr 2019 17:00:29 +0200 Subject: [PATCH 2/2] Fix race-condition in ThreadedFileEventDriver. The m_files field was accessed from the worker threads, which is unsafe, because the chunk index of the ChoppedVector could change at any time. The accessed field is now copied to the worker thread instead. Also, instead of a custom spin lock, StaticTaskPool now uses a normal Mutex, which is just as fast, but emits the proper memory barriers and is integrated with LDC's thread sanitizer. --- source/eventcore/drivers/threadedfile.d | 31 ++++++++++++++++--------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 4a0a646..d92502e 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start write task"); try { - m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeWrites.remove(file); @@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start read task"); try { - m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeReads.remove(file); @@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil } /// private - static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) + static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer) { log("task fun"); - IOInfo* f = mixin("&fd.m_files[file]."~op); + shared(IOInfo)* f = mixin("&fi."~op); log("start processing"); if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) @@ -337,7 +341,7 @@ log("start processing"); scope (exit) { log("trigger event"); - () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); + files.m_events.trigger(files.m_readyEvent, true); } if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); @@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args) // Maintains a single thread pool shared by all driver instances (threads) private struct StaticTaskPool { - import core.atomic : cas, atomicStore; + import core.sync.mutex : Mutex; import std.parallelism : TaskPool; private { - static shared int m_locked = 0; + static shared Mutex m_mutex; static __gshared TaskPool m_pool; static __gshared int m_refCount = 0; } + shared static this() + { + m_mutex = new shared Mutex; + } + static TaskPool addRef() @trusted nothrow { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!m_refCount++) { try { @@ -452,8 +461,8 @@ private struct StaticTaskPool { TaskPool fin_pool; { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!--m_refCount) { fin_pool = m_pool;