diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index 28ab92e..e0d1326 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -1123,9 +1123,9 @@ struct Future(T) { used and the result is computed within a separate task within the calling thread. Params: - callable: A callable value, can be either a function, a delegate, or a + callable = A callable value, can be either a function, a delegate, or a user defined type that defines an $(D opCall). - args: Arguments to pass to the callable. + args = Arguments to pass to the callable. Returns: Returns a $(D Future) object that can be used to access the result. diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 20b50d6..724a769 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -1,7 +1,17 @@ /** - Interruptible Task synchronization facilities + Event loop compatible task synchronization facilities. - Copyright: © 2012-2016 RejectedSoftware e.K. + This module provides replacement primitives for the modules in `core.sync` + that do not block vibe.d's event loop in their wait states. These should + always be preferred over the ones in Druntime under usual circumstances. + + Using a standard `Mutex` is possible as long as it is ensured that no event + loop based functionality (I/O, task interaction or anything that implicitly + calls `vibe.core.core.yield`) is executed within a section of code that is + protected by the mutex. $(B Failure to do so may result in dead-locks and + high-level race-conditions!) + + Copyright: © 2012-2019 Sönke Ludwig Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. */ @@ -33,11 +43,48 @@ shared(ManualEvent) createSharedManualEvent() return shared(ManualEvent).init; } -ScopedMutexLock!M scopedMutexLock(M : Mutex)(M mutex, LockMode mode = LockMode.lock) + +/** Performs RAII based locking/unlocking of a mutex. + + Note that while `TaskMutex` can be used with D's built-in `synchronized` + statement, `InterruptibleTaskMutex` cannot. This function provides a + library based alternative that is suitable for use with all mutex types. +*/ +ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock) + if (is(M : Mutex) || is(M : Lockable)) { return ScopedMutexLock!M(mutex, mode); } +/// +unittest { + import vibe.core.core : runWorkerTaskH; + + __gshared int counter; + __gshared InterruptibleTaskMutex mutex; + + mutex = new InterruptibleTaskMutex; + + Task[] tasks; + + foreach (i; 0 .. 100) { + tasks ~= runWorkerTaskH({ + auto l = scopedMutexLock(mutex); + counter++; + }); + } + + foreach (t; tasks) t.join(); + + assert(counter == 100); +} + +unittest { + scopedMutexLock(new Mutex); + scopedMutexLock(new TaskMutex); + scopedMutexLock(new InterruptibleTaskMutex); +} + enum LockMode { lock, tryLock, @@ -53,7 +100,8 @@ interface Lockable { /** RAII lock for the Mutex class. */ -struct ScopedMutexLock(M : Mutex = core.sync.mutex.Mutex) +struct ScopedMutexLock(M) + if (is(M : Mutex) || is(M : Lockable)) { @disable this(this); private { @@ -274,7 +322,6 @@ final class LocalTaskSemaphore */ final class TaskMutex : core.sync.mutex.Mutex, Lockable { @safe: - private TaskMutexImpl!false m_impl; this(Object o) { m_impl.setup(); super(o); } @@ -376,7 +423,13 @@ final class InterruptibleTaskMutex : Lockable { private TaskMutexImpl!true m_impl; - this() { m_impl.setup(); } + this() + { + m_impl.setup(); + + // detects invalid usage within synchronized(...) + () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); + } bool tryLock() nothrow { return m_impl.tryLock(); } void lock() { m_impl.lock(); } @@ -393,11 +446,11 @@ unittest { /** Recursive mutex implementation for tasks. - This mutex type can be used in exchange for a core.sync.mutex.Mutex, but + This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but does not block the event loop when contention happens. Notice: - Because this class is annotated nothrow, it cannot be interrupted + Because this class is annotated `nothrow`, it cannot be interrupted using $(D vibe.core.task.Task.interrupt()). The corresponding $(D InterruptException) will be deferred until the next blocking operation yields the event loop. @@ -405,7 +458,7 @@ unittest { Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be interrupted. - See_Also: TaskMutex, core.sync.mutex.Mutex + See_Also: `TaskMutex`, `core.sync.mutex.Mutex` */ final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable { @safe: @@ -437,10 +490,15 @@ unittest { */ final class InterruptibleRecursiveTaskMutex : Lockable { @safe: - private RecursiveTaskMutexImpl!true m_impl; - this() { m_impl.setup(); } + this() + { + m_impl.setup(); + + // detects invalid usage within synchronized(...) + () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); + } bool tryLock() { return m_impl.tryLock(); } void lock() { m_impl.lock(); } @@ -453,6 +511,39 @@ unittest { } +// Helper class to ensure that the non Object.Monitor compatible interruptible +// mutex classes are not accidentally used with the `synchronized` statement +private final class NoUseMonitor : Object.Monitor { + private static shared Proxy st_instance; + + static struct Proxy { + Object.Monitor monitor; + } + + static @property ref shared(Proxy) instance() + @safe nothrow { + static shared(Proxy)* inst = null; + if (inst) return *inst; + + () @trusted { // synchronized {} not @safe for DMD <= 2.078.3 + synchronized { + if (!st_instance.monitor) + st_instance.monitor = new shared NoUseMonitor; + inst = &st_instance; + } + } (); + + return *inst; + } + + override void lock() @safe @nogc nothrow { + assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead."); + } + + override void unlock() @safe @nogc nothrow {} +} + + private void runMutexUnitTests(M)() { import vibe.core.core; @@ -656,8 +747,14 @@ final class InterruptibleTaskCondition { private TaskConditionImpl!(true, Lockable) m_impl; - this(core.sync.mutex.Mutex mtx) { m_impl.setup(mtx); } - this(Lockable mtx) { m_impl.setup(mtx); } + this(M)(M mutex) + if (is(M : Mutex) || is (M : Lockable)) + { + static if (is(M : Lockable)) + m_impl.setup(mutex); + else + m_impl.setupForMutex(mutex); + } @property Lockable mutex() { return m_impl.mutex; } void wait() { m_impl.wait(); } @@ -666,6 +763,12 @@ final class InterruptibleTaskCondition { void notifyAll() { m_impl.notifyAll(); } } +unittest { + new InterruptibleTaskCondition(new Mutex); + new InterruptibleTaskCondition(new TaskMutex); + new InterruptibleTaskCondition(new InterruptibleTaskMutex); +} + /** A manually triggered single threaded cross-task event. @@ -1541,7 +1644,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { @trusted bool tryLock() { return m_mutex.tryLock(); } } - void setup(core.sync.mutex.Mutex mtx) + void setupForMutex(core.sync.mutex.Mutex mtx) { setup(new MutexWrapper(mtx)); } diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 87b0c32..6d85596 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -35,7 +35,7 @@ shared final class TaskPool { /** Creates a new task pool with the specified number of threads. Params: - thread_count: The number of worker threads to create + thread_count = The number of worker threads to create */ this(size_t thread_count = logicalProcessorCount()) @safe {