vibe-core/source/vibe/core/sync.d
Sönke Ludwig 34703d412d Make ScopedMutexLock nothrow.
Wrong order of lock/unlock calls are programming errors and should be treated as such.
2020-12-12 10:11:17 +01:00

2196 lines
58 KiB
D

/**
Event loop compatible task synchronization facilities.
This module provides replacement primitives for the modules in `core.sync`
that do not block vibe.d's event loop in their wait states. These should
always be preferred over the ones in Druntime under usual circumstances.
Using a standard `Mutex` is possible as long as it is ensured that no event
loop based functionality (I/O, task interaction or anything that implicitly
calls `vibe.core.core.yield`) is executed within a section of code that is
protected by the mutex. $(B Failure to do so may result in dead-locks and
high-level race-conditions!)
Copyright: © 2012-2019 Sönke Ludwig
Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.sync;
import vibe.core.log : logDebugV, logTrace, logInfo;
import vibe.core.task;
import core.atomic;
import core.sync.mutex;
import core.sync.condition;
import eventcore.core;
import std.exception;
import std.stdio;
import std.traits : ReturnType;
/** Creates a new, initialized signal that can be shared between fibers.

	Returns:
		A `LocalManualEvent` on which `initialize()` has been called.
*/
LocalManualEvent createManualEvent()
@safe nothrow {
	LocalManualEvent event;
	event.initialize();
	return event;
}
/** Creates a new, initialized signal that can be shared between fibers and
	threads.

	Returns:
		A `shared(ManualEvent)` on which `initialize()` has been called.
*/
shared(ManualEvent) createSharedManualEvent()
@trusted nothrow {
	shared(ManualEvent) event;
	event.initialize();
	return event;
}
/** Performs RAII based locking/unlocking of a mutex.

	Note that while `TaskMutex` can be used with D's built-in `synchronized`
	statement, `InterruptibleTaskMutex` cannot. This function provides a
	library based alternative that is suitable for use with all mutex types.

	Params:
		mutex = The mutex to guard; must be a `Mutex` or a `Lockable`
		mode = Determines what happens to the mutex on construction of the guard

	Returns:
		A `ScopedMutexLock` that was constructed from `mutex` and `mode`.
*/
ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
	if (is(M : Mutex) || is(M : Lockable))
{
	return typeof(return)(mutex, mode);
}
///
unittest {
import vibe.core.core : runWorkerTaskH;
// __gshared: both variables are accessed from within the worker tasks started below
__gshared int counter;
__gshared InterruptibleTaskMutex mutex;
mutex = new InterruptibleTaskMutex;
Task[] tasks;
// start 100 worker tasks that each increment the counter while holding the lock
foreach (i; 0 .. 100) {
tasks ~= runWorkerTaskH({
auto l = scopedMutexLock(mutex);
counter++;
});
}
// wait for all tasks before checking that no increment got lost
foreach (t; tasks) t.join();
assert(counter == 100);
}
unittest {
// instantiation test: scopedMutexLock must accept each of these mutex types
scopedMutexLock(new Mutex);
scopedMutexLock(new TaskMutex);
scopedMutexLock(new InterruptibleTaskMutex);
}
/// Selects what the constructor of `ScopedMutexLock` does with the mutex.
enum LockMode {
/// The constructor calls `lock()`.
lock,
/// The constructor calls `tryLock()`.
tryLock,
/// The constructor leaves the mutex untouched.
defer
}
/// Interface of the mutex types in this module; accepted by `scopedMutexLock`
/// and `ScopedMutexLock` as an alternative to `core.sync.mutex.Mutex`.
interface Lockable {
@safe:
/// Acquires the lock.
void lock();
/// Releases the lock.
void unlock();
/// Attempts to acquire the lock; the `bool` result reports the outcome.
bool tryLock();
}
/** RAII lock for `Mutex` and `Lockable` based mutex types.

	Depending on the `LockMode` passed to the constructor, the mutex gets
	locked, try-locked or left untouched on construction. If the lock is held
	when the guard gets destroyed, the destructor releases it.
*/
struct ScopedMutexLock(M)
	if (is(M : Mutex) || is(M : Lockable))
{
	@disable this(this);

	private {
		M m_mutex;
		bool m_locked;
		LockMode m_mode; // NOTE: neither assigned nor read within this struct
	}

	/** Stores the mutex and acquires it according to `mode`.

		Params:
			mutex = The mutex to guard, must not be `null`
			mode = `lock` calls `lock()`, `tryLock` calls `tryLock()` and
				`defer` leaves the mutex untouched
	*/
	this(M mutex, LockMode mode = LockMode.lock) {
		assert(mutex !is null);
		m_mutex = mutex;

		final switch (mode) {
			case LockMode.lock: lock(); break;
			case LockMode.tryLock: tryLock(); break;
			case LockMode.defer: break;
		}
	}

	/// Releases the mutex if it is currently held through this guard.
	~this()
	{
		if (m_locked)
			m_mutex.unlock();
	}

	/// Determines whether the mutex is currently held through this guard.
	@property bool locked() const { return m_locked; }

	/** Releases the mutex.

		May only be called while `locked` is `true`.
	*/
	void unlock()
	in (this.locked)
	do {
		enforce(m_locked);
		m_mutex.unlock();
		m_locked = false;
	}

	/** Attempts to acquire the mutex using its `tryLock()` method.

		May only be called while `locked` is `false`.

		Returns:
			The result of the mutex' `tryLock()` call, which also becomes the
			new value of `locked`.
	*/
	bool tryLock()
	in (!this.locked)
	do {
		return m_locked = m_mutex.tryLock();
	}

	/** Acquires the mutex using its `lock()` method.

		May only be called while `locked` is `false`.
	*/
	void lock()
	in (!this.locked)
	do {
		// Only record the lock as held once it has actually been acquired.
		// If `lock()` throws (the `lock()` of the interruptible mutex types
		// is not `nothrow`), the guard must not claim ownership, or else the
		// destructor would unlock a mutex that was never locked.
		m_mutex.lock();
		m_locked = true;
	}
}
/*
Only for internal use:
Ensures that a mutex is locked while executing the given procedure.
This function works for all kinds of mutexes, in particular for
$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
Returns:
Returns the value returned from $(D PROC), if any.
*/
/// private
ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
{
	// acquire first, then guarantee the release on every exit path of PROC
	mutex.lock();
	try return PROC();
	finally mutex.unlock();
}
///
unittest {
int protected_var = 0;
auto mtx = new TaskMutex;
// the delegate literal runs between mtx.lock() and mtx.unlock()
mtx.performLocked!({
protected_var++;
});
}
/**
	Thread-local semaphore implementation for tasks.

	When the semaphore runs out of concurrent locks, it will suspend. This class
	is used in `vibe.core.connectionpool` to limit the number of concurrent
	connections.

	Tasks that have to wait in `lock()` are served by descending priority and,
	within the same priority, in the order in which they started to wait.
*/
final class LocalTaskSemaphore
{
@safe:

	// requires a queue
	import std.container.binaryheap;
	import std.container.array;
	//import vibe.utils.memory;

	private {
		static struct ThreadWaiter {
			ubyte priority; // higher values are served first (see `asc`)
			uint seq;       // ticket number, unique among the queued waiters
		}

		BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
		uint m_maxLocks;
		uint m_locks;
		uint m_seq; // next ticket number; wraps around on overflow
		LocalManualEvent m_signal;
	}

	/** Creates a semaphore.

		Params:
			max_locks = Maximum number of locks that can be held concurrently
	*/
	this(uint max_locks)
	{
		m_maxLocks = max_locks;
		m_signal = createManualEvent();
	}

	/// Maximum number of concurrent locks
	@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
	/// ditto
	@property uint maxLocks() const { return m_maxLocks; }

	/** Number of concurrent locks still available

		Yields zero if `maxLocks` has been lowered below the number of locks
		that are currently held (the plain subtraction would wrap around).
	*/
	@property uint available()
	const {
		return m_locks < m_maxLocks ? m_maxLocks - m_locks : 0;
	}

	/** Try to acquire a lock.

		If a lock cannot be acquired immediately, returns `false` and leaves the
		semaphore in its previous state.

		Returns:
			`true` is returned $(I iff) the number of available locks is greater
			than zero.
	*/
	bool tryLock()
	{
		if (available > 0)
		{
			m_locks++;
			return true;
		}
		return false;
	}

	/** Acquires a lock.

		Once the limit of concurrent locks is reached, this method will block
		until the number of locks drops below the limit.

		Params:
			priority = Waiters with a higher priority are served before waiters
				with a lower priority
	*/
	void lock(ubyte priority = 0)
	{
		if (tryLock())
			return;

		// Every waiter gets its own ticket number. It orders waiters of the
		// same priority and identifies this waiter at the front of the queue.
		ThreadWaiter w;
		w.priority = priority;
		w.seq = m_seq++;

		() @trusted { m_waiters.insert(w); } ();

		while (true) {
			m_signal.waitUninterruptible();
			if (m_waiters.front.seq == w.seq && tryLock()) {
				// remove our own entry, so that the next waiter becomes the front
				() @trusted { m_waiters.removeFront(); } ();
				// The signal wakes all waiters, but only the front one may
				// proceed. If more locks are available, wake the others again
				// so that the new front waiter gets a chance to take one.
				if (m_waiters.length > 0 && available > 0)
					m_signal.emit();
				return;
			}
		}
	}

	/** Gives up an existing lock.
	*/
	void unlock()
	{
		assert(m_locks >= 1);
		m_locks--;
		if (m_waiters.length > 0)
			m_signal.emit(); // wakes the waiters, the front one takes the lock
	}

	// if true, a goes after b. ie. b comes out front()
	/// private
	static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
	{
		if (a.priority != b.priority)
			return a.priority < b.priority;
		// Compare the ticket numbers by their signed distance, so that the
		// order stays correct when `m_seq` wraps around.
		return cast(int)(a.seq - b.seq) > 0;
	}
}
/**
Mutex implementation for fibers.
This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
does not block the event loop when contention happens. Note that this
mutex does not allow recursive locking.
Notice:
Because this class is annotated nothrow, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields the event loop.
Use $(D InterruptibleTaskMutex) as an alternative that can be
interrupted.
See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
*/
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
// all locking operations are forwarded to this implementation
private TaskMutexImpl!false m_impl;
/// Sets up the implementation and passes `o` on to the `Mutex` base class constructor.
this(Object o) { m_impl.setup(); super(o); }
/// Sets up the implementation.
this() { m_impl.setup(); }
/// Forwards to the implementation's `tryLock()`.
override bool tryLock() nothrow { return m_impl.tryLock(); }
/// Forwards to the implementation's `lock()`.
override void lock() nothrow { m_impl.lock(); }
/// Forwards to the implementation's `unlock()`.
override void unlock() nothrow { m_impl.unlock(); }
}
unittest {
auto mutex = new TaskMutex;
{
// a second tryLock guard must fail while the first guard holds the mutex
auto lock = scopedMutexLock(mutex);
assert(lock.locked);
assert(mutex.m_impl.m_locked);
auto lock2 = scopedMutexLock(mutex, LockMode.tryLock);
assert(!lock2.locked);
}
// leaving the scope must have released the mutex
assert(!mutex.m_impl.m_locked);
// explicit unlock through the guard
auto lock = scopedMutexLock(mutex, LockMode.tryLock);
assert(lock.locked);
lock.unlock();
assert(!lock.locked);
// usage as an object monitor
synchronized(mutex){
assert(mutex.m_impl.m_locked);
}
assert(!mutex.m_impl.m_locked);
// usage with performLocked
mutex.performLocked!({
assert(mutex.m_impl.m_locked);
});
assert(!mutex.m_impl.m_locked);
// usage of the guard within a with statement
static if (__VERSION__ >= 2067) {
with(mutex.scopedMutexLock) {
assert(mutex.m_impl.m_locked);
}
}
}
unittest { // test deferred throwing
import vibe.core.core;
auto mutex = new TaskMutex;
// t1 takes the mutex and keeps it for 20 ms
auto t1 = runTask({
scope (failure) assert(false, "No exception expected in first task!");
mutex.lock();
scope (exit) mutex.unlock();
sleep(20.msecs);
});
// t2 gets interrupted while waiting in lock(); the exception is expected
// to surface in the following yield() instead
auto t2 = runTask({
mutex.lock();
scope (exit) mutex.unlock();
try {
yield();
assert(false, "Yield is supposed to have thrown an InterruptException.");
} catch (InterruptException) {
// as expected!
} catch (Exception) {
assert(false, "Only InterruptException supposed to be thrown!");
}
});
// controlling task: interrupts t2, waits for both tasks and ends the event loop
runTask({
// mutex is now locked in first task for 20 ms
// the second tasks is waiting in lock()
t2.interrupt();
t1.join();
t2.join();
assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
exitEventLoop();
});
runEventLoop();
}
unittest {
// runs the generic mutex test suite of this module for TaskMutex
runMutexUnitTests!TaskMutex();
}
/**
Alternative to $(D TaskMutex) that supports interruption.
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
waiting in the $(D lock()) method. However, because the interface is not
$(D nothrow), it cannot be used as an object monitor.
See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
*/
final class InterruptibleTaskMutex : Lockable {
@safe:
// all locking operations are forwarded to this implementation
private TaskMutexImpl!true m_impl;
/// Sets up the implementation and installs the `NoUseMonitor` proxy as the
/// object's monitor.
this()
{
m_impl.setup();
// detects invalid usage within synchronized(...)
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
}
/// Forwards to the implementation's `tryLock()`.
bool tryLock() nothrow { return m_impl.tryLock(); }
/// Forwards to the implementation's `lock()`; not annotated `nothrow`.
void lock() { m_impl.lock(); }
/// Forwards to the implementation's `unlock()`.
void unlock() nothrow { m_impl.unlock(); }
}
unittest {
// runs the generic mutex test suite of this module for InterruptibleTaskMutex
runMutexUnitTests!InterruptibleTaskMutex();
}
/**
Recursive mutex implementation for tasks.
This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
does not block the event loop when contention happens.
Notice:
Because this class is annotated `nothrow`, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields the event loop.
Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
interrupted.
See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
*/
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
// all locking operations are forwarded to this implementation
private RecursiveTaskMutexImpl!false m_impl;
/// Sets up the implementation and passes `o` on to the `Mutex` base class constructor.
this(Object o) { m_impl.setup(); super(o); }
/// Sets up the implementation.
this() { m_impl.setup(); }
// NOTE(review): unlike in TaskMutex, the overrides below carry no explicit
// `nothrow` attribute - confirm whether that is intended.
/// Forwards to the implementation's `tryLock()`.
override bool tryLock() { return m_impl.tryLock(); }
/// Forwards to the implementation's `lock()`.
override void lock() { m_impl.lock(); }
/// Forwards to the implementation's `unlock()`.
override void unlock() { m_impl.unlock(); }
}
unittest {
// runs the generic mutex test suite of this module for RecursiveTaskMutex
runMutexUnitTests!RecursiveTaskMutex();
}
/**
Alternative to $(D RecursiveTaskMutex) that supports interruption.
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
waiting in the $(D lock()) method. However, because the interface is not
$(D nothrow), it cannot be used as an object monitor.
See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
*/
final class InterruptibleRecursiveTaskMutex : Lockable {
@safe:
// all locking operations are forwarded to this implementation
private RecursiveTaskMutexImpl!true m_impl;
/// Sets up the implementation and installs the `NoUseMonitor` proxy as the
/// object's monitor.
this()
{
m_impl.setup();
// detects invalid usage within synchronized(...)
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
}
/// Forwards to the implementation's `tryLock()`.
bool tryLock() { return m_impl.tryLock(); }
/// Forwards to the implementation's `lock()`.
void lock() { m_impl.lock(); }
/// Forwards to the implementation's `unlock()`.
void unlock() { m_impl.unlock(); }
}
unittest {
// runs the generic mutex test suite of this module for InterruptibleRecursiveTaskMutex
runMutexUnitTests!InterruptibleRecursiveTaskMutex();
}
// Helper class to ensure that the non Object.Monitor compatible interruptible
// mutex classes are not accidentally used with the `synchronized` statement
private final class NoUseMonitor : Object.Monitor {
// proxy holding the NoUseMonitor object; filled in lazily by `instance`
private static shared Proxy st_instance;
// Wrapper around the monitor reference. The interruptible mutex classes
// store the address of the shared proxy in their `__monitor` field.
static struct Proxy {
Object.Monitor monitor;
}
/// Returns a reference to the shared proxy, creating the `NoUseMonitor`
/// object on first use.
static @property ref shared(Proxy) instance()
@safe nothrow {
// caches the proxy address, so that later calls skip the synchronized block
static shared(Proxy)* inst = null;
if (inst) return *inst;
() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
synchronized {
// create the monitor object only if it does not exist yet
if (!st_instance.monitor)
st_instance.monitor = new shared NoUseMonitor;
inst = &st_instance;
}
} ();
return *inst;
}
/// Always fails with an assertion that explains the invalid usage.
override void lock() @safe @nogc nothrow {
assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
}
/// Does nothing.
override void unlock() @safe @nogc nothrow {}
}
// Generic test suite that is instantiated for each of the task mutex classes
// of this module. It consists of a basic lock test, a contention test and two
// interruption tests (interrupting the lock holder and interrupting the
// waiter). It accesses the private `m_impl.m_locked` state of the mutex.
private void runMutexUnitTests(M)()
{
import vibe.core.core;
auto m = new M;
Task t1, t2;
// Starts the two contending tasks t1 and t2. The flags select which of
// them expects to get interrupted by the controlling task.
void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
assert(!m.m_impl.m_locked);
// t1 starts first and acquires the mutex for 20 ms
// t2 starts second and has to wait in m.lock()
t1 = runTask({
assert(!m.m_impl.m_locked);
m.lock();
assert(m.m_impl.m_locked);
if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs));
else assertNotThrown(sleep(20.msecs));
m.unlock();
});
t2 = runTask({
assert(!m.tryLock());
if (interrupt_t2) {
// an InterruptException thrown by lock() itself means that the lock
// has not been taken, so that there is nothing to unlock
try m.lock();
catch (InterruptException) return;
// otherwise the exception has been deferred and the lock is held
try yield(); // rethrows any deferred exceptions
catch (InterruptException) {
m.unlock();
return;
}
assert(false, "Supposed to have thrown an InterruptException.");
} else assertNotThrown(m.lock());
assert(m.m_impl.m_locked);
sleep(20.msecs);
m.unlock();
assert(!m.m_impl.m_locked);
});
}
// basic lock test
m.performLocked!({
assert(m.m_impl.m_locked);
});
assert(!m.m_impl.m_locked);
// basic contention test
runContendedTasks(false, false);
auto t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t1.join();
assert(!t1.running && t2.running);
yield(); // give t2 a chance to take the lock
assert(m.m_impl.m_locked);
t2.join();
assert(!t2.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
// interruption test #1
// t1 is interrupted while it holds the lock; t2 must still obtain the lock afterwards
runContendedTasks(true, false);
t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t1.interrupt();
t1.join();
assert(!t1.running && t2.running);
yield(); // give t2 a chance to take the lock
assert(m.m_impl.m_locked);
t2.join();
assert(!t2.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
// interruption test #2
// t2 is interrupted while it waits for the lock that is held by t1
runContendedTasks(false, true);
t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t2.interrupt();
t2.join();
assert(!t2.running);
// only checked for the interruptible mutex types: t1 is still running
// and holds the lock once t2 has finished
static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
assert(t1.running && m.m_impl.m_locked);
t1.join();
assert(!t1.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
}
/**
Event loop based condition variable or "event" implementation.
This class can be used in exchange for a $(D core.sync.condition.Condition)
to avoid blocking the event loop when waiting.
Notice:
Because this class is annotated nothrow, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields to the event loop.
Use $(D InterruptibleTaskCondition) as an alternative that can be
interrupted.
Note that it is generally not safe to use a `TaskCondition` together with an
interruptible mutex type.
See_Also: `InterruptibleTaskCondition`
*/
final class TaskCondition : core.sync.condition.Condition {
@safe:
// all operations are forwarded to this implementation
private TaskConditionImpl!(false, Mutex) m_impl;
/// Creates a condition for the given mutex. The dynamic type of `mtx`
/// must be exactly `Mutex` or `TaskMutex` (checked with an assertion).
this(core.sync.mutex.Mutex mtx)
{
assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo,
"TaskCondition can only be used with Mutex or TaskMutex");
m_impl.setup(mtx);
super(mtx);
}
/// Forwards to the implementation's `mutex` property.
override @property Mutex mutex() nothrow { return m_impl.mutex; }
/// Forwards to the implementation's `wait()`.
override void wait() nothrow { m_impl.wait(); }
/// Forwards to the implementation's `wait(timeout)` and returns its result.
override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); }
/// Forwards to the implementation's `notify()`.
override void notify() nothrow { m_impl.notify(); }
/// Forwards to the implementation's `notifyAll()`.
override void notifyAll() nothrow { m_impl.notifyAll(); }
}
unittest {
// construction must work with both mutex types accepted by the constructor
new TaskCondition(new Mutex);
new TaskCondition(new TaskMutex);
}
/** This example shows the typical usage pattern using a `while` loop to make
sure that the final condition is reached.
*/
unittest {
import vibe.core.core;
import vibe.core.log;
// __gshared: the variables are accessed from within the worker tasks
__gshared Mutex mutex;
__gshared TaskCondition condition;
__gshared int workers_still_running = 0;
// setup the task condition
mutex = new Mutex;
condition = new TaskCondition(mutex);
logDebug("SETTING UP TASKS");
// start up the workers and count how many are running
foreach (i; 0 .. 4) {
workers_still_running++;
runWorkerTask({
// simulate some work
sleep(100.msecs);
// notify the waiter that we're finished
synchronized (mutex) {
workers_still_running--;
logDebug("DECREMENT %s", workers_still_running);
}
logDebug("NOTIFY");
condition.notify();
});
}
logDebug("STARTING WAIT LOOP");
// wait until all tasks have decremented the counter back to zero
synchronized (mutex) {
// the counter is re-checked after each wake-up
while (workers_still_running > 0) {
logDebug("STILL running %s", workers_still_running);
condition.wait();
}
}
}
/**
Alternative to `TaskCondition` that supports interruption.
This class supports the use of `vibe.core.task.Task.interrupt()` while
waiting in the `wait()` method.
See `TaskCondition` for an example.
Notice:
Note that it is generally not safe to use an
`InterruptibleTaskCondition` together with an interruptible mutex type.
See_Also: `TaskCondition`
*/
final class InterruptibleTaskCondition {
@safe:
// all operations are forwarded to this implementation
private TaskConditionImpl!(true, Lockable) m_impl;
/// Creates a condition for the given mutex, which may be a `Lockable` or
/// a `core.sync.mutex.Mutex`.
this(M)(M mutex)
if (is(M : Mutex) || is (M : Lockable))
{
// Lockable types go through setup(), all other Mutex types through setupForMutex()
static if (is(M : Lockable))
m_impl.setup(mutex);
else
m_impl.setupForMutex(mutex);
}
/// Forwards to the implementation's `mutex` property.
@property Lockable mutex() { return m_impl.mutex; }
/// Forwards to the implementation's `wait()`.
void wait() { m_impl.wait(); }
/// Forwards to the implementation's `wait(timeout)` and returns its result.
bool wait(Duration timeout) { return m_impl.wait(timeout); }
/// Forwards to the implementation's `notify()`.
void notify() { m_impl.notify(); }
/// Forwards to the implementation's `notifyAll()`.
void notifyAll() { m_impl.notifyAll(); }
}
unittest {
// construction must work with plain, task and interruptible task mutexes
new InterruptibleTaskCondition(new Mutex);
new InterruptibleTaskCondition(new TaskMutex);
new InterruptibleTaskCondition(new InterruptibleTaskMutex);
}
/** A manually triggered single threaded cross-task event.

	Note: the ownership can be shared between multiple fibers of the same thread.

	Copies of an event refer to the same reference counted waiter object, so
	that an `emit()` on one copy wakes up the tasks waiting on any other copy.
	Use `createManualEvent` to obtain an initialized instance.
*/
struct LocalManualEvent {
	import core.thread : Thread;
	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;

@safe:

	private {
		alias Waiter = ThreadLocalWaiter!false;

		Waiter m_waiter;
	}

	// thread destructor in vibe.core.core will decrement the ref. count
	package static EventID ms_threadEvent;

	// Allocates the waiter object using `Mallocator`.
	private void initialize()
	nothrow {
		import vibe.internal.allocator : Mallocator, makeGCSafe;
		m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
	}

	// A copy adds a reference to the waiter object.
	this(this)
	{
		if (m_waiter)
			m_waiter.addRef();
	}

	// Drops one reference and disposes the waiter object once `releaseRef()`
	// reports that no references are left.
	~this()
	nothrow {
		import vibe.internal.allocator : Mallocator, disposeGCSafe;
		if (m_waiter) {
			if (!m_waiter.releaseRef()) {
				static if (__VERSION__ < 2087) scope (failure) assert(false);
				() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
			}
		}
	}

	/// Determines whether the event has been initialized.
	bool opCast() const nothrow { return m_waiter !is null; }

	/// A counter that is increased with every emit() call
	int emitCount()
	const nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		return m_waiter.m_emitCount;
	}

	/** Emits the signal, waking up all owners of the signal.

		Returns:
			The emit count as it was before this call.
	*/
	int emit()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emit();
		return ec;
	}

	/** Emits the signal, waking up a single owner of the signal.

		Returns:
			The emit count as it was before this call.
	*/
	int emitSingle()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared single emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emitSingle();
		return ec;
	}

	/** Acquires ownership and waits until the signal is emitted.

		Note that in order not to miss any emits it is necessary to use the
		overload taking an integer.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait() { return wait(this.emitCount); }

	/** Acquires ownership and waits until the signal is emitted and the emit
		count is larger than a given one.

		Params:
			timeout = Maximum time to wait; the overload without this
				parameter waits without a time limit
			emit_count = The emit count that has to be exceeded

		Returns:
			The emit count at the time of returning.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
	/// ditto
	int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }

	/** Same as $(D wait), but defers throwing any $(D InterruptException).

		This method is annotated $(D nothrow) at the expense that it cannot be
		interrupted.
	*/
	int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
	/// ditto
	int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
	/// ditto
	int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }

	// Common implementation of `wait` and `waitUninterruptible`.
	private int doWait(bool interruptible)(Duration timeout, int emit_count)
	{
		import core.time : MonoTime;

		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");

		immutable has_timeout = timeout != Duration.max;

		MonoTime target_timeout, now;
		if (has_timeout) {
			try now = MonoTime.currTime();
			catch (Exception e) { assert(false, e.msg); }
			target_timeout = now + timeout;
		}

		// the difference is used instead of a direct comparison, so that a
		// wrap-around of the emit counter is handled
		while (m_waiter.m_emitCount - emit_count <= 0) {
			m_waiter.wait!interruptible(has_timeout ? target_timeout - now : Duration.max);

			// Only test for expiry if a timeout was given. Without one,
			// `target_timeout` is still `MonoTime.init` and the comparison
			// would end the loop regardless of the emit count.
			if (has_timeout) {
				try now = MonoTime.currTime();
				catch (Exception e) { assert(false, e.msg); }
				if (now >= target_timeout) break;
			}
		}

		return m_waiter.m_emitCount;
	}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
auto e = createManualEvent();
// two waiters with different timeouts; the emit after 50 ms is expected to end both waits
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
auto w2 = runTask({ e.wait(500.msecs, e.emitCount); });
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
auto e = createManualEvent();
// integer overflow test
// the counter starts at int.max, so that the emit() below increments it past the maximum
e.m_waiter.m_emitCount = int.max;
auto w1 = runTask({ e.wait(50.msecs, e.emitCount); });
runTask({
sleep(5.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running);
exitEventLoop();
});
runEventLoop();
}
unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
LocalManualEvent l = createManualEvent();
Task t2;
// first waiter: once woken up, it interrupts the second waiter
runTask({
l.wait();
t2.interrupt();
sleep(20.msecs);
exitEventLoop();
});
// second waiter: its wait() is expected to end with an InterruptException
t2 = runTask({
try {
l.wait();
assert(false, "Shouldn't reach this.");
} catch (InterruptException e) {}
});
// emitter
runTask({
l.emit();
});
runEventLoop();
}
unittest { // ensure that LocalManualEvent behaves correctly after being copied
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
LocalManualEvent l = createManualEvent();
// emits through a copy of the event
runTask({
auto lc = l;
sleep(100.msecs);
lc.emit();
});
// waits on the original; wait() returns the emit count, which the assert
// requires to be non-zero
runTask({
assert(l.wait(1.seconds, l.emitCount));
exitEventLoop();
});
runEventLoop();
}
/** A manually triggered multi threaded cross-task event.
Note: the ownership can be shared between multiple fibers and threads.
*/
struct ManualEvent {
import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
import vibe.internal.list : StackSList;
@safe:
private {
alias ThreadWaiter = ThreadLocalWaiter!true;
// read and modified using core.atomic operations (see emitCount and emit)
int m_emitCount;
static struct Waiters {
StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
}
// both waiter lists are only accessed through the monitor's lock()
Monitor!(Waiters, shared(Mutex)) m_waiters;
}
// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;
enum EmitMode {
single,
all
}
@disable this(this);
// Creates the monitor that protects the waiter lists, using a new mutex.
private void initialize()
shared nothrow {
m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
}
deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; }
/// A counter that is increased with every emit() call
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal.
int emit()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
// atomically increment the counter; `ec` is the value returned to the caller
auto ec = atomicOp!"+="(m_emitCount, 1);
// NOTE(review): `thisthr` is not used within this function
auto thisthr = Thread.getThis();
ThreadWaiter lw;
auto drv = eventDriver;
m_waiters.lock.active.filter((ThreadWaiter w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.m_driver is drv) {
// waiter uses the event driver of the calling thread: remember it
// and emit on it directly after the list has been processed
lw = w;
lw.addRef();
} else {
// waiter uses a different event driver: trigger its event through
// that driver
try {
assert(w.m_event != EventID.init);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg);
}
return true;
});
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) {
lw.emit();
// drops the reference that was added within the callback above
releaseWaiter(lw);
}
logTrace("emit shared done");
return ec;
}
/// Emits the signal, waking up at least one waiting task
int emitSingle()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
// atomically increment the counter; `ec` is the value returned to the caller
auto ec = atomicOp!"+="(m_emitCount, 1);
// NOTE(review): `thisthr` is not used within this function
auto thisthr = Thread.getThis();
ThreadWaiter lw;
auto drv = eventDriver;
m_waiters.lock.active.iterate((ThreadWaiter w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.m_driver is drv) {
// a waiter of the calling thread without waiting tasks is passed over
if (w.unused) return true;
lw = w;
lw.addRef();
} else {
try {
assert(w.m_event != EventID.invalid);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg);
}
// presumably stops the iteration after the first notified waiter - TODO confirm against StackSList.iterate
return false;
});
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) {
lw.emitSingle();
// drops the reference that was added within the callback above
releaseWaiter(lw);
}
logTrace("emit shared done");
return ec;
}
/** Acquires ownership and waits until the signal is emitted.
Note that in order not to miss any emits it is necessary to use the
overload taking an integer.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the
given one or until a timeout is reached.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
/// ditto
int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
/** Same as $(D wait), but defers throwing any $(D InterruptException).
This method is annotated $(D nothrow) at the expense that it cannot be
interrupted.
*/
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
/// ditto
int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
// Common implementation of wait() and waitUninterruptible(). Returns the
// emit count that was last read.
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared {
import core.time : MonoTime;
() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();
// lazily create the event that is stored in `ms_threadEvent`
if (ms_threadEvent is EventID.invalid) {
ms_threadEvent = eventDriver.events.create();
assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
}
MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
int ec = this.emitCount;
acquireThreadWaiter((scope ThreadWaiter w) {
// loop until the emit count has passed `emit_count` or the timeout has expired
while (ec - emit_count <= 0) {
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
ec = this.emitCount;
// the expiry check is only performed if a timeout was given
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
}
});
return ec;
}
// Calls `del` with a waiter whose driver is the current `eventDriver`. The
// waiter is taken from the active list, from the free list or gets newly
// allocated, in this order. The reference held for the duration of the call
// is released afterwards.
private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : processAllocator, makeGCSafe;
ThreadWaiter w;
auto drv = eventDriver;
with (m_waiters.lock) {
// 1. look for an active waiter that uses the same driver
active.iterate((aw) {
if (aw.m_driver is drv) {
w = aw;
w.addRef();
return false;
}
return true;
});
if (!w) {
// 2. look for a matching waiter in the free list
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
});
if (!w) {
// 3. allocate a new waiter for this driver/event pair
() @trusted {
try {
w = processAllocator.makeGCSafe!ThreadWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
} catch (Exception e) {
assert(false, "Failed to allocate thread waiter.");
}
} ();
}
assert(w.m_refCount == 1);
active.add(w);
}
}
scope (exit) releaseWaiter(w);
del(w);
}
// Drops one reference to `w`. Once `releaseRef()` reports that no references
// are left, the waiter is moved from the active list to the free list.
private void releaseWaiter(ThreadWaiter w)
shared nothrow {
if (!w.releaseRef()) {
assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
assert(w.unused, "Waiter still used, but not referenced!?");
with (m_waiters.lock) {
auto rmvd = active.remove(w);
assert(rmvd, "Waiter not in active queue anymore!?");
free.add(w);
// TODO: cap size of m_freeWaiters
}
}
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
auto e = createSharedManualEvent();
// w1 waits within a regular task, w2 within a worker task
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
static void w(shared(ManualEvent)* e) { e.wait(500.msecs, e.emitCount); }
auto w2 = runWorkerTaskH(&w, &e);
// the emit after 50 ms is expected to end both waits
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
unittest {
import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
import vibe.core.taskpool : TaskPool;
import core.time : msecs, usecs;
import std.concurrency : send, receiveOnly;
import std.random : uniform;
// stress test: many tasks of a task pool emit and wait on the same event
auto tpool = new shared TaskPool(4);
scope (exit) tpool.terminate();
// Body of each pool task: reports its handle to the owner and then loops,
// occasionally emitting and always waiting for the next emit, until the
// emit count has reached 5000.
static void test(shared(ManualEvent)* evt, Task owner)
{
owner.tid.send(Task.getThis());
int ec = evt.emitCount;
auto thist = Task.getThis();
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
while (ec < 5_000) {
tm.rearm(500.msecs);
sleep(uniform(0, 10_000).usecs);
if (uniform(0, 10) == 0) evt.emit();
auto ecn = evt.wait(ec);
assert(ecn > ec);
ec = ecn;
}
}
// global watchdog that fails the test if it takes longer than 30 seconds
auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
scope (exit) watchdog.stop();
auto e = createSharedManualEvent();
Task[] tasks;
runTask({
auto thist = Task.getThis();
// start 25 tasks in each thread
foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
// collect all task handles
foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
// NOTE(review): `pec` is not used
int pec = 0;
// keep emitting until the emit count has reached 5000
while (e.emitCount < 5_000) {
tm.rearm(500.msecs);
sleep(50.usecs);
e.emit();
}
// wait for all worker tasks to finish
foreach (t; tasks) t.join();
}).join();
}
// Bundles a value of type `T` with a mutex of type `M`. The value is accessed
// through the `Locked` handle that is returned by `lock()`.
package shared struct Monitor(T, M)
{
alias Mutex = M;
alias Data = T;
private {
Mutex mutex;
Data data;
}
// Handle that is returned by `lock()`. It forwards to the protected data
// (`alias get this`) and unlocks the mutex in its destructor.
static struct Locked {
shared(Monitor)* m;
@disable this(this);
~this() {
() @trusted {
// use the nothrow variant of unlock, if the mutex type offers one
static if (is(typeof(Mutex.init.unlock_nothrow())))
(cast(Mutex)m.mutex).unlock_nothrow();
else (cast(Mutex)m.mutex).unlock();
} ();
}
// returns the protected data with the `shared` qualifier cast away
ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
alias get this;
}
// Locks the mutex and returns a handle to the protected data.
Locked lock() {
() @trusted {
// use the nothrow variant of lock, if the mutex type offers one
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return Locked(() @trusted { return &this; } ());
}
// Same as above, for const monitors; returns a const handle.
const(Locked) lock() const {
() @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return const(Locked)(() @trusted { return &this; } ());
}
}
// Creates a `Monitor` for data of type `T` that uses the given mutex. The
// data member is left at its initial value.
package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
@trusted {
	typeof(return) monitor;
	monitor.mutex = cast(shared)mutex;
	return monitor;
}
/** Queue of tasks that wait for a notification.

	Each waiting task is represented by a `TaskWaiter` node that is linked
	into `m_waiters`. `emit` notifies every queued waiter, `emitSingle`
	notifies only the one at the front of the queue.

	Params:
		EVENT_TRIGGERED = If `true`, the instance additionally carries an
			event driver/event ID pair and a `next` link; otherwise it
			carries an emit counter.
*/
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
	import vibe.internal.list : CircularDList;

	private {
		/// List node (linked through `prev`/`next`) holding the callback of one waiting task.
		static struct TaskWaiter {
			TaskWaiter* prev, next;
			void delegate() @safe nothrow notifier;

			/// Registers the callback to invoke on emit; a node may only be armed once at a time.
			void wait(void delegate() @safe nothrow del) @safe nothrow {
				assert(notifier is null, "Local waiter is used twice!");
				notifier = del;
			}
			/// Drops the registered callback without invoking it.
			void cancel() @safe nothrow { notifier = null; }
			/// Clears the registered callback and then invokes it.
			void emit() @safe nothrow { auto n = notifier; notifier = null; n(); }
		}

		static if (EVENT_TRIGGERED) {
			package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
			NativeEventDriver m_driver;
			EventID m_event = EventID.invalid;
		} else {
			int m_emitCount = 0;
		}
		int m_refCount = 1;
		TaskWaiter m_pivot;
		TaskWaiter m_emitPivot;
		CircularDList!(TaskWaiter*) m_waiters;
	}

	this()
	{
		m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
	}

	static if (EVENT_TRIGGERED) {
		/// Releases the associated event handle, if one was set.
		~this()
		{
			import vibe.core.internal.release : releaseHandle;
			if (m_event != EventID.invalid)
				releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
		}
	}

	/// `true` if no waiter is currently queued.
	@property bool unused() const @safe nothrow { return m_waiters.empty; }

	/// Increments the reference count.
	void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }

	/// Decrements the reference count; returns `true` as long as references remain.
	bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }

	/** Queues the calling task and suspends it until it gets notified.

		Params:
			timeout = Timeout that is forwarded to `asyncAwaitAny`.
			evt = Optional event to wait on in addition to the local
				notification; `EventID.invalid` disables this.
			exit_condition = Optional predicate that is evaluated right after
				the wait on `evt` has been started. If it returns `true`, the
				event wait is cancelled and treated as if the event had
				fired. May be `null`, in which case no check is performed.

		Returns:
			`true` if the waiter was notified through `emit`/`emitSingle`,
			`false` if the local wait was cancelled instead (NOTE(review):
			presumably on timeout or when `evt` fired first - confirm against
			`asyncAwaitAny`).
	*/
	bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
	@safe {
		import vibe.internal.async : Waitable, asyncAwaitAny;

		TaskWaiter waiter_store;
		TaskWaiter* waiter = () @trusted { return &waiter_store; } ();

		m_waiters.insertBack(waiter);
		assert(waiter.next !is null);
		// make sure the stack allocated node never stays in the queue,
		// regardless of how this function is left
		scope (exit)
			if (waiter.next !is null) {
				m_waiters.remove(waiter);
				assert(!waiter.next);
			}

		bool cancelled;

		alias waitable = Waitable!(typeof(TaskWaiter.notifier),
			(cb) { waiter.wait(cb); },
			(cb) { cancelled = true; waiter.cancel(); },
			() {}
		);

		alias ewaitable = Waitable!(EventCallback,
			(cb) {
				eventDriver.events.wait(evt, cb);
				// check for exit condition *after* starting to wait for the event
				// to avoid a race condition
				if (exit_condition !is null && exit_condition()) {
					eventDriver.events.cancelWait(evt, cb);
					cb(evt);
				}
			},
			(cb) { eventDriver.events.cancelWait(evt, cb); },
			(EventID) {}
		);

		if (evt != EventID.invalid) {
			asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout);
		} else {
			asyncAwaitAny!(interruptible, waitable)(timeout);
		}

		if (cancelled) {
			assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
			return false;
		} else {
			assert(waiter.next is null, "Triggered waiter still in queue!?");
			return true;
		}
	}

	/** Notifies all waiters that are queued at the time of the call.

		A pivot node is appended to the queue to mark the end of the range to
		process. If another `emit` is already in progress, its pivot is moved
		to the back of the queue instead, so that the running call also
		handles the waiters that were added in the meantime.
	*/
	void emit()
	@safe nothrow {
		if (m_waiters.empty) return;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the end, so that the other emit call will process all pending waiters
			if (pivot !is m_waiters.back) {
				m_waiters.remove(pivot);
				m_waiters.insertBack(pivot);
			}
			return;
		}

		m_waiters.insertBack(pivot);
		scope (exit) m_waiters.remove(pivot);

		foreach (w; m_waiters) {
			if (w is pivot) break;
			emitWaiter(w);
		}
	}

	/** Notifies a single waiter.

		Returns:
			`false` if the queue was empty, `true` otherwise.
	*/
	bool emitSingle()
	@safe nothrow {
		if (m_waiters.empty) return false;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the right, so that the other emit call will process another waiter
			if (pivot !is m_waiters.back) {
				auto n = pivot.next;
				m_waiters.remove(pivot);
				m_waiters.insertAfter(pivot, n);
			}
			return true;
		}

		emitWaiter(m_waiters.front);
		return true;
	}

	/// Removes `w` from the queue and invokes its callback, if one is registered.
	private void emitWaiter(TaskWaiter* w)
	@safe nothrow {
		m_waiters.remove(w);

		if (w.notifier !is null) {
			logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
			w.emit();
		} else logTrace("notify callback is null");
	}
}
/** Implementation of a non-recursive task mutex.

	The lock state is a single atomic flag. Contending tasks register in
	`m_waiters` and wait on `m_signal`, which is emitted by `unlock` whenever
	at least one waiter is registered.

	Params:
		INTERRUPTIBLE = Selects between the interruptible and the
			uninterruptible wait function of the signal.
*/
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		shared(bool) m_locked = false;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		debug Task m_owner;
	}

	/// Initializes the signal; has to be called before the mutex is used.
	void setup()
	{
		m_signal.initialize();
	}

	/** Attempts to take the lock without waiting.

		Returns: `true` if the lock was acquired, `false` if it is already held.
	*/
	@trusted bool tryLock()
	{
		if (!cas(&m_locked, false, true))
			return false;

		debug m_owner = Task.getThis();
		debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
		return true;
	}

	/// Takes the lock, waiting on the signal for as long as it is held elsewhere.
	@trusted void lock()
	{
		// fast path - no contention
		if (tryLock()) return;

		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");

		// register as waiter for the duration of the wait loop
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);

		auto emit_count = m_signal.emitCount();
		for (;;) {
			if (tryLock()) return;

			static if (INTERRUPTIBLE) emit_count = m_signal.wait(emit_count);
			else emit_count = m_signal.waitUninterruptible(emit_count);
		}
	}

	/// Releases the lock and emits the signal if any waiter is registered.
	@trusted void unlock()
	{
		assert(m_locked);
		debug {
			assert(m_owner == Task.getThis());
			m_owner = Task();
		}

		atomicStore!(MemoryOrder.rel)(m_locked, false);
		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));

		if (atomicLoad(m_waiters) == 0)
			return;
		m_signal.emit();
	}
}
/** Implementation of a recursive task mutex.

	Ownership (`m_owner`) and the recursion depth (`m_recCount`) are
	protected by an internal `core.sync.mutex.Mutex`. Contending tasks
	register in `m_waiters` and wait on `m_signal`.

	Params:
		INTERRUPTIBLE = Selects between the interruptible and the
			uninterruptible wait function of the signal.
*/
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		core.sync.mutex.Mutex m_mutex;
		Task m_owner;
		size_t m_recCount = 0;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		@property bool m_locked() const { return m_recCount > 0; }
	}

	/// Creates the internal mutex and initializes the signal; has to be called before use.
	void setup()
	{
		m_mutex = new core.sync.mutex.Mutex;
		m_signal.initialize();
	}

	/** Attempts to take the lock without waiting.

		Succeeds if the mutex is currently unowned or already owned by the
		calling task; in the latter case the recursion count is incremented.

		Returns: `true` if the lock was acquired, `false` otherwise.
	*/
	@trusted bool tryLock()
	{
		auto self = Task.getThis();
		return m_mutex.performLocked!({
			if (!m_owner) {
				assert(m_recCount == 0);
				m_recCount = 1;
				m_owner = self;
				return true;
			} else if (m_owner == self) {
				m_recCount++;
				return true;
			}
			return false;
		});
	}

	/// Takes the lock, waiting on the signal for as long as another task owns it.
	@trusted void lock()
	{
		if (tryLock()) return;

		// register as waiter for the duration of the wait loop
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);

		auto ecnt = m_signal.emitCount();
		while (!tryLock()) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/** Releases one level of the lock.

		Waiters are only notified once the recursion count has dropped to
		zero. While the owner still holds the mutex no waiter could acquire
		it, so waking them earlier would only make them fail `tryLock` and go
		back to waiting.
	*/
	@trusted void unlock()
	{
		auto self = Task.getThis();
		const bool fully_released = m_mutex.performLocked!({
			assert(m_owner == self);
			assert(m_recCount > 0);
			m_recCount--;
			if (m_recCount == 0) {
				m_owner = Task.init;
				return true;
			}
			return false;
		});

		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));

		if (fully_released && atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
/** Implementation of a task condition variable.

	Waiting releases the associated mutex, waits on `m_signal` and takes the
	mutex again before returning. Both `notify` and `notifyAll` emit the
	signal.

	Params:
		INTERRUPTIBLE = Selects between the interruptible and the
			uninterruptible wait functions of the signal.
		LOCKABLE = Type of the associated mutex.
*/
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
	private {
		LOCKABLE m_mutex;
		static if (is(LOCKABLE == Mutex))
			TaskMutex m_taskMutex;
		shared(ManualEvent) m_signal;
	}

	static if (is(LOCKABLE == Lockable)) {
		/// Adapts a `core.sync.mutex.Mutex` to the `Lockable` interface.
		final class MutexWrapper : Lockable {
			private core.sync.mutex.Mutex m_mutex;
			this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
			@trusted void lock() { m_mutex.lock(); }
			@trusted void unlock() { m_mutex.unlock(); }
			@trusted bool tryLock() { return m_mutex.tryLock(); }
		}

		/// Sets up the condition for a plain mutex by wrapping it in a `MutexWrapper`.
		void setupForMutex(core.sync.mutex.Mutex mtx)
		{
			setup(new MutexWrapper(mtx));
		}
	}

	/// Stores the associated mutex and initializes the signal.
	void setup(LOCKABLE mtx)
	{
		m_mutex = mtx;
		static if (is(typeof(m_taskMutex)))
			m_taskMutex = cast(TaskMutex)mtx;
		m_signal.initialize();
	}

	/// The mutex associated with this condition.
	@property LOCKABLE mutex() { return m_mutex; }

	/// Releases the mutex, waits for the signal and takes the mutex again.
	@trusted void wait()
	{
		assertMutexHeld();

		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope(exit) reacquireMutex();

		static if (INTERRUPTIBLE) m_signal.wait(refcount);
		else m_signal.waitUninterruptible(refcount);
	}

	/** Same as `wait()`, but limits the waiting time.

		Params:
			timeout = Maximum time to wait; must not be negative.

		Returns:
			`true` if the emit count of the signal has changed when the wait
			returns, `false` otherwise.
	*/
	@trusted bool wait(Duration timeout)
	{
		assert(!timeout.isNegative());
		assertMutexHeld();

		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope(exit) reacquireMutex();

		static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
		else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
	}

	/// Emits the signal.
	@trusted void notify()
	{
		m_signal.emit();
	}

	/// Emits the signal.
	@trusted void notifyAll()
	{
		m_signal.emit();
	}

	/// If the associated mutex is a `TaskMutex`, asserts that it is locked (in debug builds: by the calling task).
	private @trusted void assertMutexHeld()
	{
		if (auto tm = cast(TaskMutex)m_mutex) {
			assert(tm.m_impl.m_locked);
			debug assert(tm.m_impl.m_owner == Task.getThis());
		}
	}

	/// Unlocks the associated mutex, going through the `TaskMutex` reference if one is available.
	private @trusted void releaseMutex()
	{
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.unlock();
			else m_mutex.unlock_nothrow();
		} else {
			m_mutex.unlock();
		}
	}

	/// Locks the associated mutex, going through the `TaskMutex` reference if one is available.
	private @trusted void reacquireMutex()
	{
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.lock();
			else m_mutex.lock_nothrow();
		} else {
			m_mutex.lock();
		}
	}
}
/** Contains the shared state of a $(D TaskReadWriteMutex).
 *
 * Since a $(D TaskReadWriteMutex) consists of two actual Mutex
 * objects that rely on common memory, this struct implements
 * the actual functionality of their method calls.
 *
 * The method implementations are based on two static parameters
 * ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
 * template arguments:
 *
 * - $(D INTERRUPTIBLE) determines whether the mutex implementation
 *   is interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
 *   method or not.
 *
 * - $(D INTENT) describes the intent with which a locking operation is
 *   performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
 *   multiple Tasks holding the mutex, whereas RW locking will cause
 *   a "bottleneck" so that only one Task can write to the protected
 *   data at once.
 */
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
	/** The policy with which the mutex should operate.
	 *
	 * The policy determines how the acquisition of the locks is
	 * performed and can be used to tune the mutex according to the
	 * underlying algorithm in which it is used.
	 *
	 * According to the provided policy, the mutex will either favor
	 * reading or writing tasks and could potentially starve the
	 * respective opposite.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	enum Policy : int
	{
		/** Readers are prioritized, writers may be starved as a result. */
		PREFER_READERS = 0,
		/** Writers are prioritized, readers may be starved as a result. */
		PREFER_WRITERS
	}

	/** The intent with which a locking operation is performed.
	 *
	 * Since both locks share the same underlying algorithms, the actual
	 * intent with which a lock operation is performed (i.e read/write)
	 * is passed as a template parameter to each method.
	 */
	enum LockingIntent : bool
	{
		/** Perform a read lock/unlock operation. Multiple reading locks can be
		 *  active at a time. */
		READ_ONLY = 0,
		/** Perform a write lock/unlock operation. Only a single writer can
		 *  hold a lock at any given time. */
		READ_WRITE = 1
	}

	private {
		//Queue counters
		/** The number of reading tasks waiting for the lock to become available. */
		shared(uint) m_waitingForReadLock = 0;
		/** The number of writing tasks waiting for the lock to become available. */
		shared(uint) m_waitingForWriteLock = 0;

		//Lock counters
		/** The number of reading tasks that currently hold the lock. */
		uint m_activeReadLocks = 0;
		/** The number of writing tasks that currently hold the lock (binary). */
		ubyte m_activeWriteLocks = 0;

		/** The policy determining the lock's behavior. */
		Policy m_policy;

		//Queue Events
		/** The event used to wake reading tasks waiting for the lock while it is blocked. */
		shared(ManualEvent) m_readyForReadLock;
		/** The event used to wake writing tasks waiting for the lock while it is blocked. */
		shared(ManualEvent) m_readyForWriteLock;

		/** The underlying mutex that gates the access to the shared state. */
		Mutex m_counterMutex;
	}

	/** Creates the state for the given policy, including the counter mutex and both queue events. */
	this(Policy policy)
	{
		m_policy = policy;
		m_counterMutex = new Mutex();
		m_readyForReadLock = createSharedManualEvent();
		m_readyForWriteLock = createSharedManualEvent();
	}

	@disable this(this);

	/** The policy with which the lock has been created. */
	@property policy() const { return m_policy; }

	version(RWMutexPrint)
	{
		/** Print out debug information during lock operations. */
		void printInfo(string OP, LockingIntent INTENT)() nothrow
		{
			import std.string;
			try
			{
				import std.stdio;
				writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
					OP.leftJustify(10,' '),
					INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
					m_activeReadLocks, m_activeWriteLocks,
					m_waitingForReadLock, m_waitingForWriteLock
					);
			}
			catch (Throwable t){}
		}
	}

	/** An internal shortcut method to determine the queue event for a given intent. */
	@property ref auto queueEvent(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_readyForReadLock;
		else
			return m_readyForWriteLock;
	}

	/** An internal shortcut method to determine the queue counter for a given intent. */
	@property ref auto queueCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_waitingForReadLock;
		else
			return m_waitingForWriteLock;
	}

	/** An internal shortcut method to determine the current emitCount of the queue event for a given intent. */
	int emitCount(LockingIntent INTENT)()
	{
		return queueEvent!INTENT.emitCount();
	}

	/** An internal shortcut method to determine the active counter for a given intent. */
	@property ref auto activeCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_activeReadLocks;
		else
			return m_activeWriteLocks;
	}

	/** An internal shortcut method to wait for the queue event for a given intent.
	 *
	 * This method is used during the `lock()` operation, after a
	 * `tryLock()` operation has been unsuccessfully finished.
	 * The active fiber will yield and be suspended until the queue event
	 * for the given intent will be fired.
	 */
	int wait(LockingIntent INTENT)(int count)
	{
		static if (INTERRUPTIBLE)
			return queueEvent!INTENT.wait(count);
		else
			return queueEvent!INTENT.waitUninterruptible(count);
	}

	/** An internal shortcut method to notify tasks waiting for the lock to become available again.
	 *
	 * This method is called whenever the number of owners of the mutex hits
	 * zero; this is basically the counterpart to `wait()`.
	 * It wakes any Task currently waiting for the mutex to be released.
	 */
	@trusted void notify(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
		{ //If the last reader unlocks the mutex, notify all waiting writers
			if (atomicLoad(m_waitingForWriteLock) > 0)
				m_readyForWriteLock.emit();
		}
		else
		{ //If a writer unlocks the mutex, notify both readers and writers
			if (atomicLoad(m_waitingForReadLock) > 0)
				m_readyForReadLock.emit();

			if (atomicLoad(m_waitingForWriteLock) > 0)
				m_readyForWriteLock.emit();
		}
	}

	/** An internal method that performs the acquisition attempt in different variations.
	 *
	 * Since both locks rely on a common mutex object which gates the access
	 * to their common data, acquisition attempts for this lock are more complex
	 * than for simple mutex variants. This method will thus be performing the
	 * `tryLock()` operation in two variations, depending on the caller:
	 *
	 * If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
	 * will instantly fail if the underlying mutex is locked (i.e. during another
	 * `tryLock()` or `unlock()` operation), in order to guarantee the fastest
	 * possible locking attempt.
	 *
	 * If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
	 * the operation will wait for the mutex to be available before deciding if
	 * the lock can be acquired, since the attempt would anyway be repeated until
	 * it succeeds. This will prevent frequent retries under heavy loads and thus
	 * should ensure better performance.
	 *
	 * Returns:
	 *     `true`, if the lock was successfully acquired;
	 *     `false` otherwise.
	 */
	@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
	{
		//Log a debug statement for the attempt
		version(RWMutexPrint)
			printInfo!("tryLock",INTENT)();

		//Try to acquire the lock
		static if (!WAIT_FOR_BLOCKING_MUTEX)
		{
			if (!m_counterMutex.tryLock())
				return false;
		}
		else
			m_counterMutex.lock();

		scope(exit)
			m_counterMutex.unlock();

		//Log a debug statement for the attempt
		version(RWMutexPrint)
			printInfo!("checkCtrs",INTENT)();

		//Check if there's already an active writer
		if (m_activeWriteLocks > 0)
			return false;

		//If writers are preferred over readers, check whether there
		//currently is a writer in the waiting queue and abort if
		//that's the case.
		//NOTE: the stored policy has to be compared explicitly here -
		//`m_policy.PREFER_WRITERS` would merely name the enum member,
		//which is a non-zero constant and thus always true.
		static if (INTENT == LockingIntent.READ_ONLY)
			if (m_policy == Policy.PREFER_WRITERS && atomicLoad(m_waitingForWriteLock) > 0)
				return false;

		//If we are locking the mutex for writing, make sure that
		//there's no reader active.
		static if (INTENT == LockingIntent.READ_WRITE)
			if (m_activeReadLocks > 0)
				return false;

		//We can successfully acquire the lock!
		//Log a debug statement for the success.
		version(RWMutexPrint)
			printInfo!("lock",INTENT)();

		//Increase the according counter
		//(number of active readers/writers)
		//and return a success code.
		activeCounter!INTENT += 1;
		return true;
	}

	/** Attempt to acquire the lock for a given intent.
	 *
	 * Returns:
	 *     `true`, if the lock was successfully acquired;
	 *     `false` otherwise.
	 */
	@trusted bool tryLock(LockingIntent INTENT)()
	{
		//Try to lock this mutex without waiting for the underlying
		//counter mutex - fail if it is already blocked.
		return tryLock!(INTENT,false)();
	}

	/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
	@trusted void lock(LockingIntent INTENT)()
	{
		//Prepare a waiting action before the first
		//`tryLock()` call in order to avoid a race
		//condition that could lead to the queue notification
		//not being fired.
		auto count = emitCount!INTENT;
		atomicOp!"+="(queueCounter!INTENT,1);
		scope(exit)
			atomicOp!"-="(queueCounter!INTENT,1);

		//Retry until we successfully acquired the lock
		while (!tryLock!(INTENT,true)())
		{
			version(RWMutexPrint)
				printInfo!("wait",INTENT)();

			count = wait!INTENT(count);
		}
	}

	/** Unlock the mutex after a successful acquisition. */
	@trusted void unlock(LockingIntent INTENT)()
	{
		version(RWMutexPrint)
			printInfo!("unlock",INTENT)();

		debug assert(activeCounter!INTENT > 0);

		synchronized(m_counterMutex)
		{
			//Decrement the counter of active lock holders.
			//If the counter hits zero, notify waiting Tasks
			activeCounter!INTENT -= 1;
			if (activeCounter!INTENT == 0)
			{
				version(RWMutexPrint)
					printInfo!("notify",INTENT)();

				notify!INTENT();
			}
		}
	}
}
/** Read/write mutex for use within tasks.
 *
 * Can be used in place of a $(D core.sync.rwmutex.ReadWriteMutex), but
 * waits on task level events instead of blocking the event loop when the
 * lock is contended. Locking is performed through the `reader` and `writer`
 * members: any number of tasks may hold the `reader` lock at the same time,
 * while the `writer` lock can only be held by a single task. The two kinds
 * of locks exclude each other, i.e. no reader can be active while a writer
 * holds the lock and vice versa.
 *
 * Notice:
 *      Waiting for this mutex cannot be interrupted using
 *      $(D vibe.core.task.Task.interrupt()). The corresponding
 *      InterruptException will be deferred until the next blocking
 *      operation yields the event loop.
 *
 *      Use $(D InterruptibleTaskReadWriteMutex) if interruption
 *      needs to be supported.
 *
 * cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class TaskReadWriteMutex
{
	private {
		alias State = ReadWriteMutexState!false;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** State that is shared between the reader and the writer mutex. */
		State m_state;
	}

	/** Determines which kind of lock gets priority.
	 *
	 * Depending on the chosen policy, either reading or writing tasks
	 * are favored when acquiring the lock, which can potentially starve
	 * the respective other kind. Use this to tune the mutex for the
	 * algorithm it is used in.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of the reader and the writer mutex.
	 *
	 * The $(D INTENT) template argument selects whether the mutex
	 * performs read or write locking on the shared state.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override bool tryLock()
		{
			return m_state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void lock()
		{
			m_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void unlock()
		{
			m_state.unlock!INTENT();
		}
	}

	/// Mutex type used for read locking.
	alias Reader = Mutex!READ_ONLY;
	/// Mutex type used for write locking.
	alias Writer = Mutex!READ_WRITE;

	/// The mutex to lock for shared read access.
	Reader reader;
	/// The mutex to lock for exclusive write access.
	Writer writer;

	/** Creates the shared state for the given policy, as well as the reader and writer mutexes. */
	this(Policy policy = Policy.PREFER_WRITERS)
	{
		m_state = State(policy);
		reader = new Reader;
		writer = new Writer;
	}

	/** The policy with which the lock has been created. */
	@property Policy policy() const
	{
		return m_state.policy;
	}
}
/** Variant of $(D TaskReadWriteMutex) with support for interruption.
 *
 * Tasks that are waiting in the `lock()` method of this class can be
 * interrupted using $(D vibe.core.task.Task.interrupt()).
 *
 * cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class InterruptibleTaskReadWriteMutex
{
	@safe:

	private {
		alias State = ReadWriteMutexState!true;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** State that is shared between the reader and the writer mutex. */
		State m_state;
	}

	/** Determines which kind of lock gets priority.
	 *
	 * Depending on the chosen policy, either reading or writing tasks
	 * are favored when acquiring the lock, which can potentially starve
	 * the respective other kind. Use this to tune the mutex for the
	 * algorithm it is used in.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of the reader and the writer mutex.
	 *
	 * The $(D INTENT) template argument selects whether the mutex
	 * performs read or write locking on the shared state.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override bool tryLock()
		{
			return m_state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void lock()
		{
			m_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void unlock()
		{
			m_state.unlock!INTENT();
		}
	}

	/// Mutex type used for read locking.
	alias Reader = Mutex!READ_ONLY;
	/// Mutex type used for write locking.
	alias Writer = Mutex!READ_WRITE;

	/// The mutex to lock for shared read access.
	Reader reader;
	/// The mutex to lock for exclusive write access.
	Writer writer;

	/** Creates the shared state for the given policy, as well as the reader and writer mutexes. */
	this(Policy policy = Policy.PREFER_WRITERS)
	{
		m_state = State(policy);
		reader = new Reader;
		writer = new Writer;
	}

	/** The policy with which the lock has been created. */
	@property Policy policy() const
	{
		return m_state.policy;
	}
}