From 78db7c957311bb1191c644a850070f6ed3bf9018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 13 Apr 2019 17:00:29 +0200 Subject: [PATCH] Fix race-condition in ThreadedFileEventDriver. The m_files field was accessed from the worker threads, which is unsafe, because the chunk index of the ChoppedVector could change at any time. The accessed field is now copied to the worker thread instead. Also, instead of a custom spin lock, StaticTaskPool now uses a normal Mutex, which is just as fast, but emits the proper memory barriers and is integrated with LDC's thread sanitizer. --- source/eventcore/drivers/threadedfile.d | 31 ++++++++++++++++--------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 4a0a646..d92502e 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start write task"); try { - m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeWrites.remove(file); @@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start read task"); try { - m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeReads.remove(file); @@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil } /// private - static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) + static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer) { log("task fun"); - IOInfo* f = mixin("&fd.m_files[file]."~op); + shared(IOInfo)* f = mixin("&fi."~op); log("start processing"); if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) @@ -337,7 +341,7 @@ log("start processing"); scope (exit) { log("trigger event"); - () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); + files.m_events.trigger(files.m_readyEvent, true); } if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); @@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args) // Maintains a single thread pool shared by all driver instances (threads) private struct StaticTaskPool { - import core.atomic : cas, atomicStore; + import core.sync.mutex : Mutex; import std.parallelism : TaskPool; private { - static shared int m_locked = 0; + static shared Mutex m_mutex; static __gshared TaskPool m_pool; static __gshared int m_refCount = 0; } + shared static this() + { + m_mutex = new shared Mutex; + } + static TaskPool addRef() @trusted nothrow { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!m_refCount++) { try { @@ -452,8 +461,8 @@ private struct StaticTaskPool { TaskPool fin_pool; { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!--m_refCount) { fin_pool = m_pool;