vibe-core/source/vibe/core/sync.d
Sönke Ludwig 5640516ba8 Remove SpinLock and use core.sync.mutex.Mutex instead.
The places that used it showed too much contention in load tests, so that the spin lock was more expensive that the regular OS mutex/futex.
2019-09-19 11:25:14 +02:00

2198 lines
58 KiB
D

/**
Event loop compatible task synchronization facilities.
This module provides replacement primitives for the modules in `core.sync`
that do not block vibe.d's event loop in their wait states. These should
always be preferred over the ones in Druntime under usual circumstances.
Using a standard `Mutex` is possible as long as it is ensured that no event
loop based functionality (I/O, task interaction or anything that implicitly
calls `vibe.core.core.yield`) is executed within a section of code that is
protected by the mutex. $(B Failure to do so may result in dead-locks and
high-level race-conditions!)
Copyright: © 2012-2019 Sönke Ludwig
Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.sync;
import vibe.core.log : logDebugV, logTrace, logInfo;
import vibe.core.task;
import core.atomic;
import core.sync.mutex;
import core.sync.condition;
import eventcore.core;
import std.exception;
import std.stdio;
import std.traits : ReturnType;
/** Creates a new signal that can be shared between fibers.
*/
LocalManualEvent createManualEvent()
@safe nothrow {
LocalManualEvent ret;
ret.initialize();
return ret;
}
/// ditto
shared(ManualEvent) createSharedManualEvent()
@trusted nothrow {
shared(ManualEvent) ret;
ret.initialize();
return ret;
}
/** Performs RAII based locking/unlocking of a mutex.
Note that while `TaskMutex` can be used with D's built-in `synchronized`
statement, `InterruptibleTaskMutex` cannot. This function provides a
library based alternative that is suitable for use with all mutex types.
*/
ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
if (is(M : Mutex) || is(M : Lockable))
{
return ScopedMutexLock!M(mutex, mode);
}
///
unittest {
import vibe.core.core : runWorkerTaskH;
__gshared int counter;
__gshared InterruptibleTaskMutex mutex;
mutex = new InterruptibleTaskMutex;
Task[] tasks;
foreach (i; 0 .. 100) {
tasks ~= runWorkerTaskH({
auto l = scopedMutexLock(mutex);
counter++;
});
}
foreach (t; tasks) t.join();
assert(counter == 100);
}
unittest {
scopedMutexLock(new Mutex);
scopedMutexLock(new TaskMutex);
scopedMutexLock(new InterruptibleTaskMutex);
}
enum LockMode {
lock,
tryLock,
defer
}
interface Lockable {
@safe:
void lock();
void unlock();
bool tryLock();
}
/** RAII lock for the Mutex class.
*/
struct ScopedMutexLock(M)
if (is(M : Mutex) || is(M : Lockable))
{
@disable this(this);
private {
M m_mutex;
bool m_locked;
LockMode m_mode;
}
this(M mutex, LockMode mode = LockMode.lock) {
assert(mutex !is null);
m_mutex = mutex;
final switch (mode) {
case LockMode.lock: lock(); break;
case LockMode.tryLock: tryLock(); break;
case LockMode.defer: break;
}
}
~this()
{
if( m_locked )
m_mutex.unlock();
}
@property bool locked() const { return m_locked; }
void unlock()
{
enforce(m_locked);
m_mutex.unlock();
m_locked = false;
}
bool tryLock()
{
enforce(!m_locked);
return m_locked = m_mutex.tryLock();
}
void lock()
{
enforce(!m_locked);
m_locked = true;
m_mutex.lock();
}
}
/*
Only for internal use:
Ensures that a mutex is locked while executing the given procedure.
This function works for all kinds of mutexes, in particular for
$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
Returns:
Returns the value returned from $(D PROC), if any.
*/
/// private
ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
{
mutex.lock();
scope (exit) mutex.unlock();
return PROC();
}
///
unittest {
int protected_var = 0;
auto mtx = new TaskMutex;
mtx.performLocked!({
protected_var++;
});
}
/**
Thread-local semaphore implementation for tasks.
When the semaphore runs out of concurrent locks, it will suspend. This class
is used in `vibe.core.connectionpool` to limit the number of concurrent
connections.
*/
final class LocalTaskSemaphore
{
@safe:
// requires a queue
import std.container.binaryheap;
import std.container.array;
//import vibe.utils.memory;
private {
static struct ThreadWaiter {
ubyte priority;
uint seq;
}
BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
uint m_maxLocks;
uint m_locks;
uint m_seq;
LocalManualEvent m_signal;
}
this(uint max_locks)
{
m_maxLocks = max_locks;
m_signal = createManualEvent();
}
/// Maximum number of concurrent locks
@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
/// ditto
@property uint maxLocks() const { return m_maxLocks; }
/// Number of concurrent locks still available
@property uint available() const { return m_maxLocks - m_locks; }
/** Try to acquire a lock.
If a lock cannot be acquired immediately, returns `false` and leaves the
semaphore in its previous state.
Returns:
`true` is returned $(I iff) the number of available locks is greater
than one.
*/
bool tryLock()
{
if (available > 0)
{
m_locks++;
return true;
}
return false;
}
/** Acquires a lock.
Once the limit of concurrent locks is reached, this method will block
until the number of locks drops below the limit.
*/
void lock(ubyte priority = 0)
{
import std.algorithm.comparison : min;
if (tryLock())
return;
ThreadWaiter w;
w.priority = priority;
w.seq = min(0, m_seq - w.priority);
if (++m_seq == uint.max)
rewindSeq();
() @trusted { m_waiters.insert(w); } ();
while (true) {
m_signal.waitUninterruptible();
if (m_waiters.front.seq == w.seq && tryLock()) {
return;
}
}
}
/** Gives up an existing lock.
*/
void unlock()
{
assert(m_locks >= 1);
m_locks--;
if (m_waiters.length > 0)
m_signal.emit(); // resume one
}
// if true, a goes after b. ie. b comes out front()
/// private
static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
{
if (a.priority != b.priority)
return a.priority < b.priority;
return a.seq > b.seq;
}
private void rewindSeq()
@trusted {
Array!ThreadWaiter waiters = m_waiters.release();
ushort min_seq;
import std.algorithm : min;
foreach (ref waiter; waiters[])
min_seq = min(waiter.seq, min_seq);
foreach (ref waiter; waiters[])
waiter.seq -= min_seq;
m_waiters.assume(waiters);
}
}
/**
Mutex implementation for fibers.
This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
does not block the event loop when contention happens. Note that this
mutex does not allow recursive locking.
Notice:
Because this class is annotated nothrow, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields the event loop.
Use $(D InterruptibleTaskMutex) as an alternative that can be
interrupted.
See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
*/
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
private TaskMutexImpl!false m_impl;
this(Object o) { m_impl.setup(); super(o); }
this() { m_impl.setup(); }
override bool tryLock() nothrow { return m_impl.tryLock(); }
override void lock() nothrow { m_impl.lock(); }
override void unlock() nothrow { m_impl.unlock(); }
}
unittest {
auto mutex = new TaskMutex;
{
auto lock = scopedMutexLock(mutex);
assert(lock.locked);
assert(mutex.m_impl.m_locked);
auto lock2 = scopedMutexLock(mutex, LockMode.tryLock);
assert(!lock2.locked);
}
assert(!mutex.m_impl.m_locked);
auto lock = scopedMutexLock(mutex, LockMode.tryLock);
assert(lock.locked);
lock.unlock();
assert(!lock.locked);
synchronized(mutex){
assert(mutex.m_impl.m_locked);
}
assert(!mutex.m_impl.m_locked);
mutex.performLocked!({
assert(mutex.m_impl.m_locked);
});
assert(!mutex.m_impl.m_locked);
static if (__VERSION__ >= 2067) {
with(mutex.scopedMutexLock) {
assert(mutex.m_impl.m_locked);
}
}
}
unittest { // test deferred throwing
import vibe.core.core;
auto mutex = new TaskMutex;
auto t1 = runTask({
scope (failure) assert(false, "No exception expected in first task!");
mutex.lock();
scope (exit) mutex.unlock();
sleep(20.msecs);
});
auto t2 = runTask({
mutex.lock();
scope (exit) mutex.unlock();
try {
yield();
assert(false, "Yield is supposed to have thrown an InterruptException.");
} catch (InterruptException) {
// as expected!
} catch (Exception) {
assert(false, "Only InterruptException supposed to be thrown!");
}
});
runTask({
// mutex is now locked in first task for 20 ms
// the second tasks is waiting in lock()
t2.interrupt();
t1.join();
t2.join();
assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
exitEventLoop();
});
runEventLoop();
}
unittest {
runMutexUnitTests!TaskMutex();
}
/**
Alternative to $(D TaskMutex) that supports interruption.
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
waiting in the $(D lock()) method. However, because the interface is not
$(D nothrow), it cannot be used as an object monitor.
See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
*/
final class InterruptibleTaskMutex : Lockable {
@safe:
private TaskMutexImpl!true m_impl;
this()
{
m_impl.setup();
// detects invalid usage within synchronized(...)
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
}
bool tryLock() nothrow { return m_impl.tryLock(); }
void lock() { m_impl.lock(); }
void unlock() nothrow { m_impl.unlock(); }
}
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
unittest {
runMutexUnitTests!InterruptibleTaskMutex();
}
/**
Recursive mutex implementation for tasks.
This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
does not block the event loop when contention happens.
Notice:
Because this class is annotated `nothrow`, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields the event loop.
Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
interrupted.
See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
*/
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
private RecursiveTaskMutexImpl!false m_impl;
this(Object o) { m_impl.setup(); super(o); }
this() { m_impl.setup(); }
override bool tryLock() { return m_impl.tryLock(); }
override void lock() { m_impl.lock(); }
override void unlock() { m_impl.unlock(); }
}
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
unittest {
runMutexUnitTests!RecursiveTaskMutex();
}
/**
Alternative to $(D RecursiveTaskMutex) that supports interruption.
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
waiting in the $(D lock()) method. However, because the interface is not
$(D nothrow), it cannot be used as an object monitor.
See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
*/
final class InterruptibleRecursiveTaskMutex : Lockable {
@safe:
private RecursiveTaskMutexImpl!true m_impl;
this()
{
m_impl.setup();
// detects invalid usage within synchronized(...)
() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
}
bool tryLock() { return m_impl.tryLock(); }
void lock() { m_impl.lock(); }
void unlock() { m_impl.unlock(); }
}
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
unittest {
runMutexUnitTests!InterruptibleRecursiveTaskMutex();
}
// Helper class to ensure that the non Object.Monitor compatible interruptible
// mutex classes are not accidentally used with the `synchronized` statement
private final class NoUseMonitor : Object.Monitor {
private static shared Proxy st_instance;
static struct Proxy {
Object.Monitor monitor;
}
static @property ref shared(Proxy) instance()
@safe nothrow {
static shared(Proxy)* inst = null;
if (inst) return *inst;
() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
synchronized {
if (!st_instance.monitor)
st_instance.monitor = new shared NoUseMonitor;
inst = &st_instance;
}
} ();
return *inst;
}
override void lock() @safe @nogc nothrow {
assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
}
override void unlock() @safe @nogc nothrow {}
}
private void runMutexUnitTests(M)()
{
import vibe.core.core;
auto m = new M;
Task t1, t2;
void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
assert(!m.m_impl.m_locked);
// t1 starts first and acquires the mutex for 20 ms
// t2 starts second and has to wait in m.lock()
t1 = runTask({
assert(!m.m_impl.m_locked);
m.lock();
assert(m.m_impl.m_locked);
if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs));
else assertNotThrown(sleep(20.msecs));
m.unlock();
});
t2 = runTask({
assert(!m.tryLock());
if (interrupt_t2) {
try m.lock();
catch (InterruptException) return;
try yield(); // rethrows any deferred exceptions
catch (InterruptException) {
m.unlock();
return;
}
assert(false, "Supposed to have thrown an InterruptException.");
} else assertNotThrown(m.lock());
assert(m.m_impl.m_locked);
sleep(20.msecs);
m.unlock();
assert(!m.m_impl.m_locked);
});
}
// basic lock test
m.performLocked!({
assert(m.m_impl.m_locked);
});
assert(!m.m_impl.m_locked);
// basic contention test
runContendedTasks(false, false);
auto t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t1.join();
assert(!t1.running && t2.running);
yield(); // give t2 a chance to take the lock
assert(m.m_impl.m_locked);
t2.join();
assert(!t2.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
// interruption test #1
runContendedTasks(true, false);
t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t1.interrupt();
t1.join();
assert(!t1.running && t2.running);
yield(); // give t2 a chance to take the lock
assert(m.m_impl.m_locked);
t2.join();
assert(!t2.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
// interruption test #2
runContendedTasks(false, true);
t3 = runTask({
assert(t1.running && t2.running);
assert(m.m_impl.m_locked);
t2.interrupt();
t2.join();
assert(!t2.running);
static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
assert(t1.running && m.m_impl.m_locked);
t1.join();
assert(!t1.running);
assert(!m.m_impl.m_locked);
exitEventLoop();
});
runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked);
}
/**
Event loop based condition variable or "event" implementation.
This class can be used in exchange for a $(D core.sync.condition.Condition)
to avoid blocking the event loop when waiting.
Notice:
Because this class is annotated nothrow, it cannot be interrupted
using $(D vibe.core.task.Task.interrupt()). The corresponding
$(D InterruptException) will be deferred until the next blocking
operation yields to the event loop.
Use $(D InterruptibleTaskCondition) as an alternative that can be
interrupted.
Note that it is generally not safe to use a `TaskCondition` together with an
interruptible mutex type.
See_Also: `InterruptibleTaskCondition`
*/
final class TaskCondition : core.sync.condition.Condition {
@safe:
private TaskConditionImpl!(false, Mutex) m_impl;
this(core.sync.mutex.Mutex mtx)
{
assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo,
"TaskCondition can only be used with Mutex or TaskMutex");
m_impl.setup(mtx);
super(mtx);
}
override @property Mutex mutex() nothrow { return m_impl.mutex; }
override void wait() nothrow { m_impl.wait(); }
override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); }
override void notify() nothrow { m_impl.notify(); }
override void notifyAll() nothrow { m_impl.notifyAll(); }
}
unittest {
new TaskCondition(new Mutex);
new TaskCondition(new TaskMutex);
}
/** This example shows the typical usage pattern using a `while` loop to make
sure that the final condition is reached.
*/
unittest {
import vibe.core.core;
import vibe.core.log;
__gshared Mutex mutex;
__gshared TaskCondition condition;
__gshared int workers_still_running = 0;
// setup the task condition
mutex = new Mutex;
condition = new TaskCondition(mutex);
logDebug("SETTING UP TASKS");
// start up the workers and count how many are running
foreach (i; 0 .. 4) {
workers_still_running++;
runWorkerTask({
// simulate some work
sleep(100.msecs);
// notify the waiter that we're finished
synchronized (mutex) {
workers_still_running--;
logDebug("DECREMENT %s", workers_still_running);
}
logDebug("NOTIFY");
condition.notify();
});
}
logDebug("STARTING WAIT LOOP");
// wait until all tasks have decremented the counter back to zero
synchronized (mutex) {
while (workers_still_running > 0) {
logDebug("STILL running %s", workers_still_running);
condition.wait();
}
}
}
/**
Alternative to `TaskCondition` that supports interruption.
This class supports the use of `vibe.core.task.Task.interrupt()` while
waiting in the `wait()` method.
See `TaskCondition` for an example.
Notice:
Note that it is generally not safe to use an
`InterruptibleTaskCondition` together with an interruptible mutex type.
See_Also: `TaskCondition`
*/
final class InterruptibleTaskCondition {
@safe:
private TaskConditionImpl!(true, Lockable) m_impl;
this(M)(M mutex)
if (is(M : Mutex) || is (M : Lockable))
{
static if (is(M : Lockable))
m_impl.setup(mutex);
else
m_impl.setupForMutex(mutex);
}
@property Lockable mutex() { return m_impl.mutex; }
void wait() { m_impl.wait(); }
bool wait(Duration timeout) { return m_impl.wait(timeout); }
void notify() { m_impl.notify(); }
void notifyAll() { m_impl.notifyAll(); }
}
unittest {
new InterruptibleTaskCondition(new Mutex);
new InterruptibleTaskCondition(new TaskMutex);
new InterruptibleTaskCondition(new InterruptibleTaskMutex);
}
/** A manually triggered single threaded cross-task event.
Note: the ownership can be shared between multiple fibers of the same thread.
*/
struct LocalManualEvent {
import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
@safe:
private {
alias Waiter = ThreadLocalWaiter!false;
Waiter m_waiter;
}
// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;
private void initialize()
nothrow {
import vibe.internal.allocator : Mallocator, makeGCSafe;
m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
}
this(this)
{
if (m_waiter)
return m_waiter.addRef();
}
~this()
nothrow {
import vibe.internal.allocator : Mallocator, disposeGCSafe;
if (m_waiter) {
if (!m_waiter.releaseRef()) {
static if (__VERSION__ < 2087) scope (failure) assert(false);
() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
}
}
}
bool opCast() const nothrow { return m_waiter !is null; }
/// A counter that is increased with every emit() call
int emitCount() const nothrow { return m_waiter.m_emitCount; }
/// Emits the signal, waking up all owners of the signal.
int emit()
nothrow {
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
logTrace("unshared emit");
auto ec = m_waiter.m_emitCount++;
m_waiter.emit();
return ec;
}
/// Emits the signal, waking up a single owners of the signal.
int emitSingle()
nothrow {
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
logTrace("unshared single emit");
auto ec = m_waiter.m_emitCount++;
m_waiter.emitSingle();
return ec;
}
/** Acquires ownership and waits until the signal is emitted.
Note that in order not to miss any emits it is necessary to use the
overload taking an integer.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait() { return wait(this.emitCount); }
/** Acquires ownership and waits until the signal is emitted and the emit
count is larger than a given one.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
/// ditto
int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
/** Same as $(D wait), but defers throwing any $(D InterruptException).
This method is annotated $(D nothrow) at the expense that it cannot be
interrupted.
*/
int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
/// ditto
int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
private int doWait(bool interruptible)(Duration timeout, int emit_count)
{
import core.time : MonoTime;
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
while (m_waiter.m_emitCount - emit_count <= 0) {
m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
return m_waiter.m_emitCount;
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
auto e = createManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
auto w2 = runTask({ e.wait(500.msecs, e.emitCount); });
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
auto e = createManualEvent();
// integer overflow test
e.m_waiter.m_emitCount = int.max;
auto w1 = runTask({ e.wait(50.msecs, e.emitCount); });
runTask({
sleep(5.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running);
exitEventLoop();
});
runEventLoop();
}
unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
LocalManualEvent l = createManualEvent();
Task t2;
runTask({
l.wait();
t2.interrupt();
sleep(20.msecs);
exitEventLoop();
});
t2 = runTask({
try {
l.wait();
assert(false, "Shouldn't reach this.");
} catch (InterruptException e) {}
});
runTask({
l.emit();
});
runEventLoop();
}
unittest { // ensure that LocalManualEvent behaves correctly after being copied
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
LocalManualEvent l = createManualEvent();
runTask({
auto lc = l;
sleep(100.msecs);
lc.emit();
});
runTask({
assert(l.wait(1.seconds, l.emitCount));
exitEventLoop();
});
runEventLoop();
}
/** A manually triggered multi threaded cross-task event.
Note: the ownership can be shared between multiple fibers and threads.
*/
struct ManualEvent {
import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
import vibe.internal.list : StackSList;
@safe:
private {
alias ThreadWaiter = ThreadLocalWaiter!true;
int m_emitCount;
static struct Waiters {
StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
}
Monitor!(Waiters, shared(Mutex)) m_waiters;
}
// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;
enum EmitMode {
single,
all
}
@disable this(this);
private void initialize()
shared nothrow {
m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
}
deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; }
/// A counter that is increased with every emit() call
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal.
int emit()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
ThreadWaiter lw;
auto drv = eventDriver;
m_waiters.lock.active.filter((ThreadWaiter w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.m_driver is drv) {
lw = w;
lw.addRef();
} else {
try {
assert(w.m_event != EventID.init);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg);
}
return true;
});
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) {
lw.emit();
releaseWaiter(lw);
}
logTrace("emit shared done");
return ec;
}
/// Emits the signal, waking up at least one waiting task
int emitSingle()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
ThreadWaiter lw;
auto drv = eventDriver;
m_waiters.lock.active.iterate((ThreadWaiter w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.m_driver is drv) {
if (w.unused) return true;
lw = w;
lw.addRef();
} else {
try {
assert(w.m_event != EventID.invalid);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg);
}
return false;
});
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) {
lw.emitSingle();
releaseWaiter(lw);
}
logTrace("emit shared done");
return ec;
}
/** Acquires ownership and waits until the signal is emitted.
Note that in order not to miss any emits it is necessary to use the
overload taking an integer.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the
given one or until a timeout is reached.
Throws:
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
/// ditto
int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
/** Same as $(D wait), but defers throwing any $(D InterruptException).
This method is annotated $(D nothrow) at the expense that it cannot be
interrupted.
*/
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
/// ditto
int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared {
import core.time : MonoTime;
() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();
if (ms_threadEvent is EventID.invalid) {
ms_threadEvent = eventDriver.events.create();
assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
}
MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
int ec = this.emitCount;
acquireThreadWaiter((scope ThreadWaiter w) {
while (ec - emit_count <= 0) {
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
ec = this.emitCount;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
}
});
return ec;
}
private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : processAllocator, makeGCSafe;
ThreadWaiter w;
auto drv = eventDriver;
with (m_waiters.lock) {
active.iterate((aw) {
if (aw.m_driver is drv) {
w = aw;
w.addRef();
return false;
}
return true;
});
if (!w) {
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
});
if (!w) {
() @trusted {
try {
w = processAllocator.makeGCSafe!ThreadWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
} catch (Exception e) {
assert(false, "Failed to allocate thread waiter.");
}
} ();
}
assert(w.m_refCount == 1);
active.add(w);
}
}
scope (exit) releaseWaiter(w);
del(w);
}
private void releaseWaiter(ThreadWaiter w)
shared nothrow {
if (!w.releaseRef()) {
assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
assert(w.unused, "Waiter still used, but not referenced!?");
with (m_waiters.lock) {
auto rmvd = active.remove(w);
assert(rmvd, "Waiter not in active queue anymore!?");
free.add(w);
// TODO: cap size of m_freeWaiters
}
}
}
}
unittest {
import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
auto e = createSharedManualEvent();
auto w1 = runTask({ e.wait(100.msecs, e.emitCount); });
static void w(shared(ManualEvent)* e) { e.wait(500.msecs, e.emitCount); }
auto w2 = runWorkerTaskH(&w, &e);
runTask({
sleep(50.msecs);
e.emit();
sleep(50.msecs);
assert(!w1.running && !w2.running);
exitEventLoop();
});
runEventLoop();
}
unittest {
import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
import vibe.core.taskpool : TaskPool;
import core.time : msecs, usecs;
import std.concurrency : send, receiveOnly;
import std.random : uniform;
auto tpool = new shared TaskPool(4);
scope (exit) tpool.terminate();
static void test(shared(ManualEvent)* evt, Task owner)
{
owner.tid.send(Task.getThis());
int ec = evt.emitCount;
auto thist = Task.getThis();
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
while (ec < 5_000) {
tm.rearm(500.msecs);
sleep(uniform(0, 10_000).usecs);
if (uniform(0, 10) == 0) evt.emit();
auto ecn = evt.wait(ec);
assert(ecn > ec);
ec = ecn;
}
}
auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
scope (exit) watchdog.stop();
auto e = createSharedManualEvent();
Task[] tasks;
runTask({
auto thist = Task.getThis();
// start 25 tasks in each thread
foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
// collect all task handles
foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
int pec = 0;
while (e.emitCount < 5_000) {
tm.rearm(500.msecs);
sleep(50.usecs);
e.emit();
}
// wait for all worker tasks to finish
foreach (t; tasks) t.join();
}).join();
}
package shared struct Monitor(T, M)
{
alias Mutex = M;
alias Data = T;
private {
Mutex mutex;
Data data;
}
static struct Locked {
shared(Monitor)* m;
@disable this(this);
~this() {
() @trusted {
static if (is(typeof(Mutex.init.unlock_nothrow())))
(cast(Mutex)m.mutex).unlock_nothrow();
else (cast(Mutex)m.mutex).unlock();
} ();
}
ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
alias get this;
}
Locked lock() {
() @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return Locked(() @trusted { return &this; } ());
}
const(Locked) lock() const {
() @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return const(Locked)(() @trusted { return &this; } ());
}
}
package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
@trusted {
shared(Monitor!(T, M)) ret;
ret.mutex = cast(shared)mutex;
return ret;
}
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
import vibe.internal.list : CircularDList;
private {
static struct TaskWaiter {
TaskWaiter* prev, next;
void delegate() @safe nothrow notifier;
void wait(void delegate() @safe nothrow del) @safe nothrow {
assert(notifier is null, "Local waiter is used twice!");
notifier = del;
}
void cancel() @safe nothrow { notifier = null; }
void emit() @safe nothrow { auto n = notifier; notifier = null; n(); }
}
static if (EVENT_TRIGGERED) {
package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
NativeEventDriver m_driver;
EventID m_event = EventID.invalid;
} else {
int m_emitCount = 0;
}
int m_refCount = 1;
TaskWaiter m_pivot;
TaskWaiter m_emitPivot;
CircularDList!(TaskWaiter*) m_waiters;
}
this()
{
m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
}
static if (EVENT_TRIGGERED) {
~this()
{
import vibe.core.internal.release : releaseHandle;
if (m_event != EventID.invalid)
releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
}
}
@property bool unused() const @safe nothrow { return m_waiters.empty; }
void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
@safe {
import core.time : MonoTime;
import vibe.internal.async : Waitable, asyncAwaitAny;
TaskWaiter waiter_store;
TaskWaiter* waiter = () @trusted { return &waiter_store; } ();
m_waiters.insertBack(waiter);
assert(waiter.next !is null);
scope (exit)
if (waiter.next !is null) {
m_waiters.remove(waiter);
assert(!waiter.next);
}
MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
bool cancelled;
alias waitable = Waitable!(typeof(TaskWaiter.notifier),
(cb) { waiter.wait(cb); },
(cb) { cancelled = true; waiter.cancel(); },
() {}
);
alias ewaitable = Waitable!(EventCallback,
(cb) {
eventDriver.events.wait(evt, cb);
// check for exit condition *after* starting to wait for the event
// to avoid a race condition
if (exit_condition()) {
eventDriver.events.cancelWait(evt, cb);
cb(evt);
}
},
(cb) { eventDriver.events.cancelWait(evt, cb); },
(EventID) {}
);
if (evt != EventID.invalid) {
asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout);
} else {
asyncAwaitAny!(interruptible, waitable)(timeout);
}
if (cancelled) {
assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
return false;
} else {
assert(waiter.next is null, "Triggered waiter still in queue!?");
return true;
}
}
void emit()
@safe nothrow {
import std.algorithm.mutation : swap;
import vibe.core.core : yield;
if (m_waiters.empty) return;
TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
if (pivot.next) { // another emit in progress?
// shift pivot to the end, so that the other emit call will process all pending waiters
if (pivot !is m_waiters.back) {
m_waiters.remove(pivot);
m_waiters.insertBack(pivot);
}
return;
}
m_waiters.insertBack(pivot);
scope (exit) m_waiters.remove(pivot);
foreach (w; m_waiters) {
if (w is pivot) break;
emitWaiter(w);
}
}
bool emitSingle()
@safe nothrow {
if (m_waiters.empty) return false;
TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
if (pivot.next) { // another emit in progress?
// shift pivot to the right, so that the other emit call will process another waiter
if (pivot !is m_waiters.back) {
auto n = pivot.next;
m_waiters.remove(pivot);
m_waiters.insertAfter(pivot, n);
}
return true;
}
emitWaiter(m_waiters.front);
return true;
}
private void emitWaiter(TaskWaiter* w)
@safe nothrow {
m_waiters.remove(w);
if (w.notifier !is null) {
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
w.emit();
} else logTrace("notify callback is null");
}
}
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
private {
shared(bool) m_locked = false;
shared(uint) m_waiters = 0;
shared(ManualEvent) m_signal;
debug Task m_owner;
}
void setup()
{
m_signal.initialize();
}
@trusted bool tryLock()
{
if (cas(&m_locked, false, true)) {
debug m_owner = Task.getThis();
debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
return true;
}
return false;
}
@trusted void lock()
{
if (tryLock()) return;
debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
atomicOp!"+="(m_waiters, 1);
debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
scope(exit) atomicOp!"-="(m_waiters, 1);
auto ecnt = m_signal.emitCount();
while (!tryLock()) {
static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
else ecnt = m_signal.waitUninterruptible(ecnt);
}
}
@trusted void unlock()
{
assert(m_locked);
debug {
assert(m_owner == Task.getThis());
m_owner = Task();
}
atomicStore!(MemoryOrder.rel)(m_locked, false);
debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
if (atomicLoad(m_waiters) > 0)
m_signal.emit();
}
}
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
import std.stdio;
private {
core.sync.mutex.Mutex m_mutex;
Task m_owner;
size_t m_recCount = 0;
shared(uint) m_waiters = 0;
shared(ManualEvent) m_signal;
@property bool m_locked() const { return m_recCount > 0; }
}
void setup()
{
m_mutex = new core.sync.mutex.Mutex;
m_signal.initialize();
}
@trusted bool tryLock()
{
auto self = Task.getThis();
return m_mutex.performLocked!({
if (!m_owner) {
assert(m_recCount == 0);
m_recCount = 1;
m_owner = self;
return true;
} else if (m_owner == self) {
m_recCount++;
return true;
}
return false;
});
}
@trusted void lock()
{
if (tryLock()) return;
atomicOp!"+="(m_waiters, 1);
debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
scope(exit) atomicOp!"-="(m_waiters, 1);
auto ecnt = m_signal.emitCount();
while (!tryLock()) {
static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
else ecnt = m_signal.waitUninterruptible(ecnt);
}
}
@trusted void unlock()
{
auto self = Task.getThis();
m_mutex.performLocked!({
assert(m_owner == self);
assert(m_recCount > 0);
m_recCount--;
if (m_recCount == 0) {
m_owner = Task.init;
}
});
debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
if (atomicLoad(m_waiters) > 0)
m_signal.emit();
}
}
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
private {
LOCKABLE m_mutex;
static if (is(LOCKABLE == Mutex))
TaskMutex m_taskMutex;
shared(ManualEvent) m_signal;
}
static if (is(LOCKABLE == Lockable)) {
final class MutexWrapper : Lockable {
private core.sync.mutex.Mutex m_mutex;
this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
@trusted void lock() { m_mutex.lock(); }
@trusted void unlock() { m_mutex.unlock(); }
@trusted bool tryLock() { return m_mutex.tryLock(); }
}
void setupForMutex(core.sync.mutex.Mutex mtx)
{
setup(new MutexWrapper(mtx));
}
}
void setup(LOCKABLE mtx)
{
m_mutex = mtx;
static if (is(typeof(m_taskMutex)))
m_taskMutex = cast(TaskMutex)mtx;
m_signal.initialize();
}
@property LOCKABLE mutex() { return m_mutex; }
@trusted void wait()
{
if (auto tm = cast(TaskMutex)m_mutex) {
assert(tm.m_impl.m_locked);
debug assert(tm.m_impl.m_owner == Task.getThis());
}
auto refcount = m_signal.emitCount;
static if (is(LOCKABLE == Mutex)) {
if (m_taskMutex) m_taskMutex.unlock();
else m_mutex.unlock_nothrow();
} else m_mutex.unlock();
scope(exit) {
static if (is(LOCKABLE == Mutex)) {
if (m_taskMutex) m_taskMutex.lock();
else m_mutex.lock_nothrow();
} else m_mutex.lock();
}
static if (INTERRUPTIBLE) m_signal.wait(refcount);
else m_signal.waitUninterruptible(refcount);
}
@trusted bool wait(Duration timeout)
{
assert(!timeout.isNegative());
if (auto tm = cast(TaskMutex)m_mutex) {
assert(tm.m_impl.m_locked);
debug assert(tm.m_impl.m_owner == Task.getThis());
}
auto refcount = m_signal.emitCount;
static if (is(LOCKABLE == Mutex)) {
if (m_taskMutex) m_taskMutex.unlock();
else m_mutex.unlock_nothrow();
} else m_mutex.unlock();
scope(exit) {
static if (is(LOCKABLE == Mutex)) {
if (m_taskMutex) m_taskMutex.lock();
else m_mutex.lock_nothrow();
} else m_mutex.lock();
}
static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
}
@trusted void notify()
{
m_signal.emit();
}
@trusted void notifyAll()
{
m_signal.emit();
}
}
/** Contains the shared state of a $(D TaskReadWriteMutex).
*
* Since a $(D TaskReadWriteMutex) consists of two actual Mutex
* objects that rely on common memory, this class implements
* the actual functionality of their method calls.
*
* The method implementations are based on two static parameters
* ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
* template arguments:
*
* - $(D INTERRUPTIBLE) determines whether the mutex implementation
* are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
* method or not.
*
* - $(D INTENT) describes the intent, with which a locking operation is
* performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
* multiple Tasks holding the mutex, whereas RW locking will cause
* a "bottleneck" so that only one Task can write to the protected
* data at once.
*/
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
*/
enum Policy : int
{
/** Readers are prioritized, writers may be starved as a result. */
PREFER_READERS = 0,
/** Writers are prioritized, readers may be starved as a result. */
PREFER_WRITERS
}
/** The intent with which a locking operation is performed.
*
* Since both locks share the same underlying algorithms, the actual
* intent with which a lock operation is performed (i.e read/write)
* are passed as a template parameter to each method.
*/
enum LockingIntent : bool
{
/** Perform a read lock/unlock operation. Multiple reading locks can be
* active at a time. */
READ_ONLY = 0,
/** Perform a write lock/unlock operation. Only a single writer can
* hold a lock at any given time. */
READ_WRITE = 1
}
private {
//Queue counters
/** The number of reading tasks waiting for the lock to become available. */
shared(uint) m_waitingForReadLock = 0;
/** The number of writing tasks waiting for the lock to become available. */
shared(uint) m_waitingForWriteLock = 0;
//Lock counters
/** The number of reading tasks that currently hold the lock. */
uint m_activeReadLocks = 0;
/** The number of writing tasks that currently hold the lock (binary). */
ubyte m_activeWriteLocks = 0;
/** The policy determining the lock's behavior. */
Policy m_policy;
//Queue Events
/** The event used to wake reading tasks waiting for the lock while it is blocked. */
shared(ManualEvent) m_readyForReadLock;
/** The event used to wake writing tasks waiting for the lock while it is blocked. */
shared(ManualEvent) m_readyForWriteLock;
/** The underlying mutex that gates the access to the shared state. */
Mutex m_counterMutex;
}
this(Policy policy)
{
m_policy = policy;
m_counterMutex = new Mutex();
m_readyForReadLock = createSharedManualEvent();
m_readyForWriteLock = createSharedManualEvent();
}
@disable this(this);
/** The policy with which the lock has been created. */
@property policy() const { return m_policy; }
version(RWMutexPrint)
{
/** Print out debug information during lock operations. */
void printInfo(string OP, LockingIntent INTENT)() nothrow
{
import std.string;
try
{
import std.stdio;
writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
OP.leftJustify(10,' '),
INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
m_activeReadLocks, m_activeWriteLocks,
m_waitingForReadLock, m_waitingForWriteLock
);
}
catch (Throwable t){}
}
}
/** An internal shortcut method to determine the queue event for a given intent. */
@property ref auto queueEvent(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return m_readyForReadLock;
else
return m_readyForWriteLock;
}
/** An internal shortcut method to determine the queue counter for a given intent. */
@property ref auto queueCounter(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return m_waitingForReadLock;
else
return m_waitingForWriteLock;
}
/** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */
int emitCount(LockingIntent INTENT)()
{
return queueEvent!INTENT.emitCount();
}
/** An internal shortcut method to determine the active counter for a given intent. */
@property ref auto activeCounter(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return m_activeReadLocks;
else
return m_activeWriteLocks;
}
/** An internal shortcut method to wait for the queue event for a given intent.
*
* This method is used during the `lock()` operation, after a
* `tryLock()` operation has been unsuccessfully finished.
* The active fiber will yield and be suspended until the queue event
* for the given intent will be fired.
*/
int wait(LockingIntent INTENT)(int count)
{
static if (INTERRUPTIBLE)
return queueEvent!INTENT.wait(count);
else
return queueEvent!INTENT.waitUninterruptible(count);
}
/** An internal shortcut method to notify tasks waiting for the lock to become available again.
*
* This method is called whenever the number of owners of the mutex hits
* zero; this is basically the counterpart to `wait()`.
* It wakes any Task currently waiting for the mutex to be released.
*/
@trusted void notify(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
{ //If the last reader unlocks the mutex, notify all waiting writers
if (atomicLoad(m_waitingForWriteLock) > 0)
m_readyForWriteLock.emit();
}
else
{ //If a writer unlocks the mutex, notify both readers and writers
if (atomicLoad(m_waitingForReadLock) > 0)
m_readyForReadLock.emit();
if (atomicLoad(m_waitingForWriteLock) > 0)
m_readyForWriteLock.emit();
}
}
/** An internal method that performs the acquisition attempt in different variations.
*
* Since both locks rely on a common TaskMutex object which gates the access
* to their common data acquisition attempts for this lock are more complex
* than for simple mutex variants. This method will thus be performing the
* `tryLock()` operation in two variations, depending on the callee:
*
* If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
* will instantly fail if the underlying mutex is locked (i.e. during another
* `tryLock()` or `unlock()` operation), in order to guarantee the fastest
* possible locking attempt.
*
* If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
* the operation will wait for the mutex to be available before deciding if
* the lock can be acquired, since the attempt would anyway be repeated until
* it succeeds. This will prevent frequent retries under heavy loads and thus
* should ensure better performance.
*/
@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
{
//Log a debug statement for the attempt
version(RWMutexPrint)
printInfo!("tryLock",INTENT)();
//Try to acquire the lock
static if (!WAIT_FOR_BLOCKING_MUTEX)
{
if (!m_counterMutex.tryLock())
return false;
}
else
m_counterMutex.lock();
scope(exit)
m_counterMutex.unlock();
//Log a debug statement for the attempt
version(RWMutexPrint)
printInfo!("checkCtrs",INTENT)();
//Check if there's already an active writer
if (m_activeWriteLocks > 0)
return false;
//If writers are preferred over readers, check whether there
//currently is a writer in the waiting queue and abort if
//that's the case.
static if (INTENT == LockingIntent.READ_ONLY)
if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0)
return false;
//If we are locking the mutex for writing, make sure that
//there's no reader active.
static if (INTENT == LockingIntent.READ_WRITE)
if (m_activeReadLocks > 0)
return false;
//We can successfully acquire the lock!
//Log a debug statement for the success.
version(RWMutexPrint)
printInfo!("lock",INTENT)();
//Increase the according counter
//(number of active readers/writers)
//and return a success code.
activeCounter!INTENT += 1;
return true;
}
/** Attempt to acquire the lock for a given intent.
*
* Returns:
* `true`, if the lock was successfully acquired;
* `false` otherwise.
*/
@trusted bool tryLock(LockingIntent INTENT)()
{
//Try to lock this mutex without waiting for the underlying
//TaskMutex - fail if it is already blocked.
return tryLock!(INTENT,false)();
}
/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
@trusted void lock(LockingIntent INTENT)()
{
//Prepare a waiting action before the first
//`tryLock()` call in order to avoid a race
//condition that could lead to the queue notification
//not being fired.
auto count = emitCount!INTENT;
atomicOp!"+="(queueCounter!INTENT,1);
scope(exit)
atomicOp!"-="(queueCounter!INTENT,1);
//Try to lock the mutex
auto locked = tryLock!(INTENT,true)();
if (locked)
return;
//Retry until we successfully acquired the lock
while(!locked)
{
version(RWMutexPrint)
printInfo!("wait",INTENT)();
count = wait!INTENT(count);
locked = tryLock!(INTENT,true)();
}
}
/** Unlock the mutex after a successful acquisition. */
@trusted void unlock(LockingIntent INTENT)()
{
version(RWMutexPrint)
printInfo!("unlock",INTENT)();
debug assert(activeCounter!INTENT > 0);
synchronized(m_counterMutex)
{
//Decrement the counter of active lock holders.
//If the counter hits zero, notify waiting Tasks
activeCounter!INTENT -= 1;
if (activeCounter!INTENT == 0)
{
version(RWMutexPrint)
printInfo!("notify",INTENT)();
notify!INTENT();
}
}
}
}
/** A ReadWriteMutex implementation for fibers.
*
* This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex),
* but does not block the event loop in contention situations. The `reader` and `writer`
* members are used for locking. Locking the `reader` mutex allows access to multiple
* readers at once, while the `writer` mutex only allows a single writer to lock it at
* any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a
* writer is active, no readers can be active at the same time, and vice versa).
*
* Notice:
* Mutexes implemented by this class cannot be interrupted
* using $(D vibe.core.task.Task.interrupt()). The corresponding
* InterruptException will be deferred until the next blocking
* operation yields the event loop.
*
* Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be
* interrupted.
*
* cf. $(D core.sync.mutex.ReadWriteMutex)
*/
final class TaskReadWriteMutex
{
private {
alias State = ReadWriteMutexState!false;
alias LockingIntent = State.LockingIntent;
alias READ_ONLY = LockingIntent.READ_ONLY;
alias READ_WRITE = LockingIntent.READ_WRITE;
/** The shared state used by the reader and writer mutexes. */
State m_state;
}
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
*/
alias Policy = State.Policy;
/** A common baseclass for both of the provided mutexes.
*
* The intent for the according mutex is specified through the
* $(D INTENT) template argument, which determines if a mutex is
* used for read or write locking.
*/
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
{
/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
override bool tryLock() { return m_state.tryLock!INTENT(); }
/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
override void lock() { m_state.lock!INTENT(); }
/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
override void unlock() { m_state.unlock!INTENT(); }
}
alias Reader = Mutex!READ_ONLY;
alias Writer = Mutex!READ_WRITE;
Reader reader;
Writer writer;
this(Policy policy = Policy.PREFER_WRITERS)
{
m_state = State(policy);
reader = new Reader();
writer = new Writer();
}
/** The policy with which the lock has been created. */
@property Policy policy() const { return m_state.policy; }
}
/** Alternative to $(D TaskReadWriteMutex) that supports interruption.
*
* This class supports the use of $(D vibe.core.task.Task.interrupt()) while
* waiting in the `lock()` method.
*
* cf. $(D core.sync.mutex.ReadWriteMutex)
*/
final class InterruptibleTaskReadWriteMutex
{
@safe:
private {
alias State = ReadWriteMutexState!true;
alias LockingIntent = State.LockingIntent;
alias READ_ONLY = LockingIntent.READ_ONLY;
alias READ_WRITE = LockingIntent.READ_WRITE;
/** The shared state used by the reader and writer mutexes. */
State m_state;
}
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
*/
alias Policy = State.Policy;
/** A common baseclass for both of the provided mutexes.
*
* The intent for the according mutex is specified through the
* $(D INTENT) template argument, which determines if a mutex is
* used for read or write locking.
*
*/
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
{
/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
override bool tryLock() { return m_state.tryLock!INTENT(); }
/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
override void lock() { m_state.lock!INTENT(); }
/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
override void unlock() { m_state.unlock!INTENT(); }
}
alias Reader = Mutex!READ_ONLY;
alias Writer = Mutex!READ_WRITE;
Reader reader;
Writer writer;
this(Policy policy = Policy.PREFER_WRITERS)
{
m_state = State(policy);
reader = new Reader();
writer = new Writer();
}
/** The policy with which the lock has been created. */
@property Policy policy() const { return m_state.policy; }
}