From 5640516ba811d82f26bfe625cfeff4e7bf5ef7f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 17 Sep 2019 14:35:52 +0200 Subject: [PATCH] Remove SpinLock and use core.sync.mutex.Mutex instead. The places that used it showed too much contention in load tests, so that the spin lock was more expensive that the regular OS mutex/futex. --- source/vibe/core/sync.d | 75 ++++++++++++++++++------------------- source/vibe/core/task.d | 1 + source/vibe/core/taskpool.d | 5 ++- 3 files changed, 40 insertions(+), 41 deletions(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 11afc0d..6aef38b 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -32,15 +32,17 @@ import std.traits : ReturnType; /** Creates a new signal that can be shared between fibers. */ LocalManualEvent createManualEvent() -@safe { +@safe nothrow { LocalManualEvent ret; ret.initialize(); return ret; } /// ditto shared(ManualEvent) createSharedManualEvent() -@trusted { - return shared(ManualEvent).init; +@trusted nothrow { + shared(ManualEvent) ret; + ret.initialize(); + return ret; } @@ -800,7 +802,7 @@ struct LocalManualEvent { package static EventID ms_threadEvent; private void initialize() - { + nothrow { import vibe.internal.allocator : Mallocator, makeGCSafe; m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); } @@ -812,11 +814,13 @@ struct LocalManualEvent { } ~this() - { + nothrow { import vibe.internal.allocator : Mallocator, disposeGCSafe; if (m_waiter) { - if (!m_waiter.releaseRef()) + if (!m_waiter.releaseRef()) { + static if (__VERSION__ < 2087) scope (failure) assert(false); () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); + } } } @@ -994,7 +998,7 @@ struct ManualEvent { StackSList!ThreadWaiter active; // actively waiting StackSList!ThreadWaiter free; // free-list of reusable waiter structs } - Monitor!(Waiters, shared(SpinLock)) m_waiters; + Monitor!(Waiters, shared(Mutex)) m_waiters; } // thread destructor in vibe.core.core will decrement the ref. count @@ -1007,6 +1011,11 @@ struct ManualEvent { @disable this(this); + private void initialize() + shared nothrow { + m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex); + } + deprecated("ManualEvent is always non-null!") bool opCast() const shared nothrow { return true; } @@ -1304,18 +1313,32 @@ package shared struct Monitor(T, M) static struct Locked { shared(Monitor)* m; @disable this(this); - ~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); } + ~this() { + () @trusted { + static if (is(typeof(Mutex.init.unlock_nothrow()))) + (cast(Mutex)m.mutex).unlock_nothrow(); + else (cast(Mutex)m.mutex).unlock(); + } (); + } ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } alias get this; } Locked lock() { - () @trusted { (cast(Mutex)mutex).lock(); } (); + () @trusted { + static if (is(typeof(Mutex.init.lock_nothrow()))) + (cast(Mutex)mutex).lock_nothrow(); + else (cast(Mutex)mutex).lock(); + } (); return Locked(() @trusted { return &this; } ()); } const(Locked) lock() const { - () @trusted { (cast(Mutex)mutex).lock(); } (); + () @trusted { + static if (is(typeof(Mutex.init.lock_nothrow()))) + (cast(Mutex)mutex).lock_nothrow(); + else (cast(Mutex)mutex).lock(); + } (); return const(Locked)(() @trusted { return &this; } ()); } } @@ -1328,35 +1351,6 @@ package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex) return ret; } -package struct SpinLock { - private shared int locked; - debug static int threadID; - - @safe nothrow @nogc shared: - - bool tryLock() - @trusted { - debug { - import core.thread : Thread; - if (threadID == 0) threadID = cast(int)cast(void*)Thread.getThis(); - if (threadID == 0) threadID = -1; // workaround for non-D threads - assert(atomicLoad(locked) != threadID, "Recursive lock attempt."); - int tid = threadID; - } else int tid = 1; - return cas(&locked, 0, tid); - } - - void lock() - { - while (!tryLock()) {} - } - - void unlock() - @trusted { - debug assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread."); - atomicStore(locked, 0); - } -} private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { import vibe.internal.list : CircularDList; @@ -1535,6 +1529,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) { void setup() { + m_signal.initialize(); } @trusted bool tryLock() @@ -1589,6 +1584,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { void setup() { m_mutex = new core.sync.mutex.Mutex; + m_signal.initialize(); } @trusted bool tryLock() @@ -1666,6 +1662,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { m_mutex = mtx; static if (is(typeof(m_taskMutex))) m_taskMutex = cast(TaskMutex)mtx; + m_signal.initialize(); } @property LOCKABLE mutex() { return m_mutex; } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 7950da9..7b32eac 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -342,6 +342,7 @@ final package class TaskFiber : Fiber { this() @trusted nothrow { super(&run, ms_taskStackSize); + m_onExit = createSharedManualEvent(); m_thread = Thread.getThis(); } diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index e2bb428..7f9631e 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -10,7 +10,7 @@ module vibe.core.taskpool; import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.log; -import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor; +import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor; import vibe.core.task : Task, TaskFuncInfo, callWithMove; import core.sync.mutex : Mutex; import core.thread : Thread; @@ -27,7 +27,7 @@ shared final class TaskPool { TaskQueue queue; bool term; } - vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state; + vibe.core.sync.Monitor!(State, shared(Mutex)) m_state; shared(ManualEvent) m_signal; immutable size_t m_threadCount; } @@ -43,6 +43,7 @@ shared final class TaskPool { m_threadCount = thread_count; m_signal = createSharedManualEvent(); + m_state = createMonitor!State(new shared Mutex); with (m_state.lock) { queue.setup();