Merge pull request #60 from vibe-d/single_file_taskpool

Use a single thread pool for all threaded file driver threads.
This commit is contained in:
Sönke Ludwig 2018-03-11 23:36:34 +01:00 committed by GitHub
commit a681fc988b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -123,13 +123,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
void dispose() void dispose()
{ {
if (m_fileThreadPool) { if (m_fileThreadPool)
log("finishing thread pool"); StaticTaskPool.releaseRef();
try m_fileThreadPool.finish();
catch (Exception e) {
//logError("Failed to shut down file I/O thread pool.");
}
}
log("finishing file events"); log("finishing file events");
m_events.cancelWait(m_readyEvent, &onReady); m_events.cancelWait(m_readyEvent, &onReady);
onReady(m_readyEvent); onReady(m_readyEvent);
@ -196,12 +191,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
assert(f.callback is null, "Concurrent file writes are not allowed."); assert(f.callback is null, "Concurrent file writes are not allowed.");
f.callback = on_write_finish; f.callback = on_write_finish;
m_activeWrites.insert(file); m_activeWrites.insert(file);
log("start task"); if (m_fileThreadPool is null)
m_fileThreadPool = StaticTaskPool.addRef();
log("start write task");
try { try {
if (m_fileThreadPool is null) {
m_fileThreadPool = new TaskPool(4);
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
startWaiting(); startWaiting();
} catch (Exception e) { } catch (Exception e) {
@ -230,11 +223,10 @@ log("start task");
assert(f.callback is null, "Concurrent file reads are not allowed."); assert(f.callback is null, "Concurrent file reads are not allowed.");
f.callback = on_read_finish; f.callback = on_read_finish;
m_activeReads.insert(file); m_activeReads.insert(file);
if (m_fileThreadPool is null)
m_fileThreadPool = StaticTaskPool.addRef();
log("start read task");
try { try {
if (m_fileThreadPool is null) {
m_fileThreadPool = new TaskPool(4);
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
startWaiting(); startWaiting();
} catch (Exception e) { } catch (Exception e) {
@ -359,3 +351,56 @@ private void log(ARGS...)(string fmt, ARGS args)
writefln(fmt, args); writefln(fmt, args);
} }
} }
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.atomic : cas, atomicStore;
import std.parallelism : TaskPool;
private {
static shared int m_locked = 0;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
static TaskPool addRef()
@trusted nothrow {
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
if (!m_refCount++) {
try {
m_pool = new TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, "Failed to create file thread pool: "~e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
while (!cas(&m_locked, 0, 1)) {}
scope (exit) atomicStore(m_locked, 0);
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
log("finishing thread pool");
try fin_pool.finish();
catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}