diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 11afc0d..6aef38b 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -32,15 +32,17 @@ import std.traits : ReturnType; /** Creates a new signal that can be shared between fibers. */ LocalManualEvent createManualEvent() -@safe { +@safe nothrow { LocalManualEvent ret; ret.initialize(); return ret; } /// ditto shared(ManualEvent) createSharedManualEvent() -@trusted { - return shared(ManualEvent).init; +@trusted nothrow { + shared(ManualEvent) ret; + ret.initialize(); + return ret; } @@ -800,7 +802,7 @@ struct LocalManualEvent { package static EventID ms_threadEvent; private void initialize() - { + nothrow { import vibe.internal.allocator : Mallocator, makeGCSafe; m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); } @@ -812,11 +814,13 @@ struct LocalManualEvent { } ~this() - { + nothrow { import vibe.internal.allocator : Mallocator, disposeGCSafe; if (m_waiter) { - if (!m_waiter.releaseRef()) + if (!m_waiter.releaseRef()) { + static if (__VERSION__ < 2087) scope (failure) assert(false); () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); + } } } @@ -994,7 +998,7 @@ struct ManualEvent { StackSList!ThreadWaiter active; // actively waiting StackSList!ThreadWaiter free; // free-list of reusable waiter structs } - Monitor!(Waiters, shared(SpinLock)) m_waiters; + Monitor!(Waiters, shared(Mutex)) m_waiters; } // thread destructor in vibe.core.core will decrement the ref. count @@ -1007,6 +1011,11 @@ struct ManualEvent { @disable this(this); + private void initialize() + shared nothrow { + m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex); + } + deprecated("ManualEvent is always non-null!") bool opCast() const shared nothrow { return true; } @@ -1304,18 +1313,32 @@ package shared struct Monitor(T, M) static struct Locked { shared(Monitor)* m; @disable this(this); - ~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); } + ~this() { + () @trusted { + static if (is(typeof(Mutex.init.unlock_nothrow()))) + (cast(Mutex)m.mutex).unlock_nothrow(); + else (cast(Mutex)m.mutex).unlock(); + } (); + } ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } alias get this; } Locked lock() { - () @trusted { (cast(Mutex)mutex).lock(); } (); + () @trusted { + static if (is(typeof(Mutex.init.lock_nothrow()))) + (cast(Mutex)mutex).lock_nothrow(); + else (cast(Mutex)mutex).lock(); + } (); return Locked(() @trusted { return &this; } ()); } const(Locked) lock() const { - () @trusted { (cast(Mutex)mutex).lock(); } (); + () @trusted { + static if (is(typeof(Mutex.init.lock_nothrow()))) + (cast(Mutex)mutex).lock_nothrow(); + else (cast(Mutex)mutex).lock(); + } (); return const(Locked)(() @trusted { return &this; } ()); } } @@ -1328,35 +1351,6 @@ package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex) return ret; } -package struct SpinLock { - private shared int locked; - debug static int threadID; - - @safe nothrow @nogc shared: - - bool tryLock() - @trusted { - debug { - import core.thread : Thread; - if (threadID == 0) threadID = cast(int)cast(void*)Thread.getThis(); - if (threadID == 0) threadID = -1; // workaround for non-D threads - assert(atomicLoad(locked) != threadID, "Recursive lock attempt."); - int tid = threadID; - } else int tid = 1; - return cas(&locked, 0, tid); - } - - void lock() - { - while (!tryLock()) {} - } - - void unlock() - @trusted { - debug assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread."); - atomicStore(locked, 0); - } -} private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { import vibe.internal.list : CircularDList; @@ -1535,6 +1529,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { void setup() { + m_signal.initialize(); } @trusted bool tryLock() @@ -1589,6 +1584,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { void setup() { m_mutex = new core.sync.mutex.Mutex; + m_signal.initialize(); } @trusted bool tryLock() @@ -1666,6 +1662,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { m_mutex = mtx; static if (is(typeof(m_taskMutex))) m_taskMutex = cast(TaskMutex)mtx; + m_signal.initialize(); } @property LOCKABLE mutex() { return m_mutex; } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 7950da9..7b32eac 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -342,6 +342,7 @@ final package class TaskFiber : Fiber { this() @trusted nothrow { super(&run, ms_taskStackSize); + m_onExit = createSharedManualEvent(); m_thread = Thread.getThis(); } diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index e2bb428..7f9631e 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -10,7 +10,7 @@ module vibe.core.taskpool; import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.log; -import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor; +import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor; import vibe.core.task : Task, TaskFuncInfo, callWithMove; import core.sync.mutex : Mutex; import core.thread : Thread; @@ -27,7 +27,7 @@ shared final class TaskPool { TaskQueue queue; bool term; } - vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state; + vibe.core.sync.Monitor!(State, shared(Mutex)) m_state; shared(ManualEvent) m_signal; immutable size_t m_threadCount; } @@ -43,6 +43,7 @@ shared final class TaskPool { m_threadCount = thread_count; m_signal = createSharedManualEvent(); + m_state = createMonitor!State(new shared Mutex); with (m_state.lock) { queue.setup();