Merge pull request #119 from vibe-d/issue-118-interruptible-task-mutex-use
Improve robustness of the sync module. Fixes #118. merged-on-behalf-of: Leonid Kramer <l-kramer@users.noreply.github.com>
This commit is contained in:
commit
8edca75696
|
@ -1123,9 +1123,9 @@ struct Future(T) {
|
||||||
used and the result is computed within a separate task within the calling thread.
|
used and the result is computed within a separate task within the calling thread.
|
||||||
|
|
||||||
Params:
|
Params:
|
||||||
callable: A callable value, can be either a function, a delegate, or a
|
callable = A callable value, can be either a function, a delegate, or a
|
||||||
user defined type that defines an $(D opCall).
|
user defined type that defines an $(D opCall).
|
||||||
args: Arguments to pass to the callable.
|
args = Arguments to pass to the callable.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Returns a $(D Future) object that can be used to access the result.
|
Returns a $(D Future) object that can be used to access the result.
|
||||||
|
|
|
@ -1,7 +1,17 @@
|
||||||
/**
|
/**
|
||||||
Interruptible Task synchronization facilities
|
Event loop compatible task synchronization facilities.
|
||||||
|
|
||||||
Copyright: © 2012-2016 RejectedSoftware e.K.
|
This module provides replacement primitives for the modules in `core.sync`
|
||||||
|
that do not block vibe.d's event loop in their wait states. These should
|
||||||
|
always be preferred over the ones in Druntime under usual circumstances.
|
||||||
|
|
||||||
|
Using a standard `Mutex` is possible as long as it is ensured that no event
|
||||||
|
loop based functionality (I/O, task interaction or anything that implicitly
|
||||||
|
calls `vibe.core.core.yield`) is executed within a section of code that is
|
||||||
|
protected by the mutex. $(B Failure to do so may result in dead-locks and
|
||||||
|
high-level race-conditions!)
|
||||||
|
|
||||||
|
Copyright: © 2012-2019 Sönke Ludwig
|
||||||
Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
|
Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
|
||||||
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
|
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
|
||||||
*/
|
*/
|
||||||
|
@ -33,11 +43,48 @@ shared(ManualEvent) createSharedManualEvent()
|
||||||
return shared(ManualEvent).init;
|
return shared(ManualEvent).init;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScopedMutexLock!M scopedMutexLock(M : Mutex)(M mutex, LockMode mode = LockMode.lock)
|
|
||||||
|
/** Performs RAII based locking/unlocking of a mutex.
|
||||||
|
|
||||||
|
Note that while `TaskMutex` can be used with D's built-in `synchronized`
|
||||||
|
statement, `InterruptibleTaskMutex` cannot. This function provides a
|
||||||
|
library based alternative that is suitable for use with all mutex types.
|
||||||
|
*/
|
||||||
|
ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
|
||||||
|
if (is(M : Mutex) || is(M : Lockable))
|
||||||
{
|
{
|
||||||
return ScopedMutexLock!M(mutex, mode);
|
return ScopedMutexLock!M(mutex, mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
unittest {
|
||||||
|
import vibe.core.core : runWorkerTaskH;
|
||||||
|
|
||||||
|
__gshared int counter;
|
||||||
|
__gshared InterruptibleTaskMutex mutex;
|
||||||
|
|
||||||
|
mutex = new InterruptibleTaskMutex;
|
||||||
|
|
||||||
|
Task[] tasks;
|
||||||
|
|
||||||
|
foreach (i; 0 .. 100) {
|
||||||
|
tasks ~= runWorkerTaskH({
|
||||||
|
auto l = scopedMutexLock(mutex);
|
||||||
|
counter++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (t; tasks) t.join();
|
||||||
|
|
||||||
|
assert(counter == 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
scopedMutexLock(new Mutex);
|
||||||
|
scopedMutexLock(new TaskMutex);
|
||||||
|
scopedMutexLock(new InterruptibleTaskMutex);
|
||||||
|
}
|
||||||
|
|
||||||
enum LockMode {
|
enum LockMode {
|
||||||
lock,
|
lock,
|
||||||
tryLock,
|
tryLock,
|
||||||
|
@ -53,7 +100,8 @@ interface Lockable {
|
||||||
|
|
||||||
/** RAII lock for the Mutex class.
|
/** RAII lock for the Mutex class.
|
||||||
*/
|
*/
|
||||||
struct ScopedMutexLock(M : Mutex = core.sync.mutex.Mutex)
|
struct ScopedMutexLock(M)
|
||||||
|
if (is(M : Mutex) || is(M : Lockable))
|
||||||
{
|
{
|
||||||
@disable this(this);
|
@disable this(this);
|
||||||
private {
|
private {
|
||||||
|
@ -274,7 +322,6 @@ final class LocalTaskSemaphore
|
||||||
*/
|
*/
|
||||||
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
|
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
private TaskMutexImpl!false m_impl;
|
private TaskMutexImpl!false m_impl;
|
||||||
|
|
||||||
this(Object o) { m_impl.setup(); super(o); }
|
this(Object o) { m_impl.setup(); super(o); }
|
||||||
|
@ -376,7 +423,13 @@ final class InterruptibleTaskMutex : Lockable {
|
||||||
|
|
||||||
private TaskMutexImpl!true m_impl;
|
private TaskMutexImpl!true m_impl;
|
||||||
|
|
||||||
this() { m_impl.setup(); }
|
this()
|
||||||
|
{
|
||||||
|
m_impl.setup();
|
||||||
|
|
||||||
|
// detects invalid usage within synchronized(...)
|
||||||
|
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
|
||||||
|
}
|
||||||
|
|
||||||
bool tryLock() nothrow { return m_impl.tryLock(); }
|
bool tryLock() nothrow { return m_impl.tryLock(); }
|
||||||
void lock() { m_impl.lock(); }
|
void lock() { m_impl.lock(); }
|
||||||
|
@ -393,11 +446,11 @@ unittest {
|
||||||
/**
|
/**
|
||||||
Recursive mutex implementation for tasks.
|
Recursive mutex implementation for tasks.
|
||||||
|
|
||||||
This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
|
This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
|
||||||
does not block the event loop when contention happens.
|
does not block the event loop when contention happens.
|
||||||
|
|
||||||
Notice:
|
Notice:
|
||||||
Because this class is annotated nothrow, it cannot be interrupted
|
Because this class is annotated `nothrow`, it cannot be interrupted
|
||||||
using $(D vibe.core.task.Task.interrupt()). The corresponding
|
using $(D vibe.core.task.Task.interrupt()). The corresponding
|
||||||
$(D InterruptException) will be deferred until the next blocking
|
$(D InterruptException) will be deferred until the next blocking
|
||||||
operation yields the event loop.
|
operation yields the event loop.
|
||||||
|
@ -405,7 +458,7 @@ unittest {
|
||||||
Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
|
Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
|
||||||
interrupted.
|
interrupted.
|
||||||
|
|
||||||
See_Also: TaskMutex, core.sync.mutex.Mutex
|
See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
|
||||||
*/
|
*/
|
||||||
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
|
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
|
||||||
@safe:
|
@safe:
|
||||||
|
@ -437,10 +490,15 @@ unittest {
|
||||||
*/
|
*/
|
||||||
final class InterruptibleRecursiveTaskMutex : Lockable {
|
final class InterruptibleRecursiveTaskMutex : Lockable {
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
private RecursiveTaskMutexImpl!true m_impl;
|
private RecursiveTaskMutexImpl!true m_impl;
|
||||||
|
|
||||||
this() { m_impl.setup(); }
|
this()
|
||||||
|
{
|
||||||
|
m_impl.setup();
|
||||||
|
|
||||||
|
// detects invalid usage within synchronized(...)
|
||||||
|
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
|
||||||
|
}
|
||||||
|
|
||||||
bool tryLock() { return m_impl.tryLock(); }
|
bool tryLock() { return m_impl.tryLock(); }
|
||||||
void lock() { m_impl.lock(); }
|
void lock() { m_impl.lock(); }
|
||||||
|
@ -453,6 +511,39 @@ unittest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Helper class to ensure that the non Object.Monitor compatible interruptible
|
||||||
|
// mutex classes are not accidentally used with the `synchronized` statement
|
||||||
|
private final class NoUseMonitor : Object.Monitor {
|
||||||
|
private static shared Proxy st_instance;
|
||||||
|
|
||||||
|
static struct Proxy {
|
||||||
|
Object.Monitor monitor;
|
||||||
|
}
|
||||||
|
|
||||||
|
static @property ref shared(Proxy) instance()
|
||||||
|
@safe nothrow {
|
||||||
|
static shared(Proxy)* inst = null;
|
||||||
|
if (inst) return *inst;
|
||||||
|
|
||||||
|
() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
|
||||||
|
synchronized {
|
||||||
|
if (!st_instance.monitor)
|
||||||
|
st_instance.monitor = new shared NoUseMonitor;
|
||||||
|
inst = &st_instance;
|
||||||
|
}
|
||||||
|
} ();
|
||||||
|
|
||||||
|
return *inst;
|
||||||
|
}
|
||||||
|
|
||||||
|
override void lock() @safe @nogc nothrow {
|
||||||
|
assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
|
||||||
|
}
|
||||||
|
|
||||||
|
override void unlock() @safe @nogc nothrow {}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void runMutexUnitTests(M)()
|
private void runMutexUnitTests(M)()
|
||||||
{
|
{
|
||||||
import vibe.core.core;
|
import vibe.core.core;
|
||||||
|
@ -656,8 +747,14 @@ final class InterruptibleTaskCondition {
|
||||||
|
|
||||||
private TaskConditionImpl!(true, Lockable) m_impl;
|
private TaskConditionImpl!(true, Lockable) m_impl;
|
||||||
|
|
||||||
this(core.sync.mutex.Mutex mtx) { m_impl.setup(mtx); }
|
this(M)(M mutex)
|
||||||
this(Lockable mtx) { m_impl.setup(mtx); }
|
if (is(M : Mutex) || is (M : Lockable))
|
||||||
|
{
|
||||||
|
static if (is(M : Lockable))
|
||||||
|
m_impl.setup(mutex);
|
||||||
|
else
|
||||||
|
m_impl.setupForMutex(mutex);
|
||||||
|
}
|
||||||
|
|
||||||
@property Lockable mutex() { return m_impl.mutex; }
|
@property Lockable mutex() { return m_impl.mutex; }
|
||||||
void wait() { m_impl.wait(); }
|
void wait() { m_impl.wait(); }
|
||||||
|
@ -666,6 +763,12 @@ final class InterruptibleTaskCondition {
|
||||||
void notifyAll() { m_impl.notifyAll(); }
|
void notifyAll() { m_impl.notifyAll(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
new InterruptibleTaskCondition(new Mutex);
|
||||||
|
new InterruptibleTaskCondition(new TaskMutex);
|
||||||
|
new InterruptibleTaskCondition(new InterruptibleTaskMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** A manually triggered single threaded cross-task event.
|
/** A manually triggered single threaded cross-task event.
|
||||||
|
|
||||||
|
@ -1541,7 +1644,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
||||||
@trusted bool tryLock() { return m_mutex.tryLock(); }
|
@trusted bool tryLock() { return m_mutex.tryLock(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
void setup(core.sync.mutex.Mutex mtx)
|
void setupForMutex(core.sync.mutex.Mutex mtx)
|
||||||
{
|
{
|
||||||
setup(new MutexWrapper(mtx));
|
setup(new MutexWrapper(mtx));
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ shared final class TaskPool {
|
||||||
/** Creates a new task pool with the specified number of threads.
|
/** Creates a new task pool with the specified number of threads.
|
||||||
|
|
||||||
Params:
|
Params:
|
||||||
thread_count: The number of worker threads to create
|
thread_count = The number of worker threads to create
|
||||||
*/
|
*/
|
||||||
this(size_t thread_count = logicalProcessorCount())
|
this(size_t thread_count = logicalProcessorCount())
|
||||||
@safe {
|
@safe {
|
||||||
|
|
Loading…
Reference in a new issue