Move shared thread pool functionality to a separate module.

This commit is contained in:
Sönke Ludwig 2020-05-08 18:18:58 +02:00
parent a27abdb956
commit d7657b54e8
2 changed files with 94 additions and 67 deletions

View file

@ -1,6 +1,7 @@
module eventcore.drivers.threadedfile; module eventcore.drivers.threadedfile;
import eventcore.driver; import eventcore.driver;
import eventcore.internal.ioworker;
import eventcore.internal.utils; import eventcore.internal.utils;
import core.atomic; import core.atomic;
import core.stdc.errno; import core.stdc.errno;
@ -110,7 +111,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
ubyte[16*size_t.sizeof] userData; ubyte[16*size_t.sizeof] userData;
} }
TaskPool m_fileThreadPool; IOWorkerPool m_fileThreadPool;
ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop
SmallIntegerSet!size_t m_activeReads; SmallIntegerSet!size_t m_activeReads;
SmallIntegerSet!size_t m_activeWrites; SmallIntegerSet!size_t m_activeWrites;
@ -128,10 +129,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
void dispose() void dispose()
{ {
if (m_fileThreadPool) { m_fileThreadPool = IOWorkerPool.init;
StaticTaskPool.releaseRef();
m_fileThreadPool = null;
}
if (m_readyEvent != EventID.invalid) { if (m_readyEvent != EventID.invalid) {
log("finishing file events"); log("finishing file events");
@ -435,9 +433,9 @@ log("ready event");
log("create file event"); log("create file event");
m_readyEvent = m_events.create(); m_readyEvent = m_events.create();
} }
if (m_fileThreadPool is null) { if (!m_fileThreadPool) {
log("aquire thread pool"); log("aquire thread pool");
m_fileThreadPool = StaticTaskPool.addRef(); m_fileThreadPool = acquireIOWorkerPool();
} }
} }
} }
@ -456,63 +454,3 @@ private void log(ARGS...)(string fmt, ARGS args)
writefln(fmt, args); writefln(fmt, args);
} }
} }
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.sync.mutex : Mutex;
import std.parallelism : TaskPool;
private {
static shared Mutex m_mutex;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static TaskPool addRef()
@trusted nothrow {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_refCount++) {
try {
m_pool = mallocT!TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
log("finishing thread pool");
try {
fin_pool.finish(true);
freeT(fin_pool);
} catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}

View file

@ -0,0 +1,89 @@
/** Provides a shared task pool for distributing tasks to worker threads.
*/
module eventcore.internal.ioworker;
import eventcore.internal.utils;
import std.parallelism : TaskPool;
IOWorkerPool acquireIOWorkerPool()
@safe nothrow {
return IOWorkerPool(true);
}
struct IOWorkerPool {
private {
TaskPool m_pool;
}
@safe nothrow:
private this(bool) { m_pool = StaticTaskPool.addRef(); }
~this() { if (m_pool) StaticTaskPool.releaseRef(); }
this(this) { if (m_pool) StaticTaskPool.addRef(); }
bool opCast(T)() const if (is(T == bool)) { return !!m_pool; }
@property TaskPool pool() { return m_pool; }
alias pool this;
}
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.sync.mutex : Mutex;
private {
static shared Mutex m_mutex;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static TaskPool addRef()
@trusted nothrow {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_refCount++) {
try {
m_pool = mallocT!TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
//log("finishing thread pool");
try {
fin_pool.finish(true);
freeT(fin_pool);
} catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}