2016-03-01 19:30:42 +00:00
|
|
|
/**
|
|
|
|
Interruptible Task synchronization facilities
|
|
|
|
|
|
|
|
Copyright: © 2012-2016 RejectedSoftware e.K.
|
|
|
|
Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
|
|
|
|
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
|
|
|
|
*/
|
|
|
|
module vibe.core.sync;
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
import vibe.core.log : logDebugV, logTrace, logInfo;
|
2016-03-01 19:30:42 +00:00
|
|
|
import vibe.core.task;
|
|
|
|
|
|
|
|
import core.atomic;
|
|
|
|
import core.sync.mutex;
|
|
|
|
import core.sync.condition;
|
|
|
|
import eventcore.core;
|
|
|
|
import std.exception;
|
|
|
|
import std.stdio;
|
|
|
|
import std.traits : ReturnType;
|
|
|
|
|
|
|
|
|
|
|
|
enum LockMode {
|
|
|
|
lock,
|
|
|
|
tryLock,
|
|
|
|
defer
|
|
|
|
}
|
|
|
|
|
|
|
|
interface Lockable {
|
|
|
|
@safe:
|
|
|
|
void lock();
|
|
|
|
void unlock();
|
|
|
|
bool tryLock();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** RAII lock for the Mutex class.
|
|
|
|
*/
|
|
|
|
struct ScopedMutexLock
|
|
|
|
{
|
|
|
|
@disable this(this);
|
|
|
|
private {
|
|
|
|
Mutex m_mutex;
|
|
|
|
bool m_locked;
|
|
|
|
LockMode m_mode;
|
|
|
|
}
|
|
|
|
|
|
|
|
this(core.sync.mutex.Mutex mutex, LockMode mode = LockMode.lock) {
|
|
|
|
assert(mutex !is null);
|
|
|
|
m_mutex = mutex;
|
|
|
|
|
|
|
|
final switch (mode) {
|
|
|
|
case LockMode.lock: lock(); break;
|
|
|
|
case LockMode.tryLock: tryLock(); break;
|
|
|
|
case LockMode.defer: break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
~this()
|
|
|
|
{
|
|
|
|
if( m_locked )
|
|
|
|
m_mutex.unlock();
|
|
|
|
}
|
|
|
|
|
|
|
|
@property bool locked() const { return m_locked; }
|
|
|
|
|
|
|
|
void unlock()
|
|
|
|
{
|
|
|
|
enforce(m_locked);
|
|
|
|
m_mutex.unlock();
|
|
|
|
m_locked = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool tryLock()
|
|
|
|
{
|
|
|
|
enforce(!m_locked);
|
|
|
|
return m_locked = m_mutex.tryLock();
|
|
|
|
}
|
|
|
|
|
|
|
|
void lock()
|
|
|
|
{
|
|
|
|
enforce(!m_locked);
|
|
|
|
m_locked = true;
|
|
|
|
m_mutex.lock();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
Only for internal use:
|
|
|
|
Ensures that a mutex is locked while executing the given procedure.
|
|
|
|
|
|
|
|
This function works for all kinds of mutexes, in particular for
|
|
|
|
$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
Returns the value returned from $(D PROC), if any.
|
|
|
|
*/
|
|
|
|
/// private
|
|
|
|
ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
|
|
|
|
{
|
|
|
|
mutex.lock();
|
|
|
|
scope (exit) mutex.unlock();
|
|
|
|
return PROC();
|
|
|
|
}
|
|
|
|
|
|
|
|
///
|
|
|
|
unittest {
|
|
|
|
int protected_var = 0;
|
|
|
|
auto mtx = new TaskMutex;
|
|
|
|
mtx.performLocked!({
|
|
|
|
protected_var++;
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Thread-local semaphore implementation for tasks.
|
|
|
|
|
|
|
|
When the semaphore runs out of concurrent locks, it will suspend. This class
|
|
|
|
is used in `vibe.core.connectionpool` to limit the number of concurrent
|
|
|
|
connections.
|
|
|
|
*/
|
|
|
|
class LocalTaskSemaphore
|
|
|
|
{
|
|
|
|
// requires a queue
|
|
|
|
import std.container.binaryheap;
|
|
|
|
import std.container.array;
|
|
|
|
//import vibe.utils.memory;
|
|
|
|
|
|
|
|
private {
|
2016-06-15 16:21:33 +00:00
|
|
|
static struct ThreadWaiter {
|
2016-03-01 19:30:42 +00:00
|
|
|
ManualEvent signal;
|
|
|
|
ubyte priority;
|
|
|
|
uint seq;
|
|
|
|
}
|
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
|
2016-03-01 19:30:42 +00:00
|
|
|
uint m_maxLocks;
|
|
|
|
uint m_locks;
|
|
|
|
uint m_seq;
|
|
|
|
}
|
|
|
|
|
|
|
|
this(uint max_locks)
|
|
|
|
{
|
|
|
|
m_maxLocks = max_locks;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Maximum number of concurrent locks
|
|
|
|
@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
|
|
|
|
/// ditto
|
|
|
|
@property uint maxLocks() const { return m_maxLocks; }
|
|
|
|
|
|
|
|
/// Number of concurrent locks still available
|
|
|
|
@property uint available() const { return m_maxLocks - m_locks; }
|
|
|
|
|
|
|
|
/** Try to acquire a lock.
|
|
|
|
|
|
|
|
If a lock cannot be acquired immediately, returns `false` and leaves the
|
|
|
|
semaphore in its previous state.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
`true` is returned $(I iff) the number of available locks is greater
|
|
|
|
than one.
|
|
|
|
*/
|
|
|
|
bool tryLock()
|
|
|
|
{
|
|
|
|
if (available > 0)
|
|
|
|
{
|
|
|
|
m_locks++;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Acquires a lock.
|
|
|
|
|
|
|
|
Once the limit of concurrent locks is reaced, this method will block
|
|
|
|
until the number of locks drops below the limit.
|
|
|
|
*/
|
|
|
|
void lock(ubyte priority = 0)
|
|
|
|
{
|
|
|
|
import std.algorithm.comparison : min;
|
|
|
|
|
|
|
|
if (tryLock())
|
|
|
|
return;
|
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
ThreadWaiter w;
|
2016-03-01 19:30:42 +00:00
|
|
|
w.signal = createManualEvent();
|
|
|
|
w.priority = priority;
|
|
|
|
w.seq = min(0, m_seq - w.priority);
|
|
|
|
if (++m_seq == uint.max)
|
|
|
|
rewindSeq();
|
|
|
|
|
|
|
|
m_waiters.insert(w);
|
|
|
|
do w.signal.wait(); while (!tryLock());
|
|
|
|
// on resume:
|
|
|
|
destroy(w.signal);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Gives up an existing lock.
|
|
|
|
*/
|
|
|
|
void unlock()
|
|
|
|
{
|
|
|
|
m_locks--;
|
|
|
|
if (m_waiters.length > 0 && available > 0) {
|
2016-06-15 16:21:33 +00:00
|
|
|
ThreadWaiter w = m_waiters.front();
|
2016-03-01 19:30:42 +00:00
|
|
|
w.signal.emit(); // resume one
|
|
|
|
m_waiters.removeFront();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// if true, a goes after b. ie. b comes out front()
|
|
|
|
/// private
|
2016-06-15 16:21:33 +00:00
|
|
|
static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
|
2016-03-01 19:30:42 +00:00
|
|
|
{
|
|
|
|
if (a.seq == b.seq) {
|
|
|
|
if (a.priority == b.priority) {
|
|
|
|
// resolve using the pointer address
|
|
|
|
return (cast(size_t)&a.signal) > (cast(size_t) &b.signal);
|
|
|
|
}
|
|
|
|
// resolve using priority
|
|
|
|
return a.priority < b.priority;
|
|
|
|
}
|
|
|
|
// resolve using seq number
|
|
|
|
return a.seq > b.seq;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void rewindSeq()
|
|
|
|
{
|
2016-06-15 16:21:33 +00:00
|
|
|
Array!ThreadWaiter waiters = m_waiters.release();
|
2016-03-01 19:30:42 +00:00
|
|
|
ushort min_seq;
|
|
|
|
import std.algorithm : min;
|
|
|
|
foreach (ref waiter; waiters[])
|
|
|
|
min_seq = min(waiter.seq, min_seq);
|
|
|
|
foreach (ref waiter; waiters[])
|
|
|
|
waiter.seq -= min_seq;
|
|
|
|
m_waiters.assume(waiters);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Mutex implementation for fibers.
|
|
|
|
|
|
|
|
This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
|
|
|
|
does not block the event loop when contention happens. Note that this
|
|
|
|
mutex does not allow recursive locking.
|
|
|
|
|
|
|
|
Notice:
|
|
|
|
Because this class is annotated nothrow, it cannot be interrupted
|
|
|
|
using $(D vibe.core.task.Task.interrupt()). The corresponding
|
|
|
|
$(D InterruptException) will be deferred until the next blocking
|
|
|
|
operation yields the event loop.
|
|
|
|
|
|
|
|
Use $(D InterruptibleTaskMutex) as an alternative that can be
|
|
|
|
interrupted.
|
|
|
|
|
|
|
|
See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
|
|
|
|
*/
|
|
|
|
class TaskMutex : core.sync.mutex.Mutex, Lockable {
|
|
|
|
private TaskMutexImpl!false m_impl;
|
|
|
|
|
|
|
|
this(Object o) { m_impl.setup(); super(o); }
|
|
|
|
this() { m_impl.setup(); }
|
|
|
|
|
|
|
|
override bool tryLock() nothrow { return m_impl.tryLock(); }
|
|
|
|
override void lock() nothrow { m_impl.lock(); }
|
|
|
|
override void unlock() nothrow { m_impl.unlock(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
unittest {
|
|
|
|
auto mutex = new TaskMutex;
|
|
|
|
|
|
|
|
{
|
|
|
|
auto lock = ScopedMutexLock(mutex);
|
|
|
|
assert(lock.locked);
|
|
|
|
assert(mutex.m_impl.m_locked);
|
|
|
|
|
|
|
|
auto lock2 = ScopedMutexLock(mutex, LockMode.tryLock);
|
|
|
|
assert(!lock2.locked);
|
|
|
|
}
|
|
|
|
assert(!mutex.m_impl.m_locked);
|
|
|
|
|
|
|
|
auto lock = ScopedMutexLock(mutex, LockMode.tryLock);
|
|
|
|
assert(lock.locked);
|
|
|
|
lock.unlock();
|
|
|
|
assert(!lock.locked);
|
|
|
|
|
|
|
|
synchronized(mutex){
|
|
|
|
assert(mutex.m_impl.m_locked);
|
|
|
|
}
|
|
|
|
assert(!mutex.m_impl.m_locked);
|
|
|
|
|
|
|
|
mutex.performLocked!({
|
|
|
|
assert(mutex.m_impl.m_locked);
|
|
|
|
});
|
|
|
|
assert(!mutex.m_impl.m_locked);
|
|
|
|
|
|
|
|
static if (__VERSION__ >= 2067) {
|
|
|
|
with(mutex.ScopedMutexLock) {
|
|
|
|
assert(mutex.m_impl.m_locked);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
|
|
unittest { // test deferred throwing
|
|
|
|
import vibe.core.core;
|
|
|
|
|
|
|
|
auto mutex = new TaskMutex;
|
|
|
|
auto t1 = runTask({
|
|
|
|
scope (failure) assert(false, "No exception expected in first task!");
|
|
|
|
mutex.lock();
|
|
|
|
scope (exit) mutex.unlock();
|
|
|
|
sleep(20.msecs);
|
|
|
|
});
|
|
|
|
|
|
|
|
auto t2 = runTask({
|
|
|
|
mutex.lock();
|
|
|
|
scope (exit) mutex.unlock();
|
|
|
|
try {
|
|
|
|
yield();
|
|
|
|
assert(false, "Yield is supposed to have thrown an InterruptException.");
|
|
|
|
} catch (InterruptException) {
|
|
|
|
// as expected!
|
2016-06-17 20:33:04 +00:00
|
|
|
} catch (Exception) {
|
|
|
|
assert(false, "Only InterruptException supposed to be thrown!");
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
runTask({
|
|
|
|
// mutex is now locked in first task for 20 ms
|
|
|
|
// the second tasks is waiting in lock()
|
|
|
|
t2.interrupt();
|
|
|
|
t1.join();
|
|
|
|
t2.join();
|
|
|
|
assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
|
|
|
|
exitEventLoop();
|
|
|
|
});
|
|
|
|
|
|
|
|
runEventLoop();
|
|
|
|
}
|
|
|
|
|
|
|
|
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
|
|
unittest {
|
|
|
|
runMutexUnitTests!TaskMutex();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Alternative to $(D TaskMutex) that supports interruption.
|
|
|
|
|
|
|
|
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
|
|
|
|
waiting in the $(D lock()) method. However, because the interface is not
|
|
|
|
$(D nothrow), it cannot be used as an object monitor.
|
|
|
|
|
|
|
|
See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
|
|
|
|
*/
|
|
|
|
final class InterruptibleTaskMutex : Lockable {
|
|
|
|
private TaskMutexImpl!true m_impl;
|
|
|
|
|
|
|
|
this() { m_impl.setup(); }
|
|
|
|
|
|
|
|
bool tryLock() nothrow { return m_impl.tryLock(); }
|
|
|
|
void lock() { m_impl.lock(); }
|
|
|
|
void unlock() nothrow { m_impl.unlock(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
|
|
unittest {
|
|
|
|
runMutexUnitTests!InterruptibleTaskMutex();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Recursive mutex implementation for tasks.
|
|
|
|
|
|
|
|
This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
|
|
|
|
does not block the event loop when contention happens.
|
|
|
|
|
|
|
|
Notice:
|
|
|
|
Because this class is annotated nothrow, it cannot be interrupted
|
|
|
|
using $(D vibe.core.task.Task.interrupt()). The corresponding
|
|
|
|
$(D InterruptException) will be deferred until the next blocking
|
|
|
|
operation yields the event loop.
|
|
|
|
|
|
|
|
Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
|
|
|
|
interrupted.
|
|
|
|
|
|
|
|
See_Also: TaskMutex, core.sync.mutex.Mutex
|
|
|
|
*/
|
|
|
|
class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
|
|
|
|
private RecursiveTaskMutexImpl!false m_impl;
|
|
|
|
|
|
|
|
this(Object o) { m_impl.setup(); super(o); }
|
|
|
|
this() { m_impl.setup(); }
|
|
|
|
|
|
|
|
override bool tryLock() { return m_impl.tryLock(); }
|
|
|
|
override void lock() { m_impl.lock(); }
|
|
|
|
override void unlock() { m_impl.unlock(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
|
|
unittest {
|
|
|
|
runMutexUnitTests!RecursiveTaskMutex();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Alternative to $(D RecursiveTaskMutex) that supports interruption.
|
|
|
|
|
|
|
|
This class supports the use of $(D vibe.core.task.Task.interrupt()) while
|
|
|
|
waiting in the $(D lock()) method. However, because the interface is not
|
|
|
|
$(D nothrow), it cannot be used as an object monitor.
|
|
|
|
|
|
|
|
See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
|
|
|
|
*/
|
|
|
|
final class InterruptibleRecursiveTaskMutex : Lockable {
|
|
|
|
private RecursiveTaskMutexImpl!true m_impl;
|
|
|
|
|
|
|
|
this() { m_impl.setup(); }
|
|
|
|
|
|
|
|
bool tryLock() { return m_impl.tryLock(); }
|
|
|
|
void lock() { m_impl.lock(); }
|
|
|
|
void unlock() { m_impl.unlock(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
|
|
unittest {
|
|
|
|
runMutexUnitTests!InterruptibleRecursiveTaskMutex();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void runMutexUnitTests(M)()
|
|
|
|
{
|
|
|
|
import vibe.core.core;
|
|
|
|
|
|
|
|
auto m = new M;
|
|
|
|
Task t1, t2;
|
|
|
|
void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
|
|
|
|
// t1 starts first and acquires the mutex for 20 ms
|
|
|
|
// t2 starts second and has to wait in m.lock()
|
|
|
|
t1 = runTask({
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
m.lock();
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs));
|
|
|
|
else assertNotThrown(sleep(20.msecs));
|
|
|
|
m.unlock();
|
|
|
|
});
|
|
|
|
t2 = runTask({
|
|
|
|
assert(!m.tryLock());
|
|
|
|
if (interrupt_t2) {
|
|
|
|
try m.lock();
|
|
|
|
catch (InterruptException) return;
|
|
|
|
try yield(); // rethrows any deferred exceptions
|
|
|
|
catch (InterruptException) {
|
|
|
|
m.unlock();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
assert(false, "Supposed to have thrown an InterruptException.");
|
|
|
|
} else assertNotThrown(m.lock());
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
sleep(20.msecs);
|
|
|
|
m.unlock();
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
// basic lock test
|
|
|
|
m.performLocked!({
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
});
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
|
|
|
|
// basic contention test
|
|
|
|
runContendedTasks(false, false);
|
|
|
|
runTask({
|
|
|
|
assert(t1.running && t2.running);
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
t1.join();
|
|
|
|
assert(!t1.running && t2.running);
|
|
|
|
yield(); // give t2 a chance to take the lock
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
t2.join();
|
|
|
|
assert(!t2.running);
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
exitEventLoop();
|
|
|
|
});
|
|
|
|
runEventLoop();
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
|
|
|
|
// interruption test #1
|
|
|
|
runContendedTasks(true, false);
|
|
|
|
runTask({
|
|
|
|
assert(t1.running && t2.running);
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
t1.interrupt();
|
|
|
|
t1.join();
|
|
|
|
assert(!t1.running && t2.running);
|
|
|
|
yield(); // give t2 a chance to take the lock
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
t2.join();
|
|
|
|
assert(!t2.running);
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
exitEventLoop();
|
|
|
|
});
|
|
|
|
runEventLoop();
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
|
|
|
|
// interruption test #2
|
|
|
|
runContendedTasks(false, true);
|
|
|
|
runTask({
|
|
|
|
assert(t1.running && t2.running);
|
|
|
|
assert(m.m_impl.m_locked);
|
|
|
|
t2.interrupt();
|
|
|
|
t2.join();
|
|
|
|
assert(!t2.running);
|
|
|
|
static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
|
|
|
|
assert(t1.running && m.m_impl.m_locked);
|
|
|
|
t1.join();
|
|
|
|
assert(!t1.running);
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
exitEventLoop();
|
|
|
|
});
|
|
|
|
runEventLoop();
|
|
|
|
assert(!m.m_impl.m_locked);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Event loop based condition variable or "event" implementation.
|
|
|
|
|
|
|
|
This class can be used in exchange for a $(D core.sync.condition.Condition)
|
|
|
|
to avoid blocking the event loop when waiting.
|
|
|
|
|
|
|
|
Notice:
|
|
|
|
Because this class is annotated nothrow, it cannot be interrupted
|
|
|
|
using $(D vibe.core.task.Task.interrupt()). The corresponding
|
|
|
|
$(D InterruptException) will be deferred until the next blocking
|
|
|
|
operation yields to the event loop.
|
|
|
|
|
|
|
|
Use $(D InterruptibleTaskCondition) as an alternative that can be
|
|
|
|
interrupted.
|
|
|
|
|
|
|
|
Note that it is generally not safe to use a `TaskCondition` together with an
|
|
|
|
interruptible mutex type.
|
|
|
|
|
|
|
|
See_Also: InterruptibleTaskCondition
|
|
|
|
*/
|
|
|
|
class TaskCondition : core.sync.condition.Condition {
|
|
|
|
private TaskConditionImpl!(false, Mutex) m_impl;
|
|
|
|
|
|
|
|
this(core.sync.mutex.Mutex mtx) {
|
|
|
|
m_impl.setup(mtx);
|
|
|
|
super(mtx);
|
|
|
|
}
|
|
|
|
override @property Mutex mutex() { return m_impl.mutex; }
|
|
|
|
override void wait() { m_impl.wait(); }
|
|
|
|
override bool wait(Duration timeout) { return m_impl.wait(timeout); }
|
|
|
|
override void notify() { m_impl.notify(); }
|
|
|
|
override void notifyAll() { m_impl.notifyAll(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
/** This example shows the typical usage pattern using a `while` loop to make
|
|
|
|
sure that the final condition is reached.
|
|
|
|
*/
|
|
|
|
unittest {
|
|
|
|
import vibe.core.core;
|
2016-06-17 20:33:04 +00:00
|
|
|
import vibe.core.log;
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
__gshared Mutex mutex;
|
|
|
|
__gshared TaskCondition condition;
|
|
|
|
__gshared int workers_still_running = 0;
|
|
|
|
|
|
|
|
// setup the task condition
|
|
|
|
mutex = new Mutex;
|
|
|
|
condition = new TaskCondition(mutex);
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
logDebug("SETTING UP TASKS");
|
|
|
|
|
2016-03-01 19:30:42 +00:00
|
|
|
// start up the workers and count how many are running
|
|
|
|
foreach (i; 0 .. 4) {
|
|
|
|
workers_still_running++;
|
|
|
|
runWorkerTask({
|
|
|
|
// simulate some work
|
|
|
|
sleep(100.msecs);
|
|
|
|
|
|
|
|
// notify the waiter that we're finished
|
2016-06-17 20:33:04 +00:00
|
|
|
synchronized (mutex) {
|
2016-03-01 19:30:42 +00:00
|
|
|
workers_still_running--;
|
2016-06-17 20:33:04 +00:00
|
|
|
logDebug("DECREMENT %s", workers_still_running);
|
|
|
|
}
|
|
|
|
logDebug("NOTIFY");
|
2016-03-01 19:30:42 +00:00
|
|
|
condition.notify();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
logDebug("STARTING WAIT LOOP");
|
|
|
|
|
2016-03-01 19:30:42 +00:00
|
|
|
// wait until all tasks have decremented the counter back to zero
|
|
|
|
synchronized (mutex) {
|
2016-06-17 20:33:04 +00:00
|
|
|
while (workers_still_running > 0) {
|
|
|
|
logDebug("STILL running %s", workers_still_running);
|
2016-03-01 19:30:42 +00:00
|
|
|
condition.wait();
|
2016-06-17 20:33:04 +00:00
|
|
|
}
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
Alternative to `TaskCondition` that supports interruption.
|
|
|
|
|
|
|
|
This class supports the use of `vibe.core.task.Task.interrupt()` while
|
|
|
|
waiting in the `wait()` method.
|
|
|
|
|
|
|
|
See `TaskCondition` for an example.
|
|
|
|
|
|
|
|
Notice:
|
|
|
|
Note that it is generally not safe to use an
|
|
|
|
`InterruptibleTaskCondition` together with an interruptible mutex type.
|
|
|
|
|
|
|
|
See_Also: `TaskCondition`
|
|
|
|
*/
|
|
|
|
final class InterruptibleTaskCondition {
|
|
|
|
private TaskConditionImpl!(true, Lockable) m_impl;
|
|
|
|
|
|
|
|
this(core.sync.mutex.Mutex mtx) { m_impl.setup(mtx); }
|
|
|
|
this(Lockable mtx) { m_impl.setup(mtx); }
|
|
|
|
|
|
|
|
@property Lockable mutex() { return m_impl.mutex; }
|
|
|
|
void wait() { m_impl.wait(); }
|
|
|
|
bool wait(Duration timeout) { return m_impl.wait(timeout); }
|
|
|
|
void notify() { m_impl.notify(); }
|
|
|
|
void notifyAll() { m_impl.notifyAll(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Creates a new signal that can be shared between fibers.
|
|
|
|
*/
|
|
|
|
ManualEvent createManualEvent()
|
|
|
|
{
|
|
|
|
return ManualEvent.init;
|
|
|
|
}
|
2016-06-14 06:01:03 +00:00
|
|
|
/// ditto
|
|
|
|
shared(ManualEvent) createSharedManualEvent()
|
|
|
|
{
|
|
|
|
return shared(ManualEvent).init;
|
|
|
|
}
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
/** A manually triggered cross-task event.
|
|
|
|
|
|
|
|
Note: the ownership can be shared between multiple fibers and threads.
|
|
|
|
*/
|
|
|
|
struct ManualEvent {
|
2016-06-14 06:01:03 +00:00
|
|
|
import core.thread : Thread;
|
2016-06-15 16:21:33 +00:00
|
|
|
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
|
2016-06-14 06:01:03 +00:00
|
|
|
|
|
|
|
private {
|
|
|
|
static struct ThreadWaiter {
|
|
|
|
ThreadWaiter* next;
|
2016-06-17 20:33:04 +00:00
|
|
|
EventID event;
|
|
|
|
EventDriver driver;
|
|
|
|
Thread thread;
|
2016-06-15 16:21:33 +00:00
|
|
|
StackSList!LocalWaiter tasks;
|
|
|
|
}
|
|
|
|
static struct LocalWaiter {
|
|
|
|
LocalWaiter* next;
|
2016-06-14 06:01:03 +00:00
|
|
|
Task task;
|
|
|
|
void delegate() @safe nothrow notifier;
|
2016-06-15 16:21:33 +00:00
|
|
|
bool cancelled = false;
|
2016-06-14 06:01:03 +00:00
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
void wait(void delegate() @safe nothrow del) @safe nothrow {
|
|
|
|
assert(notifier is null, "Local waiter is used twice!");
|
2016-06-14 06:01:03 +00:00
|
|
|
notifier = del;
|
|
|
|
}
|
2016-06-17 20:33:04 +00:00
|
|
|
void cancel() @safe nothrow { cancelled = true; notifier = null; }
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
int m_emitCount;
|
2016-06-15 16:21:33 +00:00
|
|
|
ThreadWaiter* m_waiters;
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// thread destructor in vibe.core.core will decrement the ref. count
|
|
|
|
package static EventID ms_threadEvent;
|
|
|
|
|
|
|
|
enum EmitMode {
|
|
|
|
single,
|
|
|
|
all
|
|
|
|
}
|
|
|
|
|
|
|
|
//@disable this(this);
|
|
|
|
|
|
|
|
deprecated("ManualEvent is always non-null!")
|
2016-03-01 19:30:42 +00:00
|
|
|
bool opCast() const nothrow { return true; }
|
2016-06-14 06:01:03 +00:00
|
|
|
deprecated("ManualEvent is always non-null!")
|
|
|
|
bool opCast() const shared nothrow { return true; }
|
|
|
|
|
2016-03-01 19:30:42 +00:00
|
|
|
/// A counter that is increased with every emit() call
|
2016-06-14 06:01:03 +00:00
|
|
|
int emitCount() const nothrow { return m_emitCount; }
|
|
|
|
/// ditto
|
|
|
|
int emitCount() const shared nothrow { return atomicLoad(m_emitCount); }
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
/// Emits the signal, waking up all owners of the signal.
|
2016-06-14 06:01:03 +00:00
|
|
|
int emit(EmitMode mode = EmitMode.all)
|
|
|
|
shared nothrow {
|
|
|
|
import core.atomic : atomicOp, cas;
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
logTrace("emit shared");
|
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
auto ec = atomicOp!"+="(m_emitCount, 1);
|
|
|
|
auto thisthr = Thread.getThis();
|
|
|
|
|
|
|
|
final switch (mode) {
|
|
|
|
case EmitMode.all:
|
|
|
|
// FIXME: would be nice to have atomicSwap instead
|
2016-06-15 16:21:33 +00:00
|
|
|
auto w = cast(ThreadWaiter*)atomicLoad(m_waiters);
|
2016-06-17 20:33:04 +00:00
|
|
|
if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null)) {
|
|
|
|
logTrace("Another thread emitted concurrently - returning.");
|
2016-06-14 06:01:03 +00:00
|
|
|
return ec;
|
2016-06-17 20:33:04 +00:00
|
|
|
}
|
2016-06-14 06:01:03 +00:00
|
|
|
while (w !is null) {
|
2016-06-17 20:33:04 +00:00
|
|
|
// Note: emitForThisThread can result in w getting deallocated at any
|
|
|
|
// time, so we need to copy any fields first
|
|
|
|
auto wnext = w.next;
|
|
|
|
atomicStore((cast(shared)w).next, null);
|
|
|
|
assert(wnext !is w, "Same waiter enqueued twice!?");
|
2016-06-15 16:21:33 +00:00
|
|
|
if (w.driver is eventDriver) {
|
2016-06-17 20:33:04 +00:00
|
|
|
logTrace("Same thread emit (%s/%s)", cast(void*)w, cast(void*)w.tasks.first);
|
2016-06-14 06:01:03 +00:00
|
|
|
emitForThisThread(w.tasks.m_first, mode);
|
|
|
|
} else {
|
2016-06-17 20:33:04 +00:00
|
|
|
logTrace("Foreign thread \"%s\" notify: %s", w.thread.name, w.event);
|
2016-06-15 16:21:33 +00:00
|
|
|
auto drv = w.driver;
|
2016-06-14 06:01:03 +00:00
|
|
|
auto evt = w.event;
|
2016-06-15 16:21:33 +00:00
|
|
|
if (evt != EventID.init)
|
2016-10-05 12:40:29 +00:00
|
|
|
(cast(shared)drv.events).trigger(evt, true);
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
2016-06-17 20:33:04 +00:00
|
|
|
w = wnext;
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
break;
|
|
|
|
case EmitMode.single:
|
|
|
|
assert(false);
|
|
|
|
}
|
2016-06-17 20:33:04 +00:00
|
|
|
|
|
|
|
logTrace("emit shared done");
|
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
return ec;
|
|
|
|
}
|
|
|
|
/// ditto
|
|
|
|
int emit(EmitMode mode = EmitMode.all)
|
|
|
|
nothrow {
|
|
|
|
auto ec = m_emitCount++;
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
logTrace("unshared emit");
|
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
final switch (mode) {
|
|
|
|
case EmitMode.all:
|
|
|
|
auto w = m_waiters;
|
|
|
|
m_waiters = null;
|
|
|
|
if (w !is null) {
|
2016-06-15 16:21:33 +00:00
|
|
|
assert(w.driver is eventDriver, "Unshared ManualEvent has waiters in foreign thread!");
|
2016-06-14 06:01:03 +00:00
|
|
|
assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!");
|
|
|
|
emitForThisThread(w.tasks.m_first, EmitMode.all);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case EmitMode.single:
|
|
|
|
assert(false);
|
|
|
|
}
|
|
|
|
return ec;
|
|
|
|
}
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
/** Acquires ownership and waits until the signal is emitted.
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
Note that in order not to miss any emits it is necessary to use the
|
|
|
|
overload taking an integer.
|
|
|
|
|
2016-03-01 19:30:42 +00:00
|
|
|
Throws:
|
|
|
|
May throw an $(D InterruptException) if the task gets interrupted
|
|
|
|
using $(D Task.interrupt()).
|
|
|
|
*/
|
2016-06-14 06:01:03 +00:00
|
|
|
int wait() { return wait(this.emitCount); }
|
|
|
|
/// ditto
|
|
|
|
int wait() shared { return wait(this.emitCount); }
|
2016-03-01 19:30:42 +00:00
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
/** Acquires ownership and waits until the emit count differs from the
|
|
|
|
given one or until a timeout is reached.
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
Throws:
|
|
|
|
May throw an $(D InterruptException) if the task gets interrupted
|
|
|
|
using $(D Task.interrupt()).
|
|
|
|
*/
|
2016-06-17 20:33:04 +00:00
|
|
|
int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
|
2016-06-14 06:01:03 +00:00
|
|
|
/// ditto
|
2016-06-17 20:33:04 +00:00
|
|
|
int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
|
2016-06-14 06:01:03 +00:00
|
|
|
/// ditto
|
2016-06-17 20:33:04 +00:00
|
|
|
int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
|
|
|
|
/// ditto
|
|
|
|
int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
|
2016-06-14 06:01:03 +00:00
|
|
|
|
2016-03-01 19:30:42 +00:00
|
|
|
/** Same as $(D wait), but defers throwing any $(D InterruptException).
|
|
|
|
|
|
|
|
This method is annotated $(D nothrow) at the expense that it cannot be
|
|
|
|
interrupted.
|
|
|
|
*/
|
2016-06-14 06:01:03 +00:00
|
|
|
int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
|
|
|
|
///
|
|
|
|
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
|
|
|
|
/// ditto
|
2016-06-17 20:33:04 +00:00
|
|
|
int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
|
2016-03-01 19:30:42 +00:00
|
|
|
/// ditto
|
2016-06-17 20:33:04 +00:00
|
|
|
int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
|
2016-06-14 06:01:03 +00:00
|
|
|
/// ditto
|
2016-06-17 20:33:04 +00:00
|
|
|
int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
|
|
|
|
/// ditto
|
|
|
|
int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
|
2016-06-14 06:01:03 +00:00
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
private int doWait(bool interruptible)(Duration timeout, int emit_count)
|
|
|
|
{
|
2016-06-14 06:01:03 +00:00
|
|
|
int ec = this.emitCount;
|
|
|
|
while (ec <= emit_count) {
|
2016-06-17 20:33:04 +00:00
|
|
|
ThreadWaiter w;
|
|
|
|
LocalWaiter tw;
|
|
|
|
acquireWaiter(w, tw);
|
|
|
|
|
|
|
|
Waitable!(
|
2016-06-14 06:01:03 +00:00
|
|
|
cb => tw.wait(cb),
|
|
|
|
cb => tw.cancel()
|
2016-06-17 20:33:04 +00:00
|
|
|
) waitable;
|
|
|
|
asyncAwaitAny!interruptible(timeout, waitable);
|
2016-06-14 06:01:03 +00:00
|
|
|
ec = this.emitCount;
|
|
|
|
}
|
|
|
|
return ec;
|
|
|
|
}
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
|
|
|
|
shared {
|
2016-06-14 06:01:03 +00:00
|
|
|
int ec = this.emitCount;
|
|
|
|
while (ec <= emit_count) {
|
2016-06-17 20:33:04 +00:00
|
|
|
shared(ThreadWaiter) w;
|
|
|
|
LocalWaiter tw;
|
2016-06-15 16:21:33 +00:00
|
|
|
acquireWaiter(w, tw);
|
2016-06-17 20:33:04 +00:00
|
|
|
logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next);
|
2016-06-15 16:21:33 +00:00
|
|
|
|
|
|
|
if (tw.next) {
|
|
|
|
// if we are not the first waiter for this thread,
|
|
|
|
// wait for getting resumed by emitForThisThread
|
2016-06-17 20:33:04 +00:00
|
|
|
Waitable!(
|
2016-06-15 16:21:33 +00:00
|
|
|
cb => tw.wait(cb),
|
|
|
|
cb => tw.cancel()
|
2016-06-17 20:33:04 +00:00
|
|
|
) waitable;
|
|
|
|
asyncAwaitAny!interruptible(timeout, waitable);
|
2016-06-15 16:21:33 +00:00
|
|
|
} else {
|
2016-06-17 20:33:04 +00:00
|
|
|
again:
|
2016-06-15 16:21:33 +00:00
|
|
|
// if we are the first waiter for this thread,
|
|
|
|
// wait for the thread event to get emitted
|
|
|
|
Waitable!(
|
2016-10-05 12:40:29 +00:00
|
|
|
cb => eventDriver.events.wait(ms_threadEvent, cb),
|
|
|
|
cb => eventDriver.events.cancelWait(ms_threadEvent, cb),
|
2016-06-15 16:21:33 +00:00
|
|
|
EventID
|
|
|
|
) eventwaiter;
|
|
|
|
Waitable!(
|
|
|
|
cb => tw.wait(cb),
|
|
|
|
cb => tw.cancel()
|
|
|
|
) localwaiter;
|
2016-06-17 20:33:04 +00:00
|
|
|
logDebugV("Wait on event %s", ms_threadEvent);
|
|
|
|
asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter);
|
|
|
|
|
|
|
|
if (!eventwaiter.cancelled) {
|
|
|
|
if (atomicLoad(w.next) == null)
|
|
|
|
emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
|
|
|
|
else goto again;
|
|
|
|
} else if (localwaiter.cancelled) break; // timeout
|
|
|
|
}
|
2016-06-15 16:21:33 +00:00
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w,
|
|
|
|
"Waiter did not get removed from waiter queue.");
|
2016-06-15 16:21:33 +00:00
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
ec = this.emitCount;
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
2016-06-15 16:21:33 +00:00
|
|
|
return ec;
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
private static bool emitForThisThread(LocalWaiter* waiters, EmitMode mode)
|
2016-06-14 06:01:03 +00:00
|
|
|
nothrow {
|
|
|
|
if (!waiters) return false;
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
logTrace("emitForThisThread");
|
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
final switch (mode) {
|
|
|
|
case EmitMode.all:
|
|
|
|
while (waiters) {
|
2016-06-17 20:33:04 +00:00
|
|
|
auto wnext = waiters.next;
|
|
|
|
assert(wnext !is waiters);
|
|
|
|
if (waiters.notifier !is null) {
|
|
|
|
logTrace("notify task %s %s %s", cast(void*)waiters, cast(void*)waiters.notifier.funcptr, waiters.notifier.ptr);
|
2016-06-14 06:01:03 +00:00
|
|
|
waiters.notifier();
|
2016-06-17 20:33:04 +00:00
|
|
|
waiters.notifier = null;
|
|
|
|
} else logTrace("notify callback is null");
|
|
|
|
waiters = wnext;
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
break;
|
|
|
|
case EmitMode.single:
|
|
|
|
assert(false, "TODO!");
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
private void acquireWaiter(ref ThreadWaiter w, ref LocalWaiter tw)
|
2016-06-14 06:01:03 +00:00
|
|
|
nothrow {
|
2016-06-15 16:21:33 +00:00
|
|
|
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling
|
2016-06-14 06:01:03 +00:00
|
|
|
tw.task = Task.getThis();
|
|
|
|
|
|
|
|
if (m_waiters) {
|
|
|
|
m_waiters.tasks.add(&tw);
|
|
|
|
} else {
|
|
|
|
m_waiters = &w;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
private void acquireWaiter(ref shared(ThreadWaiter) w, ref LocalWaiter tw)
|
2016-06-14 06:01:03 +00:00
|
|
|
nothrow shared {
|
|
|
|
tw.task = Task.getThis();
|
|
|
|
|
|
|
|
if (ms_threadEvent == EventID.init)
|
2016-10-05 12:40:29 +00:00
|
|
|
ms_threadEvent = eventDriver.events.create();
|
2016-06-14 06:01:03 +00:00
|
|
|
|
2016-06-15 16:21:33 +00:00
|
|
|
auto sdriver = cast(shared)eventDriver;
|
|
|
|
|
2016-06-17 20:33:04 +00:00
|
|
|
shared(ThreadWaiter)* pw = atomicLoad(m_waiters);
|
|
|
|
assert(pw !is &w, "Waiter is already registered!");
|
|
|
|
while (pw !is null) {
|
|
|
|
if (pw.driver is sdriver)
|
|
|
|
break;
|
|
|
|
pw = atomicLoad(pw.next);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!pw) {
|
|
|
|
pw = &w;
|
|
|
|
shared(ThreadWaiter)* wn;
|
|
|
|
do {
|
|
|
|
wn = atomicLoad(m_waiters);
|
|
|
|
w.next = wn;
|
|
|
|
w.event = ms_threadEvent;
|
|
|
|
w.driver = sdriver;
|
|
|
|
w.thread = cast(shared)Thread.getThis();
|
|
|
|
} while (!cas(&m_waiters, wn, &w));
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
2016-06-17 20:33:04 +00:00
|
|
|
|
|
|
|
(cast(ThreadWaiter*)pw).tasks.add(&tw);
|
2016-06-14 06:01:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private struct StackSList(T)
|
|
|
|
{
|
|
|
|
import core.atomic : cas;
|
|
|
|
|
|
|
|
private T* m_first;
|
|
|
|
|
|
|
|
@property T* first() { return m_first; }
|
|
|
|
@property shared(T)* first() shared { return atomicLoad(m_first); }
|
|
|
|
|
|
|
|
void add(shared(T)* elem)
|
|
|
|
shared {
|
|
|
|
do elem.next = atomicLoad(m_first);
|
|
|
|
while (cas(&m_first, elem.next, elem));
|
|
|
|
}
|
|
|
|
|
|
|
|
void remove(shared(T)* elem)
|
|
|
|
shared {
|
|
|
|
while (true) {
|
|
|
|
shared(T)* w = atomicLoad(m_first), wp;
|
|
|
|
while (w !is elem) {
|
|
|
|
wp = w;
|
|
|
|
w = atomicLoad(w.next);
|
|
|
|
}
|
|
|
|
if (wp !is null) {
|
|
|
|
if (cas(&wp.next, w, w.next))
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
if (cas(&m_first, w, w.next))
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2016-03-01 19:30:42 +00:00
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
bool empty() const { return m_first is null; }
|
|
|
|
|
|
|
|
void add(T* elem)
|
|
|
|
{
|
|
|
|
elem.next = m_first;
|
|
|
|
m_first = elem;
|
|
|
|
}
|
|
|
|
|
|
|
|
void remove(T* elem)
|
|
|
|
{
|
|
|
|
T* w = m_first, wp;
|
|
|
|
while (w !is elem) {
|
|
|
|
assert(w !is null);
|
|
|
|
wp = w;
|
|
|
|
w = w.next;
|
|
|
|
}
|
|
|
|
if (wp) wp.next = w.next;
|
|
|
|
else m_first = w.next;
|
|
|
|
}
|
|
|
|
}
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
|
|
|
|
private {
|
|
|
|
shared(bool) m_locked = false;
|
|
|
|
shared(uint) m_waiters = 0;
|
2016-06-14 06:01:03 +00:00
|
|
|
shared(ManualEvent) m_signal;
|
2016-03-01 19:30:42 +00:00
|
|
|
debug Task m_owner;
|
|
|
|
}
|
|
|
|
|
|
|
|
void setup()
|
|
|
|
{
|
2016-06-14 06:01:03 +00:00
|
|
|
m_signal = createSharedManualEvent();
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@trusted bool tryLock()
|
|
|
|
{
|
|
|
|
if (cas(&m_locked, false, true)) {
|
|
|
|
debug m_owner = Task.getThis();
|
2016-10-04 15:53:40 +00:00
|
|
|
debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
|
2016-03-01 19:30:42 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void lock()
|
|
|
|
{
|
|
|
|
if (tryLock()) return;
|
|
|
|
debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
|
|
|
|
atomicOp!"+="(m_waiters, 1);
|
2016-10-04 15:53:40 +00:00
|
|
|
debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
|
2016-03-01 19:30:42 +00:00
|
|
|
scope(exit) atomicOp!"-="(m_waiters, 1);
|
|
|
|
auto ecnt = m_signal.emitCount();
|
|
|
|
while (!tryLock()) {
|
|
|
|
static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
|
|
|
|
else ecnt = m_signal.waitUninterruptible(ecnt);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void unlock()
|
|
|
|
{
|
|
|
|
assert(m_locked);
|
|
|
|
debug {
|
|
|
|
assert(m_owner == Task.getThis());
|
|
|
|
m_owner = Task();
|
|
|
|
}
|
|
|
|
atomicStore!(MemoryOrder.rel)(m_locked, false);
|
2016-10-04 15:53:40 +00:00
|
|
|
debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
|
2016-03-01 19:30:42 +00:00
|
|
|
if (atomicLoad(m_waiters) > 0)
|
|
|
|
m_signal.emit();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
|
|
|
|
import std.stdio;
|
|
|
|
private {
|
|
|
|
core.sync.mutex.Mutex m_mutex;
|
|
|
|
Task m_owner;
|
|
|
|
size_t m_recCount = 0;
|
|
|
|
shared(uint) m_waiters = 0;
|
2016-06-14 06:01:03 +00:00
|
|
|
shared(ManualEvent) m_signal;
|
2016-03-01 19:30:42 +00:00
|
|
|
@property bool m_locked() const { return m_recCount > 0; }
|
|
|
|
}
|
|
|
|
|
|
|
|
void setup()
|
|
|
|
{
|
2016-06-14 06:01:03 +00:00
|
|
|
m_signal = createSharedManualEvent();
|
2016-03-01 19:30:42 +00:00
|
|
|
m_mutex = new core.sync.mutex.Mutex;
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted bool tryLock()
|
|
|
|
{
|
|
|
|
auto self = Task.getThis();
|
|
|
|
return m_mutex.performLocked!({
|
|
|
|
if (!m_owner) {
|
|
|
|
assert(m_recCount == 0);
|
|
|
|
m_recCount = 1;
|
|
|
|
m_owner = self;
|
|
|
|
return true;
|
|
|
|
} else if (m_owner == self) {
|
|
|
|
m_recCount++;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void lock()
|
|
|
|
{
|
|
|
|
if (tryLock()) return;
|
|
|
|
atomicOp!"+="(m_waiters, 1);
|
2016-10-04 15:53:40 +00:00
|
|
|
debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
|
2016-03-01 19:30:42 +00:00
|
|
|
scope(exit) atomicOp!"-="(m_waiters, 1);
|
|
|
|
auto ecnt = m_signal.emitCount();
|
|
|
|
while (!tryLock()) {
|
|
|
|
static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
|
|
|
|
else ecnt = m_signal.waitUninterruptible(ecnt);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void unlock()
|
|
|
|
{
|
|
|
|
auto self = Task.getThis();
|
|
|
|
m_mutex.performLocked!({
|
|
|
|
assert(m_owner == self);
|
|
|
|
assert(m_recCount > 0);
|
|
|
|
m_recCount--;
|
|
|
|
if (m_recCount == 0) {
|
|
|
|
m_owner = Task.init;
|
|
|
|
}
|
|
|
|
});
|
2016-10-04 15:53:40 +00:00
|
|
|
debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
|
2016-03-01 19:30:42 +00:00
|
|
|
if (atomicLoad(m_waiters) > 0)
|
|
|
|
m_signal.emit();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
|
|
|
private {
|
|
|
|
LOCKABLE m_mutex;
|
|
|
|
|
2016-06-14 06:01:03 +00:00
|
|
|
shared(ManualEvent) m_signal;
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
static if (is(LOCKABLE == Lockable)) {
|
|
|
|
final class MutexWrapper : Lockable {
|
|
|
|
private core.sync.mutex.Mutex m_mutex;
|
|
|
|
this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
|
|
|
|
@trusted void lock() { m_mutex.lock(); }
|
|
|
|
@trusted void unlock() { m_mutex.unlock(); }
|
|
|
|
@trusted bool tryLock() { return m_mutex.tryLock(); }
|
|
|
|
}
|
|
|
|
|
|
|
|
void setup(core.sync.mutex.Mutex mtx)
|
|
|
|
{
|
|
|
|
setup(new MutexWrapper(mtx));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void setup(LOCKABLE mtx)
|
|
|
|
{
|
|
|
|
m_mutex = mtx;
|
2016-06-14 06:01:03 +00:00
|
|
|
m_signal = createSharedManualEvent();
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@property LOCKABLE mutex() { return m_mutex; }
|
|
|
|
|
|
|
|
@trusted void wait()
|
|
|
|
{
|
|
|
|
if (auto tm = cast(TaskMutex)m_mutex) {
|
|
|
|
assert(tm.m_impl.m_locked);
|
|
|
|
debug assert(tm.m_impl.m_owner == Task.getThis());
|
|
|
|
}
|
|
|
|
|
|
|
|
auto refcount = m_signal.emitCount;
|
|
|
|
m_mutex.unlock();
|
|
|
|
scope(exit) m_mutex.lock();
|
|
|
|
static if (INTERRUPTIBLE) m_signal.wait(refcount);
|
|
|
|
else m_signal.waitUninterruptible(refcount);
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted bool wait(Duration timeout)
|
|
|
|
{
|
|
|
|
assert(!timeout.isNegative());
|
|
|
|
if (auto tm = cast(TaskMutex)m_mutex) {
|
|
|
|
assert(tm.m_impl.m_locked);
|
|
|
|
debug assert(tm.m_impl.m_owner == Task.getThis());
|
|
|
|
}
|
|
|
|
|
|
|
|
auto refcount = m_signal.emitCount;
|
|
|
|
m_mutex.unlock();
|
|
|
|
scope(exit) m_mutex.lock();
|
|
|
|
|
|
|
|
static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
|
|
|
|
else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void notify()
|
|
|
|
{
|
|
|
|
m_signal.emit();
|
|
|
|
}
|
|
|
|
|
|
|
|
@trusted void notifyAll()
|
|
|
|
{
|
|
|
|
m_signal.emit();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Contains the shared state of a $(D TaskReadWriteMutex).
|
|
|
|
*
|
|
|
|
* Since a $(D TaskReadWriteMutex) consists of two actual Mutex
|
|
|
|
* objects that rely on common memory, this class implements
|
|
|
|
* the actual functionality of their method calls.
|
|
|
|
*
|
|
|
|
* The method implementations are based on two static parameters
|
|
|
|
* ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
|
|
|
|
* template arguments:
|
|
|
|
*
|
|
|
|
* - $(D INTERRUPTIBLE) determines whether the mutex implementation
|
|
|
|
* are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
|
|
|
|
* method or not.
|
|
|
|
*
|
|
|
|
* - $(D INTENT) describes the intent, with which a locking operation is
|
|
|
|
* performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
|
|
|
|
* multiple Tasks holding the mutex, whereas RW locking will cause
|
|
|
|
* a "bottleneck" so that only one Task can write to the protected
|
|
|
|
* data at once.
|
|
|
|
*/
|
|
|
|
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
|
|
|
|
{
|
|
|
|
/** The policy with which the mutex should operate.
|
|
|
|
*
|
|
|
|
* The policy determines how the acquisition of the locks is
|
|
|
|
* performed and can be used to tune the mutex according to the
|
|
|
|
* underlying algorithm in which it is used.
|
|
|
|
*
|
|
|
|
* According to the provided policy, the mutex will either favor
|
|
|
|
* reading or writing tasks and could potentially starve the
|
|
|
|
* respective opposite.
|
|
|
|
*
|
|
|
|
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
|
|
|
|
*/
|
|
|
|
enum Policy : int
|
|
|
|
{
|
|
|
|
/** Readers are prioritized, writers may be starved as a result. */
|
|
|
|
PREFER_READERS = 0,
|
|
|
|
/** Writers are prioritized, readers may be starved as a result. */
|
|
|
|
PREFER_WRITERS
|
|
|
|
}
|
|
|
|
|
|
|
|
/** The intent with which a locking operation is performed.
|
|
|
|
*
|
|
|
|
* Since both locks share the same underlying algorithms, the actual
|
|
|
|
* intent with which a lock operation is performed (i.e read/write)
|
|
|
|
* are passed as a template parameter to each method.
|
|
|
|
*/
|
|
|
|
enum LockingIntent : bool
|
|
|
|
{
|
|
|
|
/** Perform a read lock/unlock operation. Multiple reading locks can be
|
|
|
|
* active at a time. */
|
|
|
|
READ_ONLY = 0,
|
|
|
|
/** Perform a write lock/unlock operation. Only a single writer can
|
|
|
|
* hold a lock at any given time. */
|
|
|
|
READ_WRITE = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
private {
|
|
|
|
//Queue counters
|
|
|
|
/** The number of reading tasks waiting for the lock to become available. */
|
|
|
|
shared(uint) m_waitingForReadLock = 0;
|
|
|
|
/** The number of writing tasks waiting for the lock to become available. */
|
|
|
|
shared(uint) m_waitingForWriteLock = 0;
|
|
|
|
|
|
|
|
//Lock counters
|
|
|
|
/** The number of reading tasks that currently hold the lock. */
|
|
|
|
uint m_activeReadLocks = 0;
|
|
|
|
/** The number of writing tasks that currently hold the lock (binary). */
|
|
|
|
ubyte m_activeWriteLocks = 0;
|
|
|
|
|
|
|
|
/** The policy determining the lock's behavior. */
|
|
|
|
Policy m_policy;
|
|
|
|
|
|
|
|
//Queue Events
|
|
|
|
/** The event used to wake reading tasks waiting for the lock while it is blocked. */
|
2016-06-14 06:01:03 +00:00
|
|
|
shared(ManualEvent) m_readyForReadLock;
|
2016-03-01 19:30:42 +00:00
|
|
|
/** The event used to wake writing tasks waiting for the lock while it is blocked. */
|
2016-06-14 06:01:03 +00:00
|
|
|
shared(ManualEvent) m_readyForWriteLock;
|
2016-03-01 19:30:42 +00:00
|
|
|
|
|
|
|
/** The underlying mutex that gates the access to the shared state. */
|
|
|
|
Mutex m_counterMutex;
|
|
|
|
}
|
|
|
|
|
|
|
|
this(Policy policy)
|
|
|
|
{
|
|
|
|
m_policy = policy;
|
|
|
|
m_counterMutex = new Mutex();
|
2016-06-14 06:01:03 +00:00
|
|
|
m_readyForReadLock = createSharedManualEvent();
|
|
|
|
m_readyForWriteLock = createSharedManualEvent();
|
2016-03-01 19:30:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@disable this(this);
|
|
|
|
|
|
|
|
/** The policy with which the lock has been created. */
|
|
|
|
@property policy() const { return m_policy; }
|
|
|
|
|
|
|
|
version(RWMutexPrint)
|
|
|
|
{
|
|
|
|
/** Print out debug information during lock operations. */
|
|
|
|
void printInfo(string OP, LockingIntent INTENT)() nothrow
|
|
|
|
{
|
|
|
|
import std.string;
|
|
|
|
try
|
|
|
|
{
|
|
|
|
import std.stdio;
|
|
|
|
writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
|
|
|
|
OP.leftJustify(10,' '),
|
|
|
|
INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
|
|
|
|
m_activeReadLocks, m_activeWriteLocks,
|
|
|
|
m_waitingForReadLock, m_waitingForWriteLock
|
|
|
|
);
|
|
|
|
}
|
|
|
|
catch (Throwable t){}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to determine the queue event for a given intent. */
|
|
|
|
@property ref auto queueEvent(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
static if (INTENT == LockingIntent.READ_ONLY)
|
|
|
|
return m_readyForReadLock;
|
|
|
|
else
|
|
|
|
return m_readyForWriteLock;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to determine the queue counter for a given intent. */
|
|
|
|
@property ref auto queueCounter(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
static if (INTENT == LockingIntent.READ_ONLY)
|
|
|
|
return m_waitingForReadLock;
|
|
|
|
else
|
|
|
|
return m_waitingForWriteLock;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */
|
|
|
|
int emitCount(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
return queueEvent!INTENT.emitCount();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to determine the active counter for a given intent. */
|
|
|
|
@property ref auto activeCounter(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
static if (INTENT == LockingIntent.READ_ONLY)
|
|
|
|
return m_activeReadLocks;
|
|
|
|
else
|
|
|
|
return m_activeWriteLocks;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to wait for the queue event for a given intent.
|
|
|
|
*
|
|
|
|
* This method is used during the `lock()` operation, after a
|
|
|
|
* `tryLock()` operation has been unsuccessfully finished.
|
|
|
|
* The active fiber will yield and be suspended until the queue event
|
|
|
|
* for the given intent will be fired.
|
|
|
|
*/
|
|
|
|
int wait(LockingIntent INTENT)(int count)
|
|
|
|
{
|
|
|
|
static if (INTERRUPTIBLE)
|
|
|
|
return queueEvent!INTENT.wait(count);
|
|
|
|
else
|
|
|
|
return queueEvent!INTENT.waitUninterruptible(count);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal shortcut method to notify tasks waiting for the lock to become available again.
|
|
|
|
*
|
|
|
|
* This method is called whenever the number of owners of the mutex hits
|
|
|
|
* zero; this is basically the counterpart to `wait()`.
|
|
|
|
* It wakes any Task currently waiting for the mutex to be released.
|
|
|
|
*/
|
|
|
|
@trusted void notify(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
static if (INTENT == LockingIntent.READ_ONLY)
|
|
|
|
{ //If the last reader unlocks the mutex, notify all waiting writers
|
|
|
|
if (atomicLoad(m_waitingForWriteLock) > 0)
|
|
|
|
m_readyForWriteLock.emit();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{ //If a writer unlocks the mutex, notify both readers and writers
|
|
|
|
if (atomicLoad(m_waitingForReadLock) > 0)
|
|
|
|
m_readyForReadLock.emit();
|
|
|
|
|
|
|
|
if (atomicLoad(m_waitingForWriteLock) > 0)
|
|
|
|
m_readyForWriteLock.emit();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** An internal method that performs the acquisition attempt in different variations.
|
|
|
|
*
|
|
|
|
* Since both locks rely on a common TaskMutex object which gates the access
|
|
|
|
* to their common data acquisition attempts for this lock are more complex
|
|
|
|
* than for simple mutex variants. This method will thus be performing the
|
|
|
|
* `tryLock()` operation in two variations, depending on the callee:
|
|
|
|
*
|
|
|
|
* If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
|
|
|
|
* will instantly fail if the underlying mutex is locked (i.e. during another
|
|
|
|
* `tryLock()` or `unlock()` operation), in order to guarantee the fastest
|
|
|
|
* possible locking attempt.
|
|
|
|
*
|
|
|
|
* If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
|
|
|
|
* the operation will wait for the mutex to be available before deciding if
|
|
|
|
* the lock can be acquired, since the attempt would anyway be repeated until
|
|
|
|
* it succeeds. This will prevent frequent retries under heavy loads and thus
|
|
|
|
* should ensure better performance.
|
|
|
|
*/
|
|
|
|
@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
|
|
|
|
{
|
|
|
|
//Log a debug statement for the attempt
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("tryLock",INTENT)();
|
|
|
|
|
|
|
|
//Try to acquire the lock
|
|
|
|
static if (!WAIT_FOR_BLOCKING_MUTEX)
|
|
|
|
{
|
|
|
|
if (!m_counterMutex.tryLock())
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
m_counterMutex.lock();
|
|
|
|
|
|
|
|
scope(exit)
|
|
|
|
m_counterMutex.unlock();
|
|
|
|
|
|
|
|
//Log a debug statement for the attempt
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("checkCtrs",INTENT)();
|
|
|
|
|
|
|
|
//Check if there's already an active writer
|
|
|
|
if (m_activeWriteLocks > 0)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
//If writers are preferred over readers, check whether there
|
|
|
|
//currently is a writer in the waiting queue and abort if
|
|
|
|
//that's the case.
|
|
|
|
static if (INTENT == LockingIntent.READ_ONLY)
|
|
|
|
if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
//If we are locking the mutex for writing, make sure that
|
|
|
|
//there's no reader active.
|
|
|
|
static if (INTENT == LockingIntent.READ_WRITE)
|
|
|
|
if (m_activeReadLocks > 0)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
//We can successfully acquire the lock!
|
|
|
|
//Log a debug statement for the success.
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("lock",INTENT)();
|
|
|
|
|
|
|
|
//Increase the according counter
|
|
|
|
//(number of active readers/writers)
|
|
|
|
//and return a success code.
|
|
|
|
activeCounter!INTENT += 1;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Attempt to acquire the lock for a given intent.
|
|
|
|
*
|
|
|
|
* Returns:
|
|
|
|
* `true`, if the lock was successfully acquired;
|
|
|
|
* `false` otherwise.
|
|
|
|
*/
|
|
|
|
@trusted bool tryLock(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
//Try to lock this mutex without waiting for the underlying
|
|
|
|
//TaskMutex - fail if it is already blocked.
|
|
|
|
return tryLock!(INTENT,false)();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
|
|
|
|
@trusted void lock(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
//Prepare a waiting action before the first
|
|
|
|
//`tryLock()` call in order to avoid a race
|
|
|
|
//condition that could lead to the queue notification
|
|
|
|
//not being fired.
|
|
|
|
auto count = emitCount!INTENT;
|
|
|
|
atomicOp!"+="(queueCounter!INTENT,1);
|
|
|
|
scope(exit)
|
|
|
|
atomicOp!"-="(queueCounter!INTENT,1);
|
|
|
|
|
|
|
|
//Try to lock the mutex
|
|
|
|
auto locked = tryLock!(INTENT,true)();
|
|
|
|
if (locked)
|
|
|
|
return;
|
|
|
|
|
|
|
|
//Retry until we successfully acquired the lock
|
|
|
|
while(!locked)
|
|
|
|
{
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("wait",INTENT)();
|
|
|
|
|
|
|
|
count = wait!INTENT(count);
|
|
|
|
locked = tryLock!(INTENT,true)();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Unlock the mutex after a successful acquisition. */
|
|
|
|
@trusted void unlock(LockingIntent INTENT)()
|
|
|
|
{
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("unlock",INTENT)();
|
|
|
|
|
|
|
|
debug assert(activeCounter!INTENT > 0);
|
|
|
|
|
|
|
|
synchronized(m_counterMutex)
|
|
|
|
{
|
|
|
|
//Decrement the counter of active lock holders.
|
|
|
|
//If the counter hits zero, notify waiting Tasks
|
|
|
|
activeCounter!INTENT -= 1;
|
|
|
|
if (activeCounter!INTENT == 0)
|
|
|
|
{
|
|
|
|
version(RWMutexPrint)
|
|
|
|
printInfo!("notify",INTENT)();
|
|
|
|
|
|
|
|
notify!INTENT();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** A ReadWriteMutex implementation for fibers.
|
|
|
|
*
|
|
|
|
* This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex),
|
|
|
|
* but does not block the event loop in contention situations. The `reader` and `writer`
|
|
|
|
* members are used for locking. Locking the `reader` mutex allows access to multiple
|
|
|
|
* readers at once, while the `writer` mutex only allows a single writer to lock it at
|
|
|
|
* any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a
|
|
|
|
* writer is active, no readers can be active at the same time, and vice versa).
|
|
|
|
*
|
|
|
|
* Notice:
|
|
|
|
* Mutexes implemented by this class cannot be interrupted
|
|
|
|
* using $(D vibe.core.task.Task.interrupt()). The corresponding
|
|
|
|
* InterruptException will be deferred until the next blocking
|
|
|
|
* operation yields the event loop.
|
|
|
|
*
|
|
|
|
* Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be
|
|
|
|
* interrupted.
|
|
|
|
*
|
|
|
|
* cf. $(D core.sync.mutex.ReadWriteMutex)
|
|
|
|
*/
|
|
|
|
class TaskReadWriteMutex
|
|
|
|
{
|
|
|
|
private {
|
|
|
|
alias State = ReadWriteMutexState!false;
|
|
|
|
alias LockingIntent = State.LockingIntent;
|
|
|
|
alias READ_ONLY = LockingIntent.READ_ONLY;
|
|
|
|
alias READ_WRITE = LockingIntent.READ_WRITE;
|
|
|
|
|
|
|
|
/** The shared state used by the reader and writer mutexes. */
|
|
|
|
State m_state;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** The policy with which the mutex should operate.
|
|
|
|
*
|
|
|
|
* The policy determines how the acquisition of the locks is
|
|
|
|
* performed and can be used to tune the mutex according to the
|
|
|
|
* underlying algorithm in which it is used.
|
|
|
|
*
|
|
|
|
* According to the provided policy, the mutex will either favor
|
|
|
|
* reading or writing tasks and could potentially starve the
|
|
|
|
* respective opposite.
|
|
|
|
*
|
|
|
|
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
|
|
|
|
*/
|
|
|
|
alias Policy = State.Policy;
|
|
|
|
|
|
|
|
/** A common baseclass for both of the provided mutexes.
|
|
|
|
*
|
|
|
|
* The intent for the according mutex is specified through the
|
|
|
|
* $(D INTENT) template argument, which determines if a mutex is
|
|
|
|
* used for read or write locking.
|
|
|
|
*/
|
|
|
|
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
|
|
|
|
{
|
|
|
|
/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override bool tryLock() { return m_state.tryLock!INTENT(); }
|
|
|
|
/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override void lock() { m_state.lock!INTENT(); }
|
|
|
|
/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override void unlock() { m_state.unlock!INTENT(); }
|
|
|
|
}
|
|
|
|
alias Reader = Mutex!READ_ONLY;
|
|
|
|
alias Writer = Mutex!READ_WRITE;
|
|
|
|
|
|
|
|
Reader reader;
|
|
|
|
Writer writer;
|
|
|
|
|
|
|
|
this(Policy policy = Policy.PREFER_WRITERS)
|
|
|
|
{
|
|
|
|
m_state = State(policy);
|
|
|
|
reader = new Reader();
|
|
|
|
writer = new Writer();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** The policy with which the lock has been created. */
|
|
|
|
@property Policy policy() const { return m_state.policy; }
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Alternative to $(D TaskReadWriteMutex) that supports interruption.
|
|
|
|
*
|
|
|
|
* This class supports the use of $(D vibe.core.task.Task.interrupt()) while
|
|
|
|
* waiting in the `lock()` method.
|
|
|
|
*
|
|
|
|
* cf. $(D core.sync.mutex.ReadWriteMutex)
|
|
|
|
*/
|
|
|
|
class InterruptibleTaskReadWriteMutex
|
|
|
|
{
|
|
|
|
private {
|
|
|
|
alias State = ReadWriteMutexState!true;
|
|
|
|
alias LockingIntent = State.LockingIntent;
|
|
|
|
alias READ_ONLY = LockingIntent.READ_ONLY;
|
|
|
|
alias READ_WRITE = LockingIntent.READ_WRITE;
|
|
|
|
|
|
|
|
/** The shared state used by the reader and writer mutexes. */
|
|
|
|
State m_state;
|
|
|
|
}
|
|
|
|
|
|
|
|
/** The policy with which the mutex should operate.
|
|
|
|
*
|
|
|
|
* The policy determines how the acquisition of the locks is
|
|
|
|
* performed and can be used to tune the mutex according to the
|
|
|
|
* underlying algorithm in which it is used.
|
|
|
|
*
|
|
|
|
* According to the provided policy, the mutex will either favor
|
|
|
|
* reading or writing tasks and could potentially starve the
|
|
|
|
* respective opposite.
|
|
|
|
*
|
|
|
|
* cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
|
|
|
|
*/
|
|
|
|
alias Policy = State.Policy;
|
|
|
|
|
|
|
|
/** A common baseclass for both of the provided mutexes.
|
|
|
|
*
|
|
|
|
* The intent for the according mutex is specified through the
|
|
|
|
* $(D INTENT) template argument, which determines if a mutex is
|
|
|
|
* used for read or write locking.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
|
|
|
|
{
|
|
|
|
/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override bool tryLock() { return m_state.tryLock!INTENT(); }
|
|
|
|
/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override void lock() { m_state.lock!INTENT(); }
|
|
|
|
/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
|
|
|
|
override void unlock() { m_state.unlock!INTENT(); }
|
|
|
|
}
|
|
|
|
alias Reader = Mutex!READ_ONLY;
|
|
|
|
alias Writer = Mutex!READ_WRITE;
|
|
|
|
|
|
|
|
Reader reader;
|
|
|
|
Writer writer;
|
|
|
|
|
|
|
|
this(Policy policy = Policy.PREFER_WRITERS)
|
|
|
|
{
|
|
|
|
m_state = State(policy);
|
|
|
|
reader = new Reader();
|
|
|
|
writer = new Writer();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** The policy with which the lock has been created. */
|
|
|
|
@property Policy policy() const { return m_state.policy; }
|
|
|
|
}
|