diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 22fdeb6..9f164a3 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -658,6 +658,8 @@ struct ManualEvent { import core.thread : Thread; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; + @safe: + private { static struct ThreadWaiter { ThreadWaiter* next; @@ -700,11 +702,11 @@ struct ManualEvent { /// A counter that is increased with every emit() call int emitCount() const nothrow { return m_emitCount; } /// ditto - int emitCount() const shared nothrow { return atomicLoad(m_emitCount); } + int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } /// Emits the signal, waking up all owners of the signal. int emit(EmitMode mode = EmitMode.all) - shared nothrow { + shared nothrow @trusted { import core.atomic : atomicOp, cas; logTrace("emit shared"); @@ -821,7 +823,7 @@ struct ManualEvent { while (ec <= emit_count) { ThreadWaiter w; LocalWaiter tw; - acquireWaiter(w, tw); + () @trusted { acquireWaiter(&w, &tw); } (); Waitable!( cb => tw.wait(cb), @@ -839,8 +841,8 @@ struct ManualEvent { while (ec <= emit_count) { shared(ThreadWaiter) w; LocalWaiter tw; - acquireWaiter(w, tw); - logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); + () @trusted { acquireWaiter(&w, &tw); } (); + () @trusted { logDebugV("Acquired waiter %s %s -> %s", cast(void*)m_waiters, cast(void*)&w, cast(void*)w.next); } (); if (tw.next) { // if we are not the first waiter for this thread, @@ -867,14 +869,16 @@ struct ManualEvent { asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter); if (!eventwaiter.cancelled) { - if (atomicLoad(w.next) == null) - emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode + if (() @trusted { return atomicLoad(w.next); } () is null) + emitForThisThread(() @trusted { return cast(LocalWaiter*)w.tasks.m_first; } (), EmitMode.all); // FIXME: use proper emit mode else goto again; } else if (localwaiter.cancelled) break; // timeout } - assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w, - "Waiter did not get removed from waiter queue."); + () @trusted { + assert(atomicLoad(w.next) is null && atomicLoad(m_waiters) !is &w, + "Waiter did not get removed from waiter queue."); + }(); ec = this.emitCount; } @@ -907,48 +911,48 @@ struct ManualEvent { return true; } - private void acquireWaiter(ref ThreadWaiter w, ref LocalWaiter tw) + private void acquireWaiter(ThreadWaiter* w, LocalWaiter* tw) nothrow { // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling tw.task = Task.getThis(); if (m_waiters) { - m_waiters.tasks.add(&tw); + m_waiters.tasks.add(tw); } else { - m_waiters = &w; + m_waiters = w; } } - private void acquireWaiter(ref shared(ThreadWaiter) w, ref LocalWaiter tw) + private void acquireWaiter(shared(ThreadWaiter)* w, LocalWaiter* tw) nothrow shared { tw.task = Task.getThis(); if (ms_threadEvent == EventID.init) ms_threadEvent = eventDriver.events.create(); - auto sdriver = cast(shared)eventDriver; + auto sdriver = () @trusted { return cast(shared)eventDriver; } (); - shared(ThreadWaiter)* pw = atomicLoad(m_waiters); - assert(pw !is &w, "Waiter is already registered!"); + shared(ThreadWaiter)* pw = () @trusted { return atomicLoad(m_waiters); } (); + assert(pw !is w, "Waiter is already registered!"); while (pw !is null) { if (pw.driver is sdriver) break; - pw = atomicLoad(pw.next); + pw = () @trusted { return atomicLoad(pw.next); } (); } if (!pw) { - pw = &w; + pw = w; shared(ThreadWaiter)* wn; do { - wn = atomicLoad(m_waiters); + wn = () @trusted { return atomicLoad(m_waiters); } (); w.next = wn; w.event = ms_threadEvent; w.driver = sdriver; - w.thread = cast(shared)Thread.getThis(); - } while (!cas(&m_waiters, wn, &w)); + w.thread = () @trusted { return cast(shared)Thread.getThis(); } (); + } while (!() @trusted { return cas(&m_waiters, wn, w); } ()); } - (cast(ThreadWaiter*)pw).tasks.add(&tw); + () @trusted { return cast(ThreadWaiter*)pw; } ().tasks.add(tw); } }