Improve the sync module.

- Change the API of Monitor to work without a callback
- Add ManualEvent.emitSingle
This commit is contained in:
Sönke Ludwig 2017-02-22 17:42:20 +01:00
parent 6f26766c70
commit f9372446b1

View file

@ -500,7 +500,7 @@ private void runMutexUnitTests(M)()
// basic contention test // basic contention test
runContendedTasks(false, false); runContendedTasks(false, false);
runTask({ auto t3 = runTask({
assert(t1.running && t2.running); assert(t1.running && t2.running);
assert(m.m_impl.m_locked); assert(m.m_impl.m_locked);
t1.join(); t1.join();
@ -513,11 +513,12 @@ private void runMutexUnitTests(M)()
exitEventLoop(); exitEventLoop();
}); });
runEventLoop(); runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked); assert(!m.m_impl.m_locked);
// interruption test #1 // interruption test #1
runContendedTasks(true, false); runContendedTasks(true, false);
runTask({ t3 = runTask({
assert(t1.running && t2.running); assert(t1.running && t2.running);
assert(m.m_impl.m_locked); assert(m.m_impl.m_locked);
t1.interrupt(); t1.interrupt();
@ -531,11 +532,12 @@ private void runMutexUnitTests(M)()
exitEventLoop(); exitEventLoop();
}); });
runEventLoop(); runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked); assert(!m.m_impl.m_locked);
// interruption test #2 // interruption test #2
runContendedTasks(false, true); runContendedTasks(false, true);
runTask({ t3 = runTask({
assert(t1.running && t2.running); assert(t1.running && t2.running);
assert(m.m_impl.m_locked); assert(m.m_impl.m_locked);
t2.interrupt(); t2.interrupt();
@ -549,6 +551,7 @@ private void runMutexUnitTests(M)()
exitEventLoop(); exitEventLoop();
}); });
runEventLoop(); runEventLoop();
assert(!t3.running);
assert(!m.m_impl.m_locked); assert(!m.m_impl.m_locked);
} }
@ -799,7 +802,7 @@ struct ManualEvent {
StackSList!ThreadLocalWaiter active; StackSList!ThreadLocalWaiter active;
StackSList!ThreadLocalWaiter free; StackSList!ThreadLocalWaiter free;
} }
Monitor!(Waiters, SpinLock) m_waiters; Monitor!(Waiters, shared(SpinLock)) m_waiters;
} }
// thread destructor in vibe.core.core will decrement the ref. count // thread destructor in vibe.core.core will decrement the ref. count
@ -830,17 +833,48 @@ struct ManualEvent {
ThreadLocalWaiter* lw; ThreadLocalWaiter* lw;
auto drv = eventDriver; auto drv = eventDriver;
m_waiters.lock((ref waiters) { m_waiters.lock.active.filter((ThreadLocalWaiter* w) {
waiters.active.filter((ThreadLocalWaiter* w) { () @trusted { logTrace("waiter %s", cast(void*)w); } ();
() @trusted { logTrace("waiter %s", cast(void*)w); } (); if (w.m_driver is drv) lw = w;
if (w.m_driver is drv) lw = w; else {
else { try {
try { assert(w.m_event != EventID.init);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
return true; return true;
}); });
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) lw.emit();
logTrace("emit shared done");
return ec;
}
/// Emits the signal, waking up at least one waiting task
int emitSingle()
shared nothrow @trusted {
import core.atomic : atomicOp, cas;
() @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
ThreadLocalWaiter* lw;
auto drv = eventDriver;
m_waiters.lock.active.iterate((ThreadLocalWaiter* w) {
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
if (w.unused) return true;
if (w.m_driver is drv) lw = w;
else {
try {
assert(w.m_event != EventID.invalid);
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
} catch (Exception e) assert(false, e.msg);
}
return false;
}); });
() @trusted { logTrace("lw %s", cast(void*)lw); } (); () @trusted { logTrace("lw %s", cast(void*)lw); } ();
if (lw) lw.emit(); if (lw) lw.emit();
@ -925,8 +959,8 @@ struct ManualEvent {
ThreadLocalWaiter* w; ThreadLocalWaiter* w;
auto drv = eventDriver; auto drv = eventDriver;
m_waiters.lock((ref waiters) { with (m_waiters.lock) {
waiters.active.filter((aw) { active.iterate((aw) {
if (aw.m_driver is drv) { if (aw.m_driver is drv) {
w = aw; w = aw;
} }
@ -934,7 +968,7 @@ struct ManualEvent {
}); });
if (!w) { if (!w) {
waiters.free.filter((fw) { free.filter((fw) {
if (fw.m_driver is drv) { if (fw.m_driver is drv) {
w = fw; w = fw;
return false; return false;
@ -955,17 +989,17 @@ struct ManualEvent {
} (); } ();
} }
waiters.active.add(w); active.add(w);
} }
}); }
scope (exit) { scope (exit) {
if (w.unused) { if (w.unused) {
m_waiters.lock((ref waiters) { with (m_waiters.lock) {
waiters.active.remove(w); active.remove(w);
waiters.free.add(w); free.add(w);
// TODO: cap size of m_freeWaiters // TODO: cap size of m_freeWaiters
}); }
} }
} }
@ -990,19 +1024,40 @@ unittest {
runEventLoop(); runEventLoop();
} }
private shared struct Monitor(T, M) package shared struct Monitor(T, M)
{ {
alias Mutex = M;
alias Data = T;
private { private {
shared T m_data; Mutex mutex;
shared M m_mutex; Data data;
} }
void lock(scope void delegate(ref T) @safe nothrow access) static struct Locked {
shared { shared(Monitor)* m;
m_mutex.lock(); @disable this(this);
scope (exit) m_mutex.unlock(); ~this() { () @trusted { (cast(Mutex)m.mutex).unlock(); } (); }
access(*() @trusted { return cast(T*)&m_data; } ()); ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
alias get this;
} }
Locked lock() {
() @trusted { (cast(Mutex)mutex).lock(); } ();
return Locked(() @trusted { return &this; } ());
}
const(Locked) lock() const {
() @trusted { (cast(Mutex)mutex).lock(); } ();
return const(Locked)(() @trusted { return &this; } ());
}
}
package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
@trusted {
shared(Monitor!(T, M)) ret;
ret.mutex = cast(shared)mutex;
return ret;
} }
package struct SpinLock { package struct SpinLock {
@ -1053,7 +1108,7 @@ private struct ThreadLocalWaiter {
ThreadLocalWaiter* next; ThreadLocalWaiter* next;
NativeEventDriver m_driver; NativeEventDriver m_driver;
EventID m_event; EventID m_event = EventID.invalid;
Waiter* m_waiters; Waiter* m_waiters;
} }
@ -1221,6 +1276,16 @@ private struct StackSList(T)
w = wnext; w = wnext;
} }
} }
void iterate(scope bool delegate(T* el) @safe nothrow del)
{
T* w = m_first;
while (w !is null) {
auto wnext = w.next;
if (!del(w)) break;
w = wnext;
}
}
} }
private struct TaskMutexImpl(bool INTERRUPTIBLE) { private struct TaskMutexImpl(bool INTERRUPTIBLE) {