Fix multiple issues in (Local)ManualEvent.
- Copying LocalManualEvent now works correctly, using reference counting - ManualEvent correctly pins the reference to the thread-local waiter until it has finished emitting it - ThreadLocalWaiter uses a doubly-linked list to manage task waiters (more efficient deletion, FIFO trigger order) - Fixed a bug in ThreadLocalWaiter.emit() where the head element of the iterated list might already have stopped waiting, resulting in an invocation of a dangling TaskWaiter pointer
This commit is contained in:
parent
19db7732e6
commit
a4b36f08d3
|
@ -23,7 +23,9 @@ import std.traits : ReturnType;
|
|||
*/
|
||||
LocalManualEvent createManualEvent()
|
||||
@safe {
|
||||
return LocalManualEvent.init;
|
||||
LocalManualEvent ret;
|
||||
ret.initialize();
|
||||
return ret;
|
||||
}
|
||||
/// ditto
|
||||
shared(ManualEvent) createSharedManualEvent()
|
||||
|
@ -319,7 +321,6 @@ unittest {
|
|||
}
|
||||
}
|
||||
|
||||
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
||||
unittest { // test deferred throwing
|
||||
import vibe.core.core;
|
||||
|
||||
|
@ -357,7 +358,6 @@ unittest { // test deferred throwing
|
|||
runEventLoop();
|
||||
}
|
||||
|
||||
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
||||
unittest {
|
||||
runMutexUnitTests!TaskMutex();
|
||||
}
|
||||
|
@ -679,28 +679,45 @@ struct LocalManualEvent {
|
|||
@safe:
|
||||
|
||||
private {
|
||||
int m_emitCount;
|
||||
ThreadLocalWaiter m_waiter;
|
||||
alias Waiter = ThreadLocalWaiter!false;
|
||||
|
||||
Waiter m_waiter;
|
||||
}
|
||||
|
||||
// thread destructor in vibe.core.core will decrement the ref. count
|
||||
package static EventID ms_threadEvent;
|
||||
|
||||
//@disable this(this); // FIXME: commenting this out this is not a good idea...
|
||||
private void initialize()
|
||||
{
|
||||
import vibe.internal.allocator : Mallocator, makeGCSafe;
|
||||
m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
|
||||
}
|
||||
|
||||
deprecated("LocalManualEvent is always non-null!")
|
||||
bool opCast() const nothrow { return true; }
|
||||
this(this)
|
||||
{
|
||||
if (m_waiter)
|
||||
return m_waiter.addRef();
|
||||
}
|
||||
|
||||
~this()
|
||||
{
|
||||
import vibe.internal.allocator : Mallocator, disposeGCSafe;
|
||||
if (m_waiter) {
|
||||
if (!m_waiter.releaseRef())
|
||||
() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
|
||||
}
|
||||
}
|
||||
|
||||
bool opCast() const nothrow { return m_waiter !is null; }
|
||||
|
||||
/// A counter that is increased with every emit() call
|
||||
int emitCount() const nothrow { return m_emitCount; }
|
||||
/// ditto
|
||||
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
|
||||
int emitCount() const nothrow { return m_waiter.m_emitCount; }
|
||||
|
||||
/// Emits the signal, waking up all owners of the signal.
|
||||
int emit()
|
||||
nothrow {
|
||||
logTrace("unshared emit");
|
||||
auto ec = m_emitCount++;
|
||||
auto ec = m_waiter.m_emitCount++;
|
||||
m_waiter.emit();
|
||||
return ec;
|
||||
}
|
||||
|
@ -709,7 +726,7 @@ struct LocalManualEvent {
|
|||
int emitSingle()
|
||||
nothrow {
|
||||
logTrace("unshared single emit");
|
||||
auto ec = m_emitCount++;
|
||||
auto ec = m_waiter.m_emitCount++;
|
||||
m_waiter.emitSingle();
|
||||
return ec;
|
||||
}
|
||||
|
@ -758,14 +775,14 @@ struct LocalManualEvent {
|
|||
target_timeout = now + timeout;
|
||||
}
|
||||
|
||||
while (m_emitCount <= emit_count) {
|
||||
while (m_waiter.m_emitCount <= emit_count) {
|
||||
m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
|
||||
try now = Clock.currTime(UTC());
|
||||
catch (Exception e) { assert(false, e.msg); }
|
||||
if (now >= target_timeout) break;
|
||||
}
|
||||
|
||||
return m_emitCount;
|
||||
return m_waiter.m_emitCount;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -793,14 +810,17 @@ unittest {
|
|||
struct ManualEvent {
|
||||
import core.thread : Thread;
|
||||
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
|
||||
import vibe.internal.list : StackSList;
|
||||
|
||||
@safe:
|
||||
|
||||
private {
|
||||
alias ThreadWaiter = ThreadLocalWaiter!true;
|
||||
|
||||
int m_emitCount;
|
||||
static struct Waiters {
|
||||
StackSList!ThreadLocalWaiter active; // actively waiting
|
||||
StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs
|
||||
StackSList!ThreadWaiter active; // actively waiting
|
||||
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
|
||||
}
|
||||
Monitor!(Waiters, shared(SpinLock)) m_waiters;
|
||||
}
|
||||
|
@ -813,7 +833,7 @@ struct ManualEvent {
|
|||
all
|
||||
}
|
||||
|
||||
@disable this(this); // FIXME: commenting this out this is not a good idea...
|
||||
@disable this(this);
|
||||
|
||||
deprecated("ManualEvent is always non-null!")
|
||||
bool opCast() const shared nothrow { return true; }
|
||||
|
@ -831,12 +851,14 @@ struct ManualEvent {
|
|||
auto ec = atomicOp!"+="(m_emitCount, 1);
|
||||
auto thisthr = Thread.getThis();
|
||||
|
||||
ThreadLocalWaiter* lw;
|
||||
ThreadWaiter lw;
|
||||
auto drv = eventDriver;
|
||||
m_waiters.lock.active.filter((ThreadLocalWaiter* w) {
|
||||
m_waiters.lock.active.filter((ThreadWaiter w) {
|
||||
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
||||
if (w.m_driver is drv) lw = w;
|
||||
else {
|
||||
if (w.m_driver is drv) {
|
||||
lw = w;
|
||||
lw.addRef();
|
||||
} else {
|
||||
try {
|
||||
assert(w.m_event != EventID.init);
|
||||
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
||||
|
@ -845,7 +867,10 @@ struct ManualEvent {
|
|||
return true;
|
||||
});
|
||||
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
||||
if (lw) lw.emit();
|
||||
if (lw) {
|
||||
lw.emit();
|
||||
releaseWaiter(lw);
|
||||
}
|
||||
|
||||
logTrace("emit shared done");
|
||||
|
||||
|
@ -862,13 +887,15 @@ struct ManualEvent {
|
|||
auto ec = atomicOp!"+="(m_emitCount, 1);
|
||||
auto thisthr = Thread.getThis();
|
||||
|
||||
ThreadLocalWaiter* lw;
|
||||
ThreadWaiter lw;
|
||||
auto drv = eventDriver;
|
||||
m_waiters.lock.active.iterate((ThreadLocalWaiter* w) {
|
||||
m_waiters.lock.active.iterate((ThreadWaiter w) {
|
||||
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
||||
if (w.unused) return true;
|
||||
if (w.m_driver is drv) lw = w;
|
||||
else {
|
||||
if (w.m_driver is drv) {
|
||||
lw = w;
|
||||
lw.addRef();
|
||||
} else {
|
||||
try {
|
||||
assert(w.m_event != EventID.invalid);
|
||||
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
||||
|
@ -877,7 +904,10 @@ struct ManualEvent {
|
|||
return false;
|
||||
});
|
||||
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
||||
if (lw) lw.emit();
|
||||
if (lw) {
|
||||
lw.emitSingle();
|
||||
releaseWaiter(lw);
|
||||
}
|
||||
|
||||
logTrace("emit shared done");
|
||||
|
||||
|
@ -937,7 +967,7 @@ struct ManualEvent {
|
|||
|
||||
int ec = this.emitCount;
|
||||
|
||||
acquireThreadWaiter((ref ThreadLocalWaiter w) {
|
||||
acquireThreadWaiter((scope ThreadWaiter w) {
|
||||
while (ec <= emit_count) {
|
||||
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count);
|
||||
ec = this.emitCount;
|
||||
|
@ -955,10 +985,9 @@ struct ManualEvent {
|
|||
|
||||
private void acquireThreadWaiter(DEL)(scope DEL del)
|
||||
shared {
|
||||
import vibe.internal.allocator : processAllocator, make;
|
||||
import core.memory : GC;
|
||||
import vibe.internal.allocator : processAllocator, makeGCSafe;
|
||||
|
||||
ThreadLocalWaiter* w;
|
||||
ThreadWaiter w;
|
||||
auto drv = eventDriver;
|
||||
|
||||
with (m_waiters.lock) {
|
||||
|
@ -984,34 +1013,37 @@ struct ManualEvent {
|
|||
if (!w) {
|
||||
() @trusted {
|
||||
try {
|
||||
w = processAllocator.make!ThreadLocalWaiter;
|
||||
w = processAllocator.makeGCSafe!ThreadWaiter;
|
||||
w.m_driver = drv;
|
||||
w.m_event = ms_threadEvent;
|
||||
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
|
||||
} catch (Exception e) {
|
||||
assert(false, "Failed to allocate thread waiter.");
|
||||
}
|
||||
} ();
|
||||
}
|
||||
|
||||
assert(w.m_refCount == 1);
|
||||
active.add(w);
|
||||
}
|
||||
}
|
||||
|
||||
scope (exit) {
|
||||
if (!w.releaseRef()) {
|
||||
assert(w.m_driver is drv);
|
||||
assert(w.unused);
|
||||
with (m_waiters.lock) {
|
||||
auto rmvd = active.remove(w);
|
||||
assert(rmvd);
|
||||
free.add(w);
|
||||
// TODO: cap size of m_freeWaiters
|
||||
}
|
||||
scope (exit) releaseWaiter(w);
|
||||
|
||||
del(w);
|
||||
}
|
||||
|
||||
private void releaseWaiter(ThreadWaiter w)
|
||||
shared nothrow {
|
||||
if (!w.releaseRef()) {
|
||||
assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
|
||||
assert(w.unused, "Waiter still used, but not referenced!?");
|
||||
with (m_waiters.lock) {
|
||||
auto rmvd = active.remove(w);
|
||||
assert(rmvd, "Waiter not in active queue anymore!?");
|
||||
free.add(w);
|
||||
// TODO: cap size of m_freeWaiters
|
||||
}
|
||||
}
|
||||
|
||||
del(*w);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1155,10 +1187,12 @@ package struct SpinLock {
|
|||
}
|
||||
}
|
||||
|
||||
private struct ThreadLocalWaiter {
|
||||
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
|
||||
import vibe.internal.list : CircularDList;
|
||||
|
||||
private {
|
||||
static struct Waiter {
|
||||
Waiter* next;
|
||||
static struct TaskWaiter {
|
||||
TaskWaiter* prev, next;
|
||||
Task task;
|
||||
void delegate() @safe nothrow notifier;
|
||||
bool cancelled;
|
||||
|
@ -1170,50 +1204,52 @@ private struct ThreadLocalWaiter {
|
|||
void cancel() @safe nothrow { cancelled = true; notifier = null; }
|
||||
}
|
||||
|
||||
ThreadLocalWaiter* next;
|
||||
NativeEventDriver m_driver;
|
||||
EventID m_event = EventID.invalid;
|
||||
StackSList!Waiter m_waiters;
|
||||
static if (EVENT_TRIGGERED) {
|
||||
package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
|
||||
NativeEventDriver m_driver;
|
||||
EventID m_event = EventID.invalid;
|
||||
} else {
|
||||
int m_emitCount = 0;
|
||||
}
|
||||
int m_refCount = 1;
|
||||
TaskWaiter m_pivot;
|
||||
TaskWaiter m_emitPivot;
|
||||
CircularDList!(TaskWaiter*) m_waiters;
|
||||
}
|
||||
|
||||
this(this)
|
||||
@safe nothrow {
|
||||
if (m_event != EventID.invalid)
|
||||
m_driver.events.addRef(m_event);
|
||||
this()
|
||||
{
|
||||
m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
|
||||
}
|
||||
|
||||
~this()
|
||||
@safe nothrow {
|
||||
if (m_event != EventID.invalid)
|
||||
m_driver.events.releaseRef(m_event);
|
||||
static if (EVENT_TRIGGERED) {
|
||||
~this()
|
||||
{
|
||||
if (m_event != EventID.invalid)
|
||||
eventDriver.events.releaseRef(m_event);
|
||||
}
|
||||
}
|
||||
|
||||
@property bool unused() const @safe nothrow { return m_waiters.empty; }
|
||||
|
||||
void addRef() @safe nothrow { m_refCount++; }
|
||||
bool releaseRef() @safe nothrow { return --m_refCount > 0; }
|
||||
void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
|
||||
bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }
|
||||
|
||||
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
|
||||
@safe {
|
||||
import std.datetime : SysTime, Clock, UTC;
|
||||
import vibe.internal.async : Waitable, asyncAwaitAny;
|
||||
|
||||
Waiter w;
|
||||
Waiter* pw = () @trusted { return &w; } ();
|
||||
TaskWaiter waiter_store;
|
||||
TaskWaiter* waiter = () @trusted { return &waiter_store; } ();
|
||||
|
||||
|
||||
debug void assertWaiterNotInQueue()
|
||||
{
|
||||
m_waiters.iterate((w) {
|
||||
assert(w !is pw, "Thread local waiter already in queue!?");
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
m_waiters.add(pw);
|
||||
|
||||
scope (failure) m_waiters.remove(pw);
|
||||
m_waiters.insertBack(waiter);
|
||||
assert(waiter.next !is null);
|
||||
scope (exit)
|
||||
if (waiter.next !is null) {
|
||||
m_waiters.remove(waiter);
|
||||
assert(!waiter.next);
|
||||
}
|
||||
|
||||
SysTime target_timeout, now;
|
||||
if (timeout != Duration.max) {
|
||||
|
@ -1222,16 +1258,16 @@ private struct ThreadLocalWaiter {
|
|||
target_timeout = now + timeout;
|
||||
}
|
||||
|
||||
Waitable!(typeof(Waiter.notifier),
|
||||
cb => w.wait(cb),
|
||||
cb => w.cancel(),
|
||||
Waitable!(typeof(TaskWaiter.notifier),
|
||||
cb => waiter.wait(cb),
|
||||
cb => waiter.cancel(),
|
||||
) waitable;
|
||||
|
||||
if (evt != EventID.invalid) {
|
||||
Waitable!(EventCallback,
|
||||
(cb) {
|
||||
eventDriver.events.wait(evt, cb);
|
||||
// check for exit codition *after* starting to wait for the event
|
||||
// check for exit condition *after* starting to wait for the event
|
||||
// to avoid a race condition
|
||||
if (exit_condition()) {
|
||||
eventDriver.events.cancelWait(evt, cb);
|
||||
|
@ -1246,111 +1282,57 @@ private struct ThreadLocalWaiter {
|
|||
}
|
||||
|
||||
if (waitable.cancelled) {
|
||||
m_waiters.remove(pw);
|
||||
assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
|
||||
return false;
|
||||
} else debug assertWaiterNotInQueue();
|
||||
|
||||
return true;
|
||||
} else {
|
||||
assert(waiter.next is null, "Triggered waiter still in queue!?");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
bool emit()
|
||||
void emit()
|
||||
@safe nothrow {
|
||||
import std.algorithm.mutation : swap;
|
||||
import vibe.core.core : yield;
|
||||
|
||||
if (m_waiters.empty) return false;
|
||||
if (m_waiters.empty) return;
|
||||
|
||||
StackSList!Waiter waiters;
|
||||
swap(m_waiters, waiters);
|
||||
TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
|
||||
|
||||
// FIXME: during iteration, waiters could remove themselves, but the head element will always stay in the list!
|
||||
waiters.iterate((w) {
|
||||
if (w.notifier !is null) {
|
||||
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
|
||||
w.notifier();
|
||||
w.notifier = null;
|
||||
} else logTrace("notify callback is null");
|
||||
return true;
|
||||
});
|
||||
if (pivot.next) { // another emit in progress?
|
||||
// shift pivot to the end, so that the other emit call will process all pending waiters
|
||||
if (pivot !is m_waiters.back) {
|
||||
m_waiters.remove(pivot);
|
||||
m_waiters.insertBack(pivot);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
return true;
|
||||
m_waiters.insertBack(pivot);
|
||||
scope (exit) m_waiters.remove(pivot);
|
||||
|
||||
foreach (w; m_waiters) {
|
||||
if (w is pivot) break;
|
||||
emitWaiter(w);
|
||||
}
|
||||
}
|
||||
|
||||
bool emitSingle()
|
||||
@safe nothrow {
|
||||
if (m_waiters.empty) return false;
|
||||
emitWaiter(m_waiters.front);
|
||||
return true;
|
||||
}
|
||||
|
||||
auto w = m_waiters.first;
|
||||
private void emitWaiter(TaskWaiter* w)
|
||||
@safe nothrow {
|
||||
m_waiters.remove(w);
|
||||
|
||||
if (w.notifier !is null) {
|
||||
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
|
||||
w.notifier();
|
||||
w.notifier = null;
|
||||
} else logTrace("notify callback is null");
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private struct StackSList(T)
|
||||
{
|
||||
@safe nothrow:
|
||||
|
||||
import core.atomic : cas;
|
||||
|
||||
private T* m_first;
|
||||
|
||||
@property T* first() { return m_first; }
|
||||
@property shared(T)* first() shared @trusted { return atomicLoad(m_first); }
|
||||
|
||||
bool empty() const { return m_first is null; }
|
||||
|
||||
void add(T* elem)
|
||||
{
|
||||
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
|
||||
elem.next = m_first;
|
||||
m_first = elem;
|
||||
}
|
||||
|
||||
bool remove(T* elem)
|
||||
{
|
||||
debug uint counter = 0;
|
||||
T* w = m_first, wp;
|
||||
while (w !is elem) {
|
||||
if (!w) return false;
|
||||
wp = w;
|
||||
w = w.next;
|
||||
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||
}
|
||||
if (wp) wp.next = w.next;
|
||||
else m_first = w.next;
|
||||
return true;
|
||||
}
|
||||
|
||||
void filter(scope bool delegate(T* el) @safe nothrow del)
|
||||
{
|
||||
debug uint counter = 0;
|
||||
T* w = m_first, pw;
|
||||
while (w !is null) {
|
||||
auto wnext = w.next;
|
||||
if (!del(w)) {
|
||||
if (pw) pw.next = wnext;
|
||||
else m_first = wnext;
|
||||
} else pw = w;
|
||||
w = wnext;
|
||||
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||
}
|
||||
}
|
||||
|
||||
void iterate(scope bool delegate(T* el) @safe nothrow del)
|
||||
{
|
||||
debug uint counter = 0;
|
||||
T* w = m_first;
|
||||
while (w !is null) {
|
||||
auto wnext = w.next;
|
||||
if (!del(w)) break;
|
||||
w = wnext;
|
||||
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1366,7 +1348,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
|
|||
{
|
||||
}
|
||||
|
||||
|
||||
@trusted bool tryLock()
|
||||
{
|
||||
if (cas(&m_locked, false, true)) {
|
||||
|
|
Loading…
Reference in a new issue