diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 4a0a646..d92502e 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start write task"); try { - m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeWrites.remove(file); @@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start read task"); try { - m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeReads.remove(file); @@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil } /// private - static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) + static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer) { log("task fun"); - IOInfo* f = mixin("&fd.m_files[file]."~op); + shared(IOInfo)* f = mixin("&fi."~op); log("start processing"); if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) @@ -337,7 +341,7 @@ log("start processing"); scope (exit) { log("trigger event"); - () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); + files.m_events.trigger(files.m_readyEvent, true); } if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); @@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args) // Maintains a single thread pool shared by all driver instances (threads) private struct StaticTaskPool { - import core.atomic : cas, atomicStore; + import core.sync.mutex : Mutex; import std.parallelism : TaskPool; private { - static shared int m_locked = 0; + static shared Mutex m_mutex; static __gshared TaskPool m_pool; static __gshared int m_refCount = 0; } + shared static this() + { + m_mutex = new shared Mutex; + } + static TaskPool addRef() @trusted nothrow { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!m_refCount++) { try { @@ -452,8 +461,8 @@ private struct StaticTaskPool { TaskPool fin_pool; { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!--m_refCount) { fin_pool = m_pool;