diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 1f04d25..9bb76fb 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -108,7 +108,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil ChoppedVector!FileInfo m_files; SmallIntegerSet!FileFD m_activeReads; SmallIntegerSet!FileFD m_activeWrites; - EventID m_readyEvent; + EventID m_readyEvent = EventID.invalid; bool m_waiting; Events m_events; } @@ -118,18 +118,23 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil this(Events events) { m_events = events; - m_readyEvent = events.create(); } void dispose() { - if (m_fileThreadPool) + if (m_fileThreadPool) { StaticTaskPool.releaseRef(); - log("finishing file events"); - m_events.cancelWait(m_readyEvent, &onReady); - onReady(m_readyEvent); - m_events.releaseRef(m_readyEvent); - log("finished file events"); + m_fileThreadPool = null; + } + + if (m_readyEvent != EventID.invalid) { + log("finishing file events"); + m_events.cancelWait(m_readyEvent, &onReady); + onReady(m_readyEvent); + m_events.releaseRef(m_readyEvent); + m_readyEvent = EventID.invalid; + log("finished file events"); + } } final override FileFD open(string path, FileOpenMode mode) @@ -191,8 +196,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil assert(f.callback is null, "Concurrent file writes are not allowed."); f.callback = on_write_finish; m_activeWrites.insert(file); - if (m_fileThreadPool is null) - m_fileThreadPool = StaticTaskPool.addRef(); + threadSetup(); log("start write task"); try { m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); @@ -223,8 +227,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil assert(f.callback is null, "Concurrent file reads are not allowed."); f.callback = on_read_finish; m_activeReads.insert(file); - if (m_fileThreadPool is null) - m_fileThreadPool = StaticTaskPool.addRef(); + threadSetup(); log("start read task"); try { m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); @@ -330,6 +333,18 @@ log("ready event"); m_waiting = true; } } + + private void threadSetup() + { + if (m_readyEvent == EventID.invalid) { + log("create file event"); + m_readyEvent = m_events.create(); + } + if (m_fileThreadPool is null) { + log("aquire thread pool"); + m_fileThreadPool = StaticTaskPool.addRef(); + } + } } private auto safeAtomicLoad(T)(ref shared(T) v) @trusted { return atomicLoad(v); }