Safe-ify sync module.

This commit is contained in:
Sönke Ludwig 2016-11-02 21:01:09 +01:00
parent c7e1468c87
commit 5e89ac4e91

View file

@ -19,6 +19,23 @@ import std.stdio;
import std.traits : ReturnType; import std.traits : ReturnType;
/** Creates a new signal that can be shared between fibers.
*/
ManualEvent createManualEvent()
@safe {
return ManualEvent.init;
}
/// ditto
shared(ManualEvent) createSharedManualEvent()
@trusted {
return shared(ManualEvent).init;
}
ScopedMutexLock!M scopedMutexLock(M : Mutex)(M mutex, LockMode mode = LockMode.lock)
{
return ScopedMutexLock!M(mutex, mode);
}
enum LockMode { enum LockMode {
lock, lock,
tryLock, tryLock,
@ -34,16 +51,16 @@ interface Lockable {
/** RAII lock for the Mutex class. /** RAII lock for the Mutex class.
*/ */
struct ScopedMutexLock struct ScopedMutexLock(M : Mutex = core.sync.mutex.Mutex)
{ {
@disable this(this); @disable this(this);
private { private {
Mutex m_mutex; M m_mutex;
bool m_locked; bool m_locked;
LockMode m_mode; LockMode m_mode;
} }
this(core.sync.mutex.Mutex mutex, LockMode mode = LockMode.lock) { this(M mutex, LockMode mode = LockMode.lock) {
assert(mutex !is null); assert(mutex !is null);
m_mutex = mutex; m_mutex = mutex;
@ -121,6 +138,8 @@ unittest {
*/ */
class LocalTaskSemaphore class LocalTaskSemaphore
{ {
@safe:
// requires a queue // requires a queue
import std.container.binaryheap; import std.container.binaryheap;
import std.container.array; import std.container.array;
@ -190,7 +209,7 @@ class LocalTaskSemaphore
if (++m_seq == uint.max) if (++m_seq == uint.max)
rewindSeq(); rewindSeq();
m_waiters.insert(w); () @trusted { m_waiters.insert(w); } ();
do w.signal.wait(); while (!tryLock()); do w.signal.wait(); while (!tryLock());
// on resume: // on resume:
destroy(w.signal); destroy(w.signal);
@ -204,7 +223,7 @@ class LocalTaskSemaphore
if (m_waiters.length > 0 && available > 0) { if (m_waiters.length > 0 && available > 0) {
ThreadWaiter w = m_waiters.front(); ThreadWaiter w = m_waiters.front();
w.signal.emit(); // resume one w.signal.emit(); // resume one
m_waiters.removeFront(); () @trusted { m_waiters.removeFront(); } ();
} }
} }
@ -225,7 +244,7 @@ class LocalTaskSemaphore
} }
private void rewindSeq() private void rewindSeq()
{ @trusted {
Array!ThreadWaiter waiters = m_waiters.release(); Array!ThreadWaiter waiters = m_waiters.release();
ushort min_seq; ushort min_seq;
import std.algorithm : min; import std.algorithm : min;
@ -257,6 +276,8 @@ class LocalTaskSemaphore
See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
*/ */
class TaskMutex : core.sync.mutex.Mutex, Lockable { class TaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
private TaskMutexImpl!false m_impl; private TaskMutexImpl!false m_impl;
this(Object o) { m_impl.setup(); super(o); } this(Object o) { m_impl.setup(); super(o); }
@ -271,16 +292,16 @@ unittest {
auto mutex = new TaskMutex; auto mutex = new TaskMutex;
{ {
auto lock = ScopedMutexLock(mutex); auto lock = scopedMutexLock(mutex);
assert(lock.locked); assert(lock.locked);
assert(mutex.m_impl.m_locked); assert(mutex.m_impl.m_locked);
auto lock2 = ScopedMutexLock(mutex, LockMode.tryLock); auto lock2 = scopedMutexLock(mutex, LockMode.tryLock);
assert(!lock2.locked); assert(!lock2.locked);
} }
assert(!mutex.m_impl.m_locked); assert(!mutex.m_impl.m_locked);
auto lock = ScopedMutexLock(mutex, LockMode.tryLock); auto lock = scopedMutexLock(mutex, LockMode.tryLock);
assert(lock.locked); assert(lock.locked);
lock.unlock(); lock.unlock();
assert(!lock.locked); assert(!lock.locked);
@ -296,7 +317,7 @@ unittest {
assert(!mutex.m_impl.m_locked); assert(!mutex.m_impl.m_locked);
static if (__VERSION__ >= 2067) { static if (__VERSION__ >= 2067) {
with(mutex.ScopedMutexLock) { with(mutex.scopedMutexLock) {
assert(mutex.m_impl.m_locked); assert(mutex.m_impl.m_locked);
} }
} }
@ -356,6 +377,8 @@ unittest {
See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex) See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
*/ */
final class InterruptibleTaskMutex : Lockable { final class InterruptibleTaskMutex : Lockable {
@safe:
private TaskMutexImpl!true m_impl; private TaskMutexImpl!true m_impl;
this() { m_impl.setup(); } this() { m_impl.setup(); }
@ -390,6 +413,8 @@ unittest {
See_Also: TaskMutex, core.sync.mutex.Mutex See_Also: TaskMutex, core.sync.mutex.Mutex
*/ */
class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable { class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
private RecursiveTaskMutexImpl!false m_impl; private RecursiveTaskMutexImpl!false m_impl;
this(Object o) { m_impl.setup(); super(o); } this(Object o) { m_impl.setup(); super(o); }
@ -416,6 +441,8 @@ unittest {
See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex) See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
*/ */
final class InterruptibleRecursiveTaskMutex : Lockable { final class InterruptibleRecursiveTaskMutex : Lockable {
@safe:
private RecursiveTaskMutexImpl!true m_impl; private RecursiveTaskMutexImpl!true m_impl;
this() { m_impl.setup(); } this() { m_impl.setup(); }
@ -551,6 +578,8 @@ private void runMutexUnitTests(M)()
See_Also: InterruptibleTaskCondition See_Also: InterruptibleTaskCondition
*/ */
class TaskCondition : core.sync.condition.Condition { class TaskCondition : core.sync.condition.Condition {
@safe:
private TaskConditionImpl!(false, Mutex) m_impl; private TaskConditionImpl!(false, Mutex) m_impl;
this(core.sync.mutex.Mutex mtx) { this(core.sync.mutex.Mutex mtx) {
@ -625,6 +654,8 @@ unittest {
See_Also: `TaskCondition` See_Also: `TaskCondition`
*/ */
final class InterruptibleTaskCondition { final class InterruptibleTaskCondition {
@safe:
private TaskConditionImpl!(true, Lockable) m_impl; private TaskConditionImpl!(true, Lockable) m_impl;
this(core.sync.mutex.Mutex mtx) { m_impl.setup(mtx); } this(core.sync.mutex.Mutex mtx) { m_impl.setup(mtx); }
@ -638,18 +669,6 @@ final class InterruptibleTaskCondition {
} }
/** Creates a new signal that can be shared between fibers.
*/
ManualEvent createManualEvent()
{
return ManualEvent.init;
}
/// ditto
shared(ManualEvent) createSharedManualEvent()
{
return shared(ManualEvent).init;
}
/** A manually triggered cross-task event. /** A manually triggered cross-task event.
Note: the ownership can be shared between multiple fibers and threads. Note: the ownership can be shared between multiple fibers and threads.
@ -897,7 +916,7 @@ struct ManualEvent {
auto wnext = waiters.next; auto wnext = waiters.next;
assert(wnext !is waiters); assert(wnext !is waiters);
if (waiters.notifier !is null) { if (waiters.notifier !is null) {
logTrace("notify task %s %s %s", cast(void*)waiters, cast(void*)waiters.notifier.funcptr, waiters.notifier.ptr); logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr);
waiters.notifier(); waiters.notifier();
waiters.notifier = null; waiters.notifier = null;
} else logTrace("notify callback is null"); } else logTrace("notify callback is null");
@ -1607,6 +1626,8 @@ class TaskReadWriteMutex
*/ */
class InterruptibleTaskReadWriteMutex class InterruptibleTaskReadWriteMutex
{ {
@safe:
private { private {
alias State = ReadWriteMutexState!true; alias State = ReadWriteMutexState!true;
alias LockingIntent = State.LockingIntent; alias LockingIntent = State.LockingIntent;