diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 5cf899c..1f04d25 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -123,13 +123,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil void dispose() { - if (m_fileThreadPool) { - log("finishing thread pool"); - try m_fileThreadPool.finish(); - catch (Exception e) { - //logError("Failed to shut down file I/O thread pool."); - } - } + if (m_fileThreadPool) + StaticTaskPool.releaseRef(); log("finishing file events"); m_events.cancelWait(m_readyEvent, &onReady); onReady(m_readyEvent); @@ -196,12 +191,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil assert(f.callback is null, "Concurrent file writes are not allowed."); f.callback = on_write_finish; m_activeWrites.insert(file); -log("start task"); + if (m_fileThreadPool is null) + m_fileThreadPool = StaticTaskPool.addRef(); + log("start write task"); try { - if (m_fileThreadPool is null) { - m_fileThreadPool = new TaskPool(4); - m_fileThreadPool.isDaemon = true; - } m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); startWaiting(); } catch (Exception e) { @@ -230,11 +223,10 @@ log("start task"); assert(f.callback is null, "Concurrent file reads are not allowed."); f.callback = on_read_finish; m_activeReads.insert(file); + if (m_fileThreadPool is null) + m_fileThreadPool = StaticTaskPool.addRef(); + log("start read task"); try { - if (m_fileThreadPool is null) { - m_fileThreadPool = new TaskPool(4); - m_fileThreadPool.isDaemon = true; - } m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); startWaiting(); } catch (Exception e) { @@ -359,3 +351,56 @@ private void log(ARGS...)(string fmt, ARGS args) writefln(fmt, args); } } + + +// Maintains a single thread pool shared by all driver instances (threads) +private struct StaticTaskPool { + import core.atomic : cas, atomicStore; + import std.parallelism : TaskPool; + + private { + static shared int m_locked = 0; + static __gshared TaskPool m_pool; + static __gshared int m_refCount = 0; + } + + static TaskPool addRef() + @trusted nothrow { + while (!cas(&m_locked, 0, 1)) {} + scope (exit) atomicStore(m_locked, 0); + + if (!m_refCount++) { + try { + m_pool = new TaskPool(4); + m_pool.isDaemon = true; + } catch (Exception e) { + assert(false, "Failed to create file thread pool: "~e.msg); + } + } + + return m_pool; + } + + static void releaseRef() + @trusted nothrow { + TaskPool fin_pool; + + { + while (!cas(&m_locked, 0, 1)) {} + scope (exit) atomicStore(m_locked, 0); + + if (!--m_refCount) { + fin_pool = m_pool; + m_pool = null; + } + } + + if (fin_pool) { + log("finishing thread pool"); + try fin_pool.finish(); + catch (Exception e) { + //log("Failed to shut down file I/O thread pool."); + } + } + } +}