Remove SpinLock and use core.sync.mutex.Mutex instead.

The places that used it showed too much contention in load tests, so that the spin lock was more expensive that the regular OS mutex/futex.
This commit is contained in:
Sönke Ludwig 2019-09-17 14:35:52 +02:00
parent 591ab4a944
commit 5640516ba8
3 changed files with 40 additions and 41 deletions

View file

@ -32,15 +32,17 @@ import std.traits : ReturnType;
/** Creates a new signal that can be shared between fibers. /** Creates a new signal that can be shared between fibers.
*/ */
LocalManualEvent createManualEvent() LocalManualEvent createManualEvent()
@safe { @safe nothrow {
LocalManualEvent ret; LocalManualEvent ret;
ret.initialize(); ret.initialize();
return ret; return ret;
} }
/// ditto /// ditto
shared(ManualEvent) createSharedManualEvent() shared(ManualEvent) createSharedManualEvent()
@trusted { @trusted nothrow {
return shared(ManualEvent).init; shared(ManualEvent) ret;
ret.initialize();
return ret;
} }
@ -800,7 +802,7 @@ struct LocalManualEvent {
package static EventID ms_threadEvent; package static EventID ms_threadEvent;
private void initialize() private void initialize()
{ nothrow {
import vibe.internal.allocator : Mallocator, makeGCSafe; import vibe.internal.allocator : Mallocator, makeGCSafe;
m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
} }
@ -812,13 +814,15 @@ struct LocalManualEvent {
} }
~this() ~this()
{ nothrow {
import vibe.internal.allocator : Mallocator, disposeGCSafe; import vibe.internal.allocator : Mallocator, disposeGCSafe;
if (m_waiter) { if (m_waiter) {
if (!m_waiter.releaseRef()) if (!m_waiter.releaseRef()) {
static if (__VERSION__ < 2087) scope (failure) assert(false);
() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
} }
} }
}
bool opCast() const nothrow { return m_waiter !is null; } bool opCast() const nothrow { return m_waiter !is null; }
@ -994,7 +998,7 @@ struct ManualEvent {
StackSList!ThreadWaiter active; // actively waiting StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs StackSList!ThreadWaiter free; // free-list of reusable waiter structs
} }
Monitor!(Waiters, shared(SpinLock)) m_waiters; Monitor!(Waiters, shared(Mutex)) m_waiters;
} }
// thread destructor in vibe.core.core will decrement the ref. count // thread destructor in vibe.core.core will decrement the ref. count
@ -1007,6 +1011,11 @@ struct ManualEvent {
@disable this(this); @disable this(this);
private void initialize()
shared nothrow {
m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
}
deprecated("ManualEvent is always non-null!") deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; } bool opCast() const shared nothrow { return true; }
@ -1304,18 +1313,32 @@ package shared struct Monitor(T, M)
static struct Locked { static struct Locked {
shared(Monitor)* m; shared(Monitor)* m;
@disable this(this); @disable this(this);
~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); } ~this() {
() @trusted {
static if (is(typeof(Mutex.init.unlock_nothrow())))
(cast(Mutex)m.mutex).unlock_nothrow();
else (cast(Mutex)m.mutex).unlock();
} ();
}
ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
alias get this; alias get this;
} }
Locked lock() { Locked lock() {
() @trusted { (cast(Mutex)mutex).lock(); } (); () @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return Locked(() @trusted { return &this; } ()); return Locked(() @trusted { return &this; } ());
} }
const(Locked) lock() const { const(Locked) lock() const {
() @trusted { (cast(Mutex)mutex).lock(); } (); () @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return const(Locked)(() @trusted { return &this; } ()); return const(Locked)(() @trusted { return &this; } ());
} }
} }
@ -1328,35 +1351,6 @@ package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
return ret; return ret;
} }
package struct SpinLock {
private shared int locked;
debug static int threadID;
@safe nothrow @nogc shared:
bool tryLock()
@trusted {
debug {
import core.thread : Thread;
if (threadID == 0) threadID = cast(int)cast(void*)Thread.getThis();
if (threadID == 0) threadID = -1; // workaround for non-D threads
assert(atomicLoad(locked) != threadID, "Recursive lock attempt.");
int tid = threadID;
} else int tid = 1;
return cas(&locked, 0, tid);
}
void lock()
{
while (!tryLock()) {}
}
void unlock()
@trusted {
debug assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread.");
atomicStore(locked, 0);
}
}
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
import vibe.internal.list : CircularDList; import vibe.internal.list : CircularDList;
@ -1535,6 +1529,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_signal.initialize();
} }
@trusted bool tryLock() @trusted bool tryLock()
@ -1589,6 +1584,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_mutex = new core.sync.mutex.Mutex; m_mutex = new core.sync.mutex.Mutex;
m_signal.initialize();
} }
@trusted bool tryLock() @trusted bool tryLock()
@ -1666,6 +1662,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
m_mutex = mtx; m_mutex = mtx;
static if (is(typeof(m_taskMutex))) static if (is(typeof(m_taskMutex)))
m_taskMutex = cast(TaskMutex)mtx; m_taskMutex = cast(TaskMutex)mtx;
m_signal.initialize();
} }
@property LOCKABLE mutex() { return m_mutex; } @property LOCKABLE mutex() { return m_mutex; }

View file

@ -342,6 +342,7 @@ final package class TaskFiber : Fiber {
this() this()
@trusted nothrow { @trusted nothrow {
super(&run, ms_taskStackSize); super(&run, ms_taskStackSize);
m_onExit = createSharedManualEvent();
m_thread = Thread.getThis(); m_thread = Thread.getThis();
} }

View file

@ -10,7 +10,7 @@ module vibe.core.taskpool;
import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
import vibe.core.log; import vibe.core.log;
import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor; import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor;
import vibe.core.task : Task, TaskFuncInfo, callWithMove; import vibe.core.task : Task, TaskFuncInfo, callWithMove;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
import core.thread : Thread; import core.thread : Thread;
@ -27,7 +27,7 @@ shared final class TaskPool {
TaskQueue queue; TaskQueue queue;
bool term; bool term;
} }
vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state; vibe.core.sync.Monitor!(State, shared(Mutex)) m_state;
shared(ManualEvent) m_signal; shared(ManualEvent) m_signal;
immutable size_t m_threadCount; immutable size_t m_threadCount;
} }
@ -43,6 +43,7 @@ shared final class TaskPool {
m_threadCount = thread_count; m_threadCount = thread_count;
m_signal = createSharedManualEvent(); m_signal = createSharedManualEvent();
m_state = createMonitor!State(new shared Mutex);
with (m_state.lock) { with (m_state.lock) {
queue.setup(); queue.setup();