Use a ring buffer for worker tasks and make TaskFuncInfo creation slightly more efficient.

This commit is contained in:
Sönke Ludwig 2017-02-22 19:52:22 +01:00
parent 6f78310f26
commit 9ac908c599
3 changed files with 59 additions and 37 deletions

View file

@ -317,8 +317,7 @@ void setIdleHandler(bool delegate() @safe nothrow del)
Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
if (is(typeof(CALLABLE.init(ARGS.init)))) if (is(typeof(CALLABLE.init(ARGS.init))))
{ {
auto tfi = TaskFuncInfo.make(task, args); return runTask_internal!((ref tfi) { tfi.set(task, args); });
return runTask_internal(tfi);
} }
/** /**
@ -341,7 +340,7 @@ auto runTaskScoped(FT, ARGS)(scope FT callable, ARGS args)
return S(runTask(callable, args)); return S(runTask(callable, args));
} }
package Task runTask_internal(ref TaskFuncInfo tfi) package Task runTask_internal(alias TFI_SETUP)()
@safe nothrow { @safe nothrow {
import std.typecons : Tuple, tuple; import std.typecons : Tuple, tuple;
@ -359,7 +358,7 @@ package Task runTask_internal(ref TaskFuncInfo tfi)
f = new TaskFiber; f = new TaskFiber;
} }
f.m_taskFunc = tfi; TFI_SETUP(f.m_taskFunc);
f.bumpTaskCounter(); f.bumpTaskCounter();
auto handle = f.task(); auto handle = f.task();

View file

@ -335,8 +335,9 @@ final package class TaskFiber : Fiber {
private void run() private void run()
nothrow { nothrow {
import std.encoding : sanitize; import std.algorithm.mutation : swap;
import std.concurrency : Tid, thisTid; import std.concurrency : Tid, thisTid;
import std.encoding : sanitize;
import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield; import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield;
version (VibeDebugCatchAll) alias UncaughtException = Throwable; version (VibeDebugCatchAll) alias UncaughtException = Throwable;
@ -353,8 +354,8 @@ final package class TaskFiber : Fiber {
} }
} }
auto task = m_taskFunc; TaskFuncInfo task;
m_taskFunc = TaskFuncInfo.init; swap(task, m_taskFunc);
Task handle = this.task; Task handle = this.task;
try { try {
m_running = true; m_running = true;
@ -368,7 +369,7 @@ final package class TaskFiber : Fiber {
taskScheduler.yieldUninterruptible(); taskScheduler.yieldUninterruptible();
logTrace("Initial resume of task."); logTrace("Initial resume of task.");
} }
task.func(&task); task.call();
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
} catch (Exception e) { } catch (Exception e) {
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
@ -488,12 +489,14 @@ final package class TaskFiber : Fiber {
} }
package struct TaskFuncInfo { package struct TaskFuncInfo {
void function(TaskFuncInfo*) func; void function(ref TaskFuncInfo) func;
void[2*size_t.sizeof] callable; void[2*size_t.sizeof] callable;
void[maxTaskParameterSize] args; void[maxTaskParameterSize] args;
static TaskFuncInfo make(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{ {
assert(!func, "Setting TaskFuncInfo that is already set.");
import std.algorithm : move; import std.algorithm : move;
import std.traits : hasElaborateAssign; import std.traits : hasElaborateAssign;
@ -504,7 +507,7 @@ package struct TaskFuncInfo {
"The arguments passed to run(Worker)Task must not exceed "~ "The arguments passed to run(Worker)Task must not exceed "~
maxTaskParameterSize.to!string~" bytes in total size."); maxTaskParameterSize.to!string~" bytes in total size.");
static void callDelegate(TaskFuncInfo* tfi) { static void callDelegate(ref TaskFuncInfo tfi) {
assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");
// copy original call data to stack // copy original call data to stack
@ -520,21 +523,23 @@ package struct TaskFuncInfo {
mixin(callWithMove!ARGS("c", "args.expand")); mixin(callWithMove!ARGS("c", "args.expand"));
} }
return () @trusted { func = &callDelegate;
TaskFuncInfo tfi;
tfi.func = &callDelegate;
static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE(); () @trusted {
static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS(); static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE();
tfi.typedCallable!CALLABLE = callable; static if (hasElaborateAssign!TARGS) initArgs!TARGS();
typedCallable!CALLABLE = callable;
foreach (i, A; ARGS) { foreach (i, A; ARGS) {
static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]); static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]);
else tfi.typedArgs!TARGS.expand[i] = args[i]; else typedArgs!TARGS.expand[i] = args[i];
} }
return tfi;
} (); } ();
} }
void call()
{
this.func(this);
}
@property ref C typedCallable(C)() @property ref C typedCallable(C)()
{ {

View file

@ -43,6 +43,7 @@ shared class TaskPool {
m_signal = createSharedManualEvent(); m_signal = createSharedManualEvent();
with (m_state.lock) { with (m_state.lock) {
queue.setup();
threads.length = thread_count; threads.length = thread_count;
foreach (i; 0 .. thread_count) { foreach (i; 0 .. thread_count) {
WorkerThread thr; WorkerThread thr;
@ -68,7 +69,7 @@ shared class TaskPool {
while (m_state.lock.threads.length > 0) while (m_state.lock.threads.length > 0)
ec = m_signal.waitUninterruptible(ec); ec = m_signal.waitUninterruptible(ec);
size_t cnt = m_state.lock.queue.tasks.length; size_t cnt = m_state.lock.queue.length;
if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt); if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);
} }
@ -187,8 +188,7 @@ shared class TaskPool {
static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
auto tfi = TaskFuncInfo.make(callable, args); m_state.lock.queue.put(callable, args);
m_state.lock.queue.put(tfi);
m_signal.emitSingle(); m_signal.emitSingle();
} }
@ -203,9 +203,8 @@ shared class TaskPool {
"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
foreach (thr; m_state.lock.threads) { foreach (thr; m_state.lock.threads) {
// create one TFI per thread to properly acocunt for elaborate assignment operators/postblit // create one TFI per thread to properly account for elaborate assignment operators/postblit
auto tfi = TaskFuncInfo.make(callable, args); thr.m_queue.put(callable, args);
thr.m_queue.put(tfi);
} }
m_signal.emit(); m_signal.emit();
} }
@ -220,6 +219,7 @@ private class WorkerThread : Thread {
this(shared(TaskPool) pool) this(shared(TaskPool) pool)
{ {
m_pool = pool; m_pool = pool;
m_queue.setup();
super(&main); super(&main);
} }
@ -256,6 +256,7 @@ private class WorkerThread : Thread {
private void handleWorkerTasks() private void handleWorkerTasks()
nothrow @safe { nothrow @safe {
import std.algorithm.iteration : filter; import std.algorithm.iteration : filter;
import std.algorithm.mutation : swap;
import std.algorithm.searching : count; import std.algorithm.searching : count;
import std.array : array; import std.array : array;
@ -276,10 +277,8 @@ private class WorkerThread : Thread {
} }
} }
if (taskfunc.func !is null) { if (taskfunc.func !is null)
.runTask_internal(taskfunc); .runTask_internal!((ref tfi) { swap(tfi, taskfunc); });
taskfunc.func = null;
}
else emit_count = m_pool.m_signal.waitUninterruptible(emit_count); else emit_count = m_pool.m_signal.waitUninterruptible(emit_count);
} }
@ -301,15 +300,34 @@ private class WorkerThread : Thread {
private struct TaskQueue { private struct TaskQueue {
nothrow @safe: nothrow @safe:
// FIXME: use a more efficient storage! // TODO: avoid use of GC
TaskFuncInfo[] tasks;
@property bool empty() const { return tasks.length == 0; } import vibe.internal.array : FixedRingBuffer;
void put(ref TaskFuncInfo tfi) { tasks ~= tfi; } FixedRingBuffer!TaskFuncInfo* m_queue;
void setup()
{
m_queue = new FixedRingBuffer!TaskFuncInfo;
}
@property bool empty() const { return m_queue.empty; }
@property size_t length() const { return m_queue.length; }
void put(CALLABLE, ARGS...)(ref CALLABLE c, ref ARGS args)
{
import std.algorithm.comparison : max;
if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2);
assert(!m_queue.full);
m_queue.peekDst[0].set(c, args);
m_queue.putN(1);
}
bool consume(ref TaskFuncInfo tfi) bool consume(ref TaskFuncInfo tfi)
{ {
if (tasks.length == 0) return false; if (m_queue.empty) return false;
tfi = tasks[0]; m_queue.read(() @trusted { return (&tfi)[0 .. 1]; } ());
tasks = tasks[1 .. $];
return true; return true;
} }
} }