Fix race-condition in ThreadedFileEventDriver.

The m_files field was accessed from the worker threads, which is unsafe, because the chunk index of the ChoppedVector could change at any time. The accessed field is now copied to the worker thread instead.

Also, instead of a custom spin lock, StaticTaskPool now uses a normal Mutex, which is just as fast, but emits the proper memory barriers and is integrated with LDC's thread sanitizer.
This commit is contained in:
Sönke Ludwig 2019-04-13 17:00:29 +02:00
parent 48d083b20f
commit 78db7c9573

View file

@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
threadSetup();
log("start write task");
try {
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
auto thiss = () @trusted { return cast(shared)this; } ();
auto fs = () @trusted { return cast(shared)f; } ();
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer));
startWaiting();
} catch (Exception e) {
m_activeWrites.remove(file);
@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
threadSetup();
log("start read task");
try {
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
auto thiss = () @trusted { return cast(shared)this; } ();
auto fs = () @trusted { return cast(shared)f; } ();
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer));
startWaiting();
} catch (Exception e) {
m_activeReads.remove(file);
@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
}
/// private
static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer)
{
log("task fun");
IOInfo* f = mixin("&fd.m_files[file]."~op);
shared(IOInfo)* f = mixin("&fi."~op);
log("start processing");
if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing))
@ -337,7 +341,7 @@ log("start processing");
scope (exit) {
log("trigger event");
() @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true);
files.m_events.trigger(files.m_readyEvent, true);
}
if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok);
@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args)
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.atomic : cas, atomicStore;
import core.sync.mutex : Mutex;
import std.parallelism : TaskPool;
private {
static shared int m_locked = 0;
static shared Mutex m_mutex;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static TaskPool addRef()
@trusted nothrow {
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_refCount++) {
try {
@ -452,8 +461,8 @@ private struct StaticTaskPool {
TaskPool fin_pool;
{
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!--m_refCount) {
fin_pool = m_pool;