2017-02-22 17:35:51 +00:00
/ * *
Multi - threaded task pool implementation .
2020-03-14 18:47:20 +00:00
Copyright : © 2012 - 2020 Sönke Ludwig
2017-02-22 17:35:51 +00:00
License : Subject to the terms of the MIT license , as written in the included LICENSE . txt file .
Authors : Sönke Ludwig
* /
module vibe.core.taskpool ;
import vibe.core.concurrency : isWeaklyIsolated ;
import vibe.core.core : exitEventLoop , logicalProcessorCount , runEventLoop , runTask , runTask_internal ;
import vibe.core.log ;
2019-09-17 12:35:52 +00:00
import vibe.core.sync : ManualEvent , Monitor , createSharedManualEvent , createMonitor ;
2020-03-14 18:47:20 +00:00
import vibe.core.task : Task , TaskFuncInfo , TaskSettings , callWithMove ;
2017-02-22 17:35:51 +00:00
import core.sync.mutex : Mutex ;
import core.thread : Thread ;
import std.concurrency : prioritySend , receiveOnly ;
import std.traits : isFunctionPointer ;
/ * * Implements a shared , multi - threaded task pool .
* /
2017-07-23 13:04:11 +00:00
shared final class TaskPool {
2017-02-22 17:35:51 +00:00
private {
struct State {
WorkerThread [ ] threads ;
TaskQueue queue ;
bool term ;
}
2019-09-17 12:35:52 +00:00
vibe . core . sync . Monitor ! ( State , shared ( Mutex ) ) m_state ;
2017-02-22 17:35:51 +00:00
shared ( ManualEvent ) m_signal ;
2017-09-21 11:38:24 +00:00
immutable size_t m_threadCount ;
2017-02-22 17:35:51 +00:00
}
/ * * Creates a new task pool with the specified number of threads .
Params :
2019-01-13 23:27:09 +00:00
thread_count = The number of worker threads to create
2017-02-22 17:35:51 +00:00
* /
this ( size_t thread_count = logicalProcessorCount ( ) )
@safe {
import std.format : format ;
2017-09-21 11:38:24 +00:00
m_threadCount = thread_count ;
2017-02-22 17:35:51 +00:00
m_signal = createSharedManualEvent ( ) ;
2019-09-17 12:35:52 +00:00
m_state = createMonitor ! State ( new shared Mutex ) ;
2017-02-22 17:35:51 +00:00
with ( m_state . lock ) {
2017-02-22 18:52:22 +00:00
queue . setup ( ) ;
2017-02-22 17:35:51 +00:00
threads . length = thread_count ;
foreach ( i ; 0 . . thread_count ) {
WorkerThread thr ;
2017-07-18 20:51:09 +00:00
( ) @trusted {
2017-02-22 17:35:51 +00:00
thr = new WorkerThread ( this ) ;
thr . name = format ( "vibe-%s" , i ) ;
thr . start ( ) ;
} ( ) ;
threads [ i ] = thr ;
}
}
}
2017-09-21 11:38:24 +00:00
/ * * Returns the number of worker threads .
* /
@property size_t threadCount ( ) const shared { return m_threadCount ; }
2017-02-22 17:35:51 +00:00
/ * * Instructs all worker threads to terminate and waits until all have
finished .
* /
void terminate ( )
@safe nothrow {
m_state . lock . term = true ;
m_signal . emit ( ) ;
2017-02-23 13:43:22 +00:00
while ( true ) {
WorkerThread th ;
with ( m_state . lock )
if ( threads . length ) {
th = threads [ 0 ] ;
threads = threads [ 1 . . $ ] ;
}
if ( ! th ) break ;
2018-10-26 22:42:06 +00:00
if ( th is Thread . getThis ( ) )
continue ;
2017-02-23 13:43:22 +00:00
( ) @trusted {
try th . join ( ) ;
catch ( Exception e ) {
logWarn ( "Failed to wait for worker thread exit: %s" , e . msg ) ;
}
} ( ) ;
}
2017-02-22 17:35:51 +00:00
2017-02-22 18:52:22 +00:00
size_t cnt = m_state . lock . queue . length ;
2017-02-22 17:35:51 +00:00
if ( cnt > 0 ) logWarn ( "There were still %d worker tasks pending at exit." , cnt ) ;
}
/ * * Instructs all worker threads to terminate as soon as all tasks have
been processed and waits for them to finish .
* /
void join ( )
@safe nothrow {
assert ( false , "TODO!" ) ;
}
/ * * Runs a new asynchronous task in a worker thread .
Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread - safety .
* /
void runTask ( FT , ARGS . . . ) ( FT func , auto ref ARGS args )
if ( isFunctionPointer ! FT )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
2020-03-14 18:47:20 +00:00
runTask_unsafe ( TaskSettings . init , func , args ) ;
2017-02-22 17:35:51 +00:00
}
/// ditto
void runTask ( alias method , T , ARGS . . . ) ( shared ( T ) object , auto ref ARGS args )
if ( is ( typeof ( __traits ( getMember , object , __traits ( identifier , method ) ) ) ) )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
auto func = & __traits ( getMember , object , __traits ( identifier , method ) ) ;
2020-03-14 18:47:20 +00:00
runTask_unsafe ( TaskSettings . init , func , args ) ;
}
/// ditto
void runTask ( FT , ARGS . . . ) ( TaskSettings settings , FT func , auto ref ARGS args )
if ( isFunctionPointer ! FT )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
runTask_unsafe ( settings , func , args ) ;
}
/// ditto
void runTask ( alias method , T , ARGS . . . ) ( TaskSettings settings , shared ( T ) object , auto ref ARGS args )
if ( is ( typeof ( __traits ( getMember , object , __traits ( identifier , method ) ) ) ) )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
auto func = & __traits ( getMember , object , __traits ( identifier , method ) ) ;
runTask_unsafe ( settings , func , args ) ;
2017-02-22 17:35:51 +00:00
}
/ * * Runs a new asynchronous task in a worker thread , returning the task handle .
This function will yield and wait for the new task to be created and started
in the worker thread , then resume and return it .
Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread - safety .
* /
Task runTaskH ( FT , ARGS . . . ) ( FT func , auto ref ARGS args )
if ( isFunctionPointer ! FT )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
// workaround for runWorkerTaskH to work when called outside of a task
2019-08-21 08:13:09 +00:00
if ( Task . getThis ( ) = = Task . init ) {
2017-02-22 17:35:51 +00:00
Task ret ;
2020-03-14 18:47:20 +00:00
. runTask ( { ret = doRunTaskH ( TaskSettings . init , func , args ) ; } ) . join ( ) ;
2017-02-22 17:35:51 +00:00
return ret ;
2020-03-14 18:47:20 +00:00
} else return doRunTaskH ( TaskSettings . init , func , args ) ;
2017-02-22 17:35:51 +00:00
}
/// ditto
Task runTaskH ( alias method , T , ARGS . . . ) ( shared ( T ) object , auto ref ARGS args )
if ( is ( typeof ( __traits ( getMember , object , __traits ( identifier , method ) ) ) ) )
{
static void wrapper ( ) ( shared ( T ) object , ref ARGS args ) {
__traits ( getMember , object , __traits ( identifier , method ) ) ( args ) ;
}
return runTaskH ( & wrapper ! ( ) , object , args ) ;
}
2020-03-14 18:47:20 +00:00
/// ditto
Task runTaskH ( FT , ARGS . . . ) ( TaskSettings settings , FT func , auto ref ARGS args )
if ( isFunctionPointer ! FT )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
// workaround for runWorkerTaskH to work when called outside of a task
if ( Task . getThis ( ) = = Task . init ) {
Task ret ;
. runTask ( { ret = doRunTaskH ( settings , func , args ) ; } ) . join ( ) ;
return ret ;
} else return doRunTaskH ( settings , func , args ) ;
}
/// ditto
Task runTaskH ( alias method , T , ARGS . . . ) ( TaskSettings settings , shared ( T ) object , auto ref ARGS args )
if ( is ( typeof ( __traits ( getMember , object , __traits ( identifier , method ) ) ) ) )
{
static void wrapper ( ) ( shared ( T ) object , ref ARGS args ) {
__traits ( getMember , object , __traits ( identifier , method ) ) ( args ) ;
}
return runTaskH ( settings , & wrapper ! ( ) , object , args ) ;
}
2017-02-22 17:35:51 +00:00
2019-08-21 08:13:09 +00:00
// NOTE: needs to be a separate function to avoid recursion for the
// workaround above, which breaks @safe inference
2020-03-14 18:47:20 +00:00
private Task doRunTaskH ( FT , ARGS . . . ) ( TaskSettings settings , FT func , ref ARGS args )
2019-08-21 08:13:09 +00:00
if ( isFunctionPointer ! FT )
{
import std.typecons : Typedef ;
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
alias PrivateTask = Typedef ! ( Task , Task . init , __PRETTY_FUNCTION__ ) ;
Task caller = Task . getThis ( ) ;
assert ( caller ! = Task . init , "runWorkderTaskH can currently only be called from within a task." ) ;
static void taskFun ( Task caller , FT func , ARGS args ) {
PrivateTask callee = Task . getThis ( ) ;
caller . tid . prioritySend ( callee ) ;
mixin ( callWithMove ! ARGS ( "func" , "args" ) ) ;
}
2020-03-14 18:47:20 +00:00
runTask_unsafe ( settings , & taskFun , caller , func , args ) ;
2019-08-21 08:13:09 +00:00
return cast ( Task ) ( ) @trusted { return receiveOnly ! PrivateTask ( ) ; } ( ) ;
}
2017-02-22 17:35:51 +00:00
/ * * Runs a new asynchronous task in all worker threads concurrently .
This function is mainly useful for long - living tasks that distribute their
work across all CPU cores . Only function pointers with weakly isolated
arguments are allowed to be able to guarantee thread - safety .
The number of tasks started is guaranteed to be equal to
2019-01-23 12:50:43 +00:00
`threadCount` .
2017-02-22 17:35:51 +00:00
* /
void runTaskDist ( FT , ARGS . . . ) ( FT func , auto ref ARGS args )
if ( is ( typeof ( * func ) = = function ) )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
2020-03-14 18:47:20 +00:00
runTaskDist_unsafe ( TaskSettings . init , func , args ) ;
2017-02-22 17:35:51 +00:00
}
/// ditto
void runTaskDist ( alias method , T , ARGS . . . ) ( shared ( T ) object , auto ref ARGS args )
{
auto func = & __traits ( getMember , object , __traits ( identifier , method ) ) ;
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
2020-03-14 18:47:20 +00:00
runTaskDist_unsafe ( TaskSettings . init , func , args ) ;
}
/// ditto
void runTaskDist ( FT , ARGS . . . ) ( TaskSettings settings , FT func , auto ref ARGS args )
if ( is ( typeof ( * func ) = = function ) )
{
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
runTaskDist_unsafe ( settings , func , args ) ;
}
/// ditto
void runTaskDist ( alias method , T , ARGS . . . ) ( TaskSettings settings , shared ( T ) object , auto ref ARGS args )
{
auto func = & __traits ( getMember , object , __traits ( identifier , method ) ) ;
foreach ( T ; ARGS ) static assert ( isWeaklyIsolated ! T , "Argument type " ~ T . stringof ~ " is not safe to pass between threads." ) ;
runTaskDist_unsafe ( settings , func , args ) ;
2017-02-22 17:35:51 +00:00
}
2019-01-23 12:50:43 +00:00
/ * * Runs a new asynchronous task in all worker threads and returns the handles .
`on_handle` is an alias to a callble that takes a `Task` as its only
argument and is called for every task instance that gets created .
See_also : `runTaskDist`
* /
void runTaskDistH ( HCB , FT , ARGS . . . ) ( scope HCB on_handle , FT func , auto ref ARGS args )
2020-03-14 22:33:41 +00:00
if ( ! is ( HCB = = TaskSettings ) )
{
runTaskDistH ( TaskSettings . init , on_handle , func , args ) ;
}
/// ditto
void runTaskDistH ( HCB , FT , ARGS . . . ) ( TaskSettings settings , scope HCB on_handle , FT func , auto ref ARGS args )
2019-01-23 12:50:43 +00:00
{
// TODO: support non-copyable argument types using .move
import std.concurrency : send , receiveOnly ;
auto caller = Task . getThis ( ) ;
// workaround to work when called outside of a task
if ( caller = = Task . init ) {
. runTask ( { runTaskDistH ( on_handle , func , args ) ; } ) . join ( ) ;
return ;
}
static void call ( Task t , FT func , ARGS args ) {
t . tid . send ( Task . getThis ( ) ) ;
func ( args ) ;
}
2020-03-14 22:33:41 +00:00
runTaskDist ( settings , & call , caller , func , args ) ;
2019-01-23 12:50:43 +00:00
foreach ( i ; 0 . . this . threadCount )
on_handle ( receiveOnly ! Task ) ;
}
2020-03-14 18:47:20 +00:00
private void runTask_unsafe ( CALLABLE , ARGS . . . ) ( TaskSettings settings , CALLABLE callable , ref ARGS args )
2017-02-22 17:35:51 +00:00
{
import std.traits : ParameterTypeTuple ;
import vibe.internal.traits : areConvertibleTo ;
import vibe.internal.typetuple ;
alias FARGS = ParameterTypeTuple ! CALLABLE ;
static assert ( areConvertibleTo ! ( Group ! ARGS , Group ! FARGS ) ,
"Cannot convert arguments '" ~ ARGS . stringof ~ "' to function arguments '" ~ FARGS . stringof ~ "'." ) ;
2020-03-14 18:47:20 +00:00
m_state . lock . queue . put ( settings , callable , args ) ;
2017-02-22 17:35:51 +00:00
m_signal . emitSingle ( ) ;
}
2020-03-14 18:47:20 +00:00
private void runTaskDist_unsafe ( CALLABLE , ARGS . . . ) ( TaskSettings settings , ref CALLABLE callable , ARGS args ) // NOTE: no ref for args, to disallow non-copyable types!
2017-02-22 17:35:51 +00:00
{
import std.traits : ParameterTypeTuple ;
import vibe.internal.traits : areConvertibleTo ;
import vibe.internal.typetuple ;
alias FARGS = ParameterTypeTuple ! CALLABLE ;
static assert ( areConvertibleTo ! ( Group ! ARGS , Group ! FARGS ) ,
"Cannot convert arguments '" ~ ARGS . stringof ~ "' to function arguments '" ~ FARGS . stringof ~ "'." ) ;
2018-03-07 09:39:28 +00:00
{
auto st = m_state . lock ;
foreach ( thr ; st . threads ) {
// create one TFI per thread to properly account for elaborate assignment operators/postblit
2020-03-14 18:47:20 +00:00
thr . m_queue . put ( settings , callable , args ) ;
2018-03-07 09:39:28 +00:00
}
2017-02-22 17:35:51 +00:00
}
m_signal . emit ( ) ;
}
}
2017-07-23 13:04:11 +00:00
private final class WorkerThread : Thread {
2017-02-22 17:35:51 +00:00
private {
shared ( TaskPool ) m_pool ;
TaskQueue m_queue ;
}
this ( shared ( TaskPool ) pool )
{
m_pool = pool ;
2017-02-22 18:52:22 +00:00
m_queue . setup ( ) ;
2017-02-22 17:35:51 +00:00
super ( & main ) ;
}
private void main ( )
nothrow {
2017-07-18 20:51:09 +00:00
import core.stdc.stdlib : abort ;
2017-02-22 17:35:51 +00:00
import core.exception : InvalidMemoryOperationError ;
import std.encoding : sanitize ;
try {
if ( m_pool . m_state . lock . term ) return ;
logDebug ( "entering worker thread" ) ;
2018-10-02 08:42:10 +00:00
handleWorkerTasks ( ) ;
2017-02-22 17:35:51 +00:00
logDebug ( "Worker thread exit." ) ;
} catch ( Throwable th ) {
logFatal ( "Worker thread terminated due to uncaught error: %s" , th . msg ) ;
logDebug ( "Full error: %s" , th . toString ( ) . sanitize ( ) ) ;
2017-07-18 20:51:09 +00:00
abort ( ) ;
2017-02-22 17:35:51 +00:00
}
}
private void handleWorkerTasks ( )
nothrow @safe {
import std.algorithm.iteration : filter ;
2017-02-22 18:52:22 +00:00
import std.algorithm.mutation : swap ;
2017-02-22 17:35:51 +00:00
import std.algorithm.searching : count ;
import std.array : array ;
2019-10-22 09:30:31 +00:00
logTrace ( "worker thread enter" ) ;
2017-02-22 17:35:51 +00:00
TaskFuncInfo taskfunc ;
2018-10-26 22:42:06 +00:00
auto emit_count = m_pool . m_signal . emitCount ;
while ( true ) {
2017-02-22 17:35:51 +00:00
with ( m_pool . m_state . lock ) {
2019-10-22 09:30:31 +00:00
logTrace ( "worker thread check" ) ;
2017-02-22 17:35:51 +00:00
if ( term ) break ;
if ( m_queue . consume ( taskfunc ) ) {
2019-10-22 09:30:31 +00:00
logTrace ( "worker thread got specific task" ) ;
2017-02-22 17:35:51 +00:00
} else if ( queue . consume ( taskfunc ) ) {
2019-10-22 09:30:31 +00:00
logTrace ( "worker thread got unspecific task" ) ;
2017-02-22 17:35:51 +00:00
}
}
2017-02-22 18:52:22 +00:00
if ( taskfunc . func ! is null )
. runTask_internal ! ( ( ref tfi ) { swap ( tfi , taskfunc ) ; } ) ;
2017-02-22 17:35:51 +00:00
else emit_count = m_pool . m_signal . waitUninterruptible ( emit_count ) ;
}
2019-10-22 09:30:31 +00:00
logTrace ( "worker thread exit" ) ;
2017-02-22 17:35:51 +00:00
if ( ! m_queue . empty )
logWarn ( "Worker thread shuts down with specific worker tasks left in its queue." ) ;
with ( m_pool . m_state . lock ) {
threads = threads . filter ! ( t = > t ! is this ) . array ;
if ( threads . length > 0 & & ! queue . empty )
logWarn ( "Worker threads shut down with worker tasks still left in the queue." ) ;
}
}
}
private struct TaskQueue {
nothrow @safe :
2017-02-22 18:52:22 +00:00
// TODO: avoid use of GC
import vibe.internal.array : FixedRingBuffer ;
FixedRingBuffer ! TaskFuncInfo * m_queue ;
void setup ( )
{
m_queue = new FixedRingBuffer ! TaskFuncInfo ;
}
@property bool empty ( ) const { return m_queue . empty ; }
@property size_t length ( ) const { return m_queue . length ; }
2020-03-14 18:47:20 +00:00
void put ( CALLABLE , ARGS . . . ) ( TaskSettings settings , ref CALLABLE c , ref ARGS args )
2017-02-22 18:52:22 +00:00
{
import std.algorithm.comparison : max ;
if ( m_queue . full ) m_queue . capacity = max ( 16 , m_queue . capacity * 3 / 2 ) ;
assert ( ! m_queue . full ) ;
2020-03-14 18:47:20 +00:00
m_queue . peekDst [ 0 ] . settings = settings ;
2017-02-22 18:52:22 +00:00
m_queue . peekDst [ 0 ] . set ( c , args ) ;
m_queue . putN ( 1 ) ;
}
2017-02-22 17:35:51 +00:00
bool consume ( ref TaskFuncInfo tfi )
{
2017-03-27 14:31:52 +00:00
import std.algorithm.mutation : swap ;
2017-07-18 20:51:09 +00:00
2017-02-22 18:52:22 +00:00
if ( m_queue . empty ) return false ;
2017-03-27 14:31:52 +00:00
swap ( tfi , m_queue . front ) ;
m_queue . popFront ( ) ;
2017-02-22 17:35:51 +00:00
return true ;
}
}