diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 185c693..1dfc57d 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -1,6 +1,7 @@ module eventcore.drivers.threadedfile; import eventcore.driver; +import eventcore.internal.ioworker; import eventcore.internal.utils; import core.atomic; import core.stdc.errno; @@ -110,7 +111,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil ubyte[16*size_t.sizeof] userData; } - TaskPool m_fileThreadPool; + IOWorkerPool m_fileThreadPool; ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop SmallIntegerSet!size_t m_activeReads; SmallIntegerSet!size_t m_activeWrites; @@ -128,10 +129,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil void dispose() { - if (m_fileThreadPool) { - StaticTaskPool.releaseRef(); - m_fileThreadPool = null; - } + m_fileThreadPool = IOWorkerPool.init; if (m_readyEvent != EventID.invalid) { log("finishing file events"); @@ -435,9 +433,9 @@ log("ready event"); log("create file event"); m_readyEvent = m_events.create(); } - if (m_fileThreadPool is null) { + if (!m_fileThreadPool) { log("aquire thread pool"); - m_fileThreadPool = StaticTaskPool.addRef(); + m_fileThreadPool = acquireIOWorkerPool(); } } } @@ -456,63 +454,3 @@ private void log(ARGS...)(string fmt, ARGS args) writefln(fmt, args); } } - - -// Maintains a single thread pool shared by all driver instances (threads) -private struct StaticTaskPool { - import core.sync.mutex : Mutex; - import std.parallelism : TaskPool; - - private { - static shared Mutex m_mutex; - static __gshared TaskPool m_pool; - static __gshared int m_refCount = 0; - } - - shared static this() - { - m_mutex = new shared Mutex; - } - - static TaskPool addRef() - @trusted nothrow { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!m_refCount++) { - try { - m_pool = mallocT!TaskPool(4); - m_pool.isDaemon = true; - } catch (Exception e) { - assert(false, e.msg); - } - } - - return m_pool; - } - - static void releaseRef() - @trusted nothrow { - TaskPool fin_pool; - - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!--m_refCount) { - fin_pool = m_pool; - m_pool = null; - } - } - - if (fin_pool) { - log("finishing thread pool"); - try { - fin_pool.finish(true); - freeT(fin_pool); - } catch (Exception e) { - //log("Failed to shut down file I/O thread pool."); - } - } - } -} diff --git a/source/eventcore/internal/ioworker.d b/source/eventcore/internal/ioworker.d new file mode 100644 index 0000000..0dcfd1a --- /dev/null +++ b/source/eventcore/internal/ioworker.d @@ -0,0 +1,89 @@ +/** Provides a shared task pool for distributing tasks to worker threads. +*/ +module eventcore.internal.ioworker; + +import eventcore.internal.utils; + +import std.parallelism : TaskPool; + + +IOWorkerPool acquireIOWorkerPool() +@safe nothrow { + return IOWorkerPool(true); +} + +struct IOWorkerPool { + private { + TaskPool m_pool; + } + + @safe nothrow: + + private this(bool) { m_pool = StaticTaskPool.addRef(); } + ~this() { if (m_pool) StaticTaskPool.releaseRef(); } + this(this) { if (m_pool) StaticTaskPool.addRef(); } + + bool opCast(T)() const if (is(T == bool)) { return !!m_pool; } + + @property TaskPool pool() { return m_pool; } + + alias pool this; +} + +// Maintains a single thread pool shared by all driver instances (threads) +private struct StaticTaskPool { + import core.sync.mutex : Mutex; + + private { + static shared Mutex m_mutex; + static __gshared TaskPool m_pool; + static __gshared int m_refCount = 0; + } + + shared static this() + { + m_mutex = new shared Mutex; + } + + static TaskPool addRef() + @trusted nothrow { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + if (!m_refCount++) { + try { + m_pool = mallocT!TaskPool(4); + m_pool.isDaemon = true; + } catch (Exception e) { + assert(false, e.msg); + } + } + + return m_pool; + } + + static void releaseRef() + @trusted nothrow { + TaskPool fin_pool; + + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + if (!--m_refCount) { + fin_pool = m_pool; + m_pool = null; + } + } + + if (fin_pool) { + //log("finishing thread pool"); + try { + fin_pool.finish(true); + freeT(fin_pool); + } catch (Exception e) { + //log("Failed to shut down file I/O thread pool."); + } + } + } +}