Merge pull request #178 from vibe-d/drop_spinlock

Remove SpinLock and use core.sync.mutex.Mutex instead.
This commit is contained in:
Leonid Kramer 2019-09-26 10:37:37 +02:00 committed by GitHub
commit 09eb24c5f6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 41 deletions

View file

@ -32,15 +32,17 @@ import std.traits : ReturnType;
/** Creates a new signal that can be shared between fibers. /** Creates a new signal that can be shared between fibers.
*/ */
LocalManualEvent createManualEvent() LocalManualEvent createManualEvent()
@safe { @safe nothrow {
LocalManualEvent ret; LocalManualEvent ret;
ret.initialize(); ret.initialize();
return ret; return ret;
} }
/// ditto /// ditto
shared(ManualEvent) createSharedManualEvent() shared(ManualEvent) createSharedManualEvent()
@trusted { @trusted nothrow {
return shared(ManualEvent).init; shared(ManualEvent) ret;
ret.initialize();
return ret;
} }
@ -800,7 +802,7 @@ struct LocalManualEvent {
package static EventID ms_threadEvent; package static EventID ms_threadEvent;
private void initialize() private void initialize()
{ nothrow {
import vibe.internal.allocator : Mallocator, makeGCSafe; import vibe.internal.allocator : Mallocator, makeGCSafe;
m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
} }
@ -812,11 +814,13 @@ struct LocalManualEvent {
} }
~this() ~this()
{ nothrow {
import vibe.internal.allocator : Mallocator, disposeGCSafe; import vibe.internal.allocator : Mallocator, disposeGCSafe;
if (m_waiter) { if (m_waiter) {
if (!m_waiter.releaseRef()) if (!m_waiter.releaseRef()) {
static if (__VERSION__ < 2087) scope (failure) assert(false);
() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
}
} }
} }
@ -994,7 +998,7 @@ struct ManualEvent {
StackSList!ThreadWaiter active; // actively waiting StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs StackSList!ThreadWaiter free; // free-list of reusable waiter structs
} }
Monitor!(Waiters, shared(SpinLock)) m_waiters; Monitor!(Waiters, shared(Mutex)) m_waiters;
} }
// thread destructor in vibe.core.core will decrement the ref. count // thread destructor in vibe.core.core will decrement the ref. count
@ -1007,6 +1011,11 @@ struct ManualEvent {
@disable this(this); @disable this(this);
private void initialize()
shared nothrow {
m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
}
deprecated("ManualEvent is always non-null!") deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; } bool opCast() const shared nothrow { return true; }
@ -1304,18 +1313,32 @@ package shared struct Monitor(T, M)
static struct Locked { static struct Locked {
shared(Monitor)* m; shared(Monitor)* m;
@disable this(this); @disable this(this);
~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); } ~this() {
() @trusted {
static if (is(typeof(Mutex.init.unlock_nothrow())))
(cast(Mutex)m.mutex).unlock_nothrow();
else (cast(Mutex)m.mutex).unlock();
} ();
}
ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
alias get this; alias get this;
} }
Locked lock() { Locked lock() {
() @trusted { (cast(Mutex)mutex).lock(); } (); () @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return Locked(() @trusted { return &this; } ()); return Locked(() @trusted { return &this; } ());
} }
const(Locked) lock() const { const(Locked) lock() const {
() @trusted { (cast(Mutex)mutex).lock(); } (); () @trusted {
static if (is(typeof(Mutex.init.lock_nothrow())))
(cast(Mutex)mutex).lock_nothrow();
else (cast(Mutex)mutex).lock();
} ();
return const(Locked)(() @trusted { return &this; } ()); return const(Locked)(() @trusted { return &this; } ());
} }
} }
@ -1328,35 +1351,6 @@ package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
return ret; return ret;
} }
package struct SpinLock {
private shared int locked;
debug static int threadID;
@safe nothrow @nogc shared:
bool tryLock()
@trusted {
debug {
import core.thread : Thread;
if (threadID == 0) threadID = cast(int)cast(void*)Thread.getThis();
if (threadID == 0) threadID = -1; // workaround for non-D threads
assert(atomicLoad(locked) != threadID, "Recursive lock attempt.");
int tid = threadID;
} else int tid = 1;
return cas(&locked, 0, tid);
}
void lock()
{
while (!tryLock()) {}
}
void unlock()
@trusted {
debug assert(atomicLoad(locked) == threadID, "Unlocking spin lock that is not owned by the current thread.");
atomicStore(locked, 0);
}
}
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
import vibe.internal.list : CircularDList; import vibe.internal.list : CircularDList;
@ -1535,6 +1529,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_signal.initialize();
} }
@trusted bool tryLock() @trusted bool tryLock()
@ -1589,6 +1584,7 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
void setup() void setup()
{ {
m_mutex = new core.sync.mutex.Mutex; m_mutex = new core.sync.mutex.Mutex;
m_signal.initialize();
} }
@trusted bool tryLock() @trusted bool tryLock()
@ -1666,6 +1662,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
m_mutex = mtx; m_mutex = mtx;
static if (is(typeof(m_taskMutex))) static if (is(typeof(m_taskMutex)))
m_taskMutex = cast(TaskMutex)mtx; m_taskMutex = cast(TaskMutex)mtx;
m_signal.initialize();
} }
@property LOCKABLE mutex() { return m_mutex; } @property LOCKABLE mutex() { return m_mutex; }

View file

@ -342,6 +342,7 @@ final package class TaskFiber : Fiber {
this() this()
@trusted nothrow { @trusted nothrow {
super(&run, ms_taskStackSize); super(&run, ms_taskStackSize);
m_onExit = createSharedManualEvent();
m_thread = Thread.getThis(); m_thread = Thread.getThis();
} }

View file

@ -10,7 +10,7 @@ module vibe.core.taskpool;
import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal; import vibe.core.core : exitEventLoop, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
import vibe.core.log; import vibe.core.log;
import vibe.core.sync : ManualEvent, Monitor, SpinLock, createSharedManualEvent, createMonitor; import vibe.core.sync : ManualEvent, Monitor, createSharedManualEvent, createMonitor;
import vibe.core.task : Task, TaskFuncInfo, callWithMove; import vibe.core.task : Task, TaskFuncInfo, callWithMove;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
import core.thread : Thread; import core.thread : Thread;
@ -27,7 +27,7 @@ shared final class TaskPool {
TaskQueue queue; TaskQueue queue;
bool term; bool term;
} }
vibe.core.sync.Monitor!(State, shared(SpinLock)) m_state; vibe.core.sync.Monitor!(State, shared(Mutex)) m_state;
shared(ManualEvent) m_signal; shared(ManualEvent) m_signal;
immutable size_t m_threadCount; immutable size_t m_threadCount;
} }
@ -43,6 +43,7 @@ shared final class TaskPool {
m_threadCount = thread_count; m_threadCount = thread_count;
m_signal = createSharedManualEvent(); m_signal = createSharedManualEvent();
m_state = createMonitor!State(new shared Mutex);
with (m_state.lock) { with (m_state.lock) {
queue.setup(); queue.setup();