Use a single thread pool for all threaded file driver threads.

This avoids multiplying the number of threads by 5 for each thread that performs file I/O.
This commit is contained in:
Sönke Ludwig 2018-03-09 15:13:42 +01:00
parent acc35e1107
commit f5f64ee476

View file

@ -123,13 +123,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
void dispose()
{
if (m_fileThreadPool) {
log("finishing thread pool");
try m_fileThreadPool.finish();
catch (Exception e) {
//logError("Failed to shut down file I/O thread pool.");
}
}
if (m_fileThreadPool)
StaticTaskPool.releaseRef();
log("finishing file events");
m_events.cancelWait(m_readyEvent, &onReady);
onReady(m_readyEvent);
@ -196,12 +191,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
assert(f.callback is null, "Concurrent file writes are not allowed.");
f.callback = on_write_finish;
m_activeWrites.insert(file);
log("start task");
if (m_fileThreadPool is null)
m_fileThreadPool = StaticTaskPool.addRef();
log("start write task");
try {
if (m_fileThreadPool is null) {
m_fileThreadPool = new TaskPool(4);
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
startWaiting();
} catch (Exception e) {
@ -230,11 +223,10 @@ log("start task");
assert(f.callback is null, "Concurrent file reads are not allowed.");
f.callback = on_read_finish;
m_activeReads.insert(file);
if (m_fileThreadPool is null)
m_fileThreadPool = StaticTaskPool.addRef();
log("start read task");
try {
if (m_fileThreadPool is null) {
m_fileThreadPool = new TaskPool(4);
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
startWaiting();
} catch (Exception e) {
@ -359,3 +351,56 @@ private void log(ARGS...)(string fmt, ARGS args)
writefln(fmt, args);
}
}
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.atomic : cas, atomicStore;
import std.parallelism : TaskPool;
private {
static shared int m_locked = 0;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
static TaskPool addRef()
@trusted nothrow {
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
if (!m_refCount++) {
try {
m_pool = new TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, "Failed to create file thread pool: "~e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
log("finishing thread pool");
try fin_pool.finish();
catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}