diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 632cccd..4d3ee49 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -317,8 +317,7 @@ void setIdleHandler(bool delegate() @safe nothrow del) Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) if (is(typeof(CALLABLE.init(ARGS.init)))) { - auto tfi = TaskFuncInfo.make(task, args); - return runTask_internal(tfi); + return runTask_internal!((ref tfi) { tfi.set(task, args); }); } /** @@ -341,7 +340,7 @@ auto runTaskScoped(FT, ARGS)(scope FT callable, ARGS args) return S(runTask(callable, args)); } -package Task runTask_internal(ref TaskFuncInfo tfi) +package Task runTask_internal(alias TFI_SETUP)() @safe nothrow { import std.typecons : Tuple, tuple; @@ -359,7 +358,7 @@ package Task runTask_internal(ref TaskFuncInfo tfi) f = new TaskFiber; } - f.m_taskFunc = tfi; + TFI_SETUP(f.m_taskFunc); f.bumpTaskCounter(); auto handle = f.task(); diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 94d324c..33dcc57 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -335,8 +335,9 @@ final package class TaskFiber : Fiber { private void run() nothrow { - import std.encoding : sanitize; + import std.algorithm.mutation : swap; import std.concurrency : Tid, thisTid; + import std.encoding : sanitize; import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield; version (VibeDebugCatchAll) alias UncaughtException = Throwable; @@ -353,8 +354,8 @@ final package class TaskFiber : Fiber { } } - auto task = m_taskFunc; - m_taskFunc = TaskFuncInfo.init; + TaskFuncInfo task; + swap(task, m_taskFunc); Task handle = this.task; try { m_running = true; @@ -368,7 +369,7 @@ final package class TaskFiber : Fiber { taskScheduler.yieldUninterruptible(); logTrace("Initial resume of task."); } - task.func(&task); + task.call(); debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); } catch (Exception e) { debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); @@ -488,12 +489,14 @@ final package class TaskFiber : Fiber { } package struct TaskFuncInfo { - void function(TaskFuncInfo*) func; + void function(ref TaskFuncInfo) func; void[2*size_t.sizeof] callable; void[maxTaskParameterSize] args; - static TaskFuncInfo make(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) + void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) { + assert(!func, "Setting TaskFuncInfo that is already set."); + import std.algorithm : move; import std.traits : hasElaborateAssign; @@ -504,7 +507,7 @@ package struct TaskFuncInfo { "The arguments passed to run(Worker)Task must not exceed "~ maxTaskParameterSize.to!string~" bytes in total size."); - static void callDelegate(TaskFuncInfo* tfi) { + static void callDelegate(ref TaskFuncInfo tfi) { assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); // copy original call data to stack @@ -520,21 +523,23 @@ package struct TaskFuncInfo { mixin(callWithMove!ARGS("c", "args.expand")); } - return () @trusted { - TaskFuncInfo tfi; - tfi.func = &callDelegate; + func = &callDelegate; - static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE(); - static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS(); - tfi.typedCallable!CALLABLE = callable; + () @trusted { + static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE(); + static if (hasElaborateAssign!TARGS) initArgs!TARGS(); + typedCallable!CALLABLE = callable; foreach (i, A; ARGS) { - static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]); - else tfi.typedArgs!TARGS.expand[i] = args[i]; + static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]); + else typedArgs!TARGS.expand[i] = args[i]; } - return tfi; } (); } + void call() + { + this.func(this); + } @property ref C typedCallable(C)() { diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 7a40129..c6741f2 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -43,6 +43,7 @@ shared class TaskPool { m_signal = createSharedManualEvent(); with (m_state.lock) { + queue.setup(); threads.length = thread_count; foreach (i; 0 .. thread_count) { WorkerThread thr; @@ -68,7 +69,7 @@ shared class TaskPool { while (m_state.lock.threads.length > 0) ec = m_signal.waitUninterruptible(ec); - size_t cnt = m_state.lock.queue.tasks.length; + size_t cnt = m_state.lock.queue.length; if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt); } @@ -187,8 +188,7 @@ shared class TaskPool { static assert(areConvertibleTo!(Group!ARGS, Group!FARGS), "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); - auto tfi = TaskFuncInfo.make(callable, args); - m_state.lock.queue.put(tfi); + m_state.lock.queue.put(callable, args); m_signal.emitSingle(); } @@ -203,9 +203,8 @@ shared class TaskPool { "Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'."); foreach (thr; m_state.lock.threads) { - // create one TFI per thread to properly acocunt for elaborate assignment operators/postblit - auto tfi = TaskFuncInfo.make(callable, args); - thr.m_queue.put(tfi); + // create one TFI per thread to properly account for elaborate assignment operators/postblit + thr.m_queue.put(callable, args); } m_signal.emit(); } @@ -220,6 +219,7 @@ private class WorkerThread : Thread { this(shared(TaskPool) pool) { m_pool = pool; + m_queue.setup(); super(&main); } @@ -256,6 +256,7 @@ private class WorkerThread : Thread { private void handleWorkerTasks() nothrow @safe { import std.algorithm.iteration : filter; + import std.algorithm.mutation : swap; import std.algorithm.searching : count; import std.array : array; @@ -276,10 +277,8 @@ private class WorkerThread : Thread { } } - if (taskfunc.func !is null) { - .runTask_internal(taskfunc); - taskfunc.func = null; - } + if (taskfunc.func !is null) + .runTask_internal!((ref tfi) { swap(tfi, taskfunc); }); else emit_count = m_pool.m_signal.waitUninterruptible(emit_count); } @@ -301,15 +300,34 @@ private class WorkerThread : Thread { private struct TaskQueue { nothrow @safe: - // FIXME: use a more efficient storage! - TaskFuncInfo[] tasks; - @property bool empty() const { return tasks.length == 0; } - void put(ref TaskFuncInfo tfi) { tasks ~= tfi; } + // TODO: avoid use of GC + + import vibe.internal.array : FixedRingBuffer; + FixedRingBuffer!TaskFuncInfo* m_queue; + + void setup() + { + m_queue = new FixedRingBuffer!TaskFuncInfo; + } + + @property bool empty() const { return m_queue.empty; } + + @property size_t length() const { return m_queue.length; } + + void put(CALLABLE, ARGS...)(ref CALLABLE c, ref ARGS args) + { + import std.algorithm.comparison : max; + if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2); + assert(!m_queue.full); + + m_queue.peekDst[0].set(c, args); + m_queue.putN(1); + } + bool consume(ref TaskFuncInfo tfi) { - if (tasks.length == 0) return false; - tfi = tasks[0]; - tasks = tasks[1 .. $]; + if (m_queue.empty) return false; + m_queue.read(() @trusted { return (&tfi)[0 .. 1]; } ()); return true; } }