Mark ManualEvent's interface as safe.

This commit is contained in:
Sönke Ludwig 2016-10-25 08:55:48 +02:00
parent 4db9b3f100
commit 1eb06c4b1a

View file

@ -658,6 +658,8 @@ struct ManualEvent {
import core.thread : Thread; import core.thread : Thread;
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
@safe:
private { private {
static struct ThreadWaiter { static struct ThreadWaiter {
ThreadWaiter* next; ThreadWaiter* next;
@ -700,11 +702,11 @@ struct ManualEvent {
/// A counter that is increased with every emit() call /// A counter that is increased with every emit() call
int emitCount() const nothrow { return m_emitCount; } int emitCount() const nothrow { return m_emitCount; }
/// ditto /// ditto
int emitCount() const shared nothrow { return atomicLoad(m_emitCount); } int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal. /// Emits the signal, waking up all owners of the signal.
int emit(EmitMode mode = EmitMode.all) int emit(EmitMode mode = EmitMode.all)
shared nothrow { shared nothrow @trusted {
import core.atomic : atomicOp, cas; import core.atomic : atomicOp, cas;
logTrace("emit shared"); logTrace("emit shared");
@ -821,7 +823,7 @@ struct ManualEvent {
while (ec <= emit_count) { while (ec <= emit_count) {
ThreadWaiter w; ThreadWaiter w;
LocalWaiter tw; LocalWaiter tw;
acquireWaiter(w, tw); () @trusted { acquireWaiter(&w, &tw); } ();
Waitable!( Waitable!(
cb => tw.wait(cb), cb => tw.wait(cb),
@ -839,8 +841,8 @@ struct ManualEvent {
while (ec <= emit_count) { while (ec <= emit_count) {
shared(ThreadWaiter) w; shared(ThreadWaiter) w;
LocalWaiter tw; LocalWaiter tw;
acquireWaiter(w, tw); () @trusted { acquireWaiter(&w, &tw); } ();
logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); () @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } ();
if (tw.next) { if (tw.next) {
// if we are not the first waiter for this thread, // if we are not the first waiter for this thread,
@ -867,14 +869,16 @@ struct ManualEvent {
asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter); asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter);
if (!eventwaiter.cancelled) { if (!eventwaiter.cancelled) {
if (atomicLoad(w.next) == null) if (() @trusted { return atomicLoad(w.next); } () is null)
emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode emitForThisThread(() @trusted { return cast(LocalWaiter*)w.tasks.m_first; } (), EmitMode.all); // FIXME: use proper emit mode
else goto again; else goto again;
} else if (localwaiter.cancelled) break; // timeout } else if (localwaiter.cancelled) break; // timeout
} }
() @trusted {
assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w, assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w,
"Waiter did not get removed from waiter queue."); "Waiter did not get removed from waiter queue.");
}();
ec = this.emitCount; ec = this.emitCount;
} }
@ -907,48 +911,48 @@ struct ManualEvent {
return true; return true;
} }
private void acquireWaiter(ref ThreadWaiter w, ref LocalWaiter tw) private void acquireWaiter(ThreadWaiter* w, LocalWaiter* tw)
nothrow { nothrow {
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling
tw.task = Task.getThis(); tw.task = Task.getThis();
if (m_waiters) { if (m_waiters) {
m_waiters.tasks.add(&tw); m_waiters.tasks.add(tw);
} else { } else {
m_waiters = &w; m_waiters = w;
} }
} }
private void acquireWaiter(ref shared(ThreadWaiter) w, ref LocalWaiter tw) private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* tw)
nothrow shared { nothrow shared {
tw.task = Task.getThis(); tw.task = Task.getThis();
if (ms_threadEvent == EventID.init) if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.events.create(); ms_threadEvent = eventDriver.events.create();
auto sdriver = cast(shared)eventDriver; auto sdriver = () @trusted { return cast(shared)eventDriver; } ();
shared(ThreadWaiter)* pw = atomicLoad(m_waiters); shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } ();
assert(pw !is &w, "Waiter is already registered!"); assert(pw !is w, "Waiter is already registered!");
while (pw !is null) { while (pw !is null) {
if (pw.driver is sdriver) if (pw.driver is sdriver)
break; break;
pw = atomicLoad(pw.next); pw = () @trusted { return atomicLoad(pw.next); } ();
} }
if (!pw) { if (!pw) {
pw = &w; pw = w;
shared(ThreadWaiter)* wn; shared(ThreadWaiter)* wn;
do { do {
wn = atomicLoad(m_waiters); wn = () @trusted { return atomicLoad(m_waiters); } ();
w.next = wn; w.next = wn;
w.event = ms_threadEvent; w.event = ms_threadEvent;
w.driver = sdriver; w.driver = sdriver;
w.thread = cast(shared)Thread.getThis(); w.thread = () @trusted { return cast(shared)Thread.getThis(); } ();
} while (!cas(&m_waiters, wn, &w)); } while (!() @trusted { return cas(&m_waiters, wn, w); } ());
} }
(cast(ThreadWaiter*)pw).tasks.add(&tw); () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(tw);
} }
} }