Use StackSList for ThreadLocalWaiter and add simple loop detection.

This will have to be adjusted to use a circular list with the possibility to insert a pivot element, so that consumption of waiters is safe in all cases (see the comment at 1265).
This commit is contained in:
Sönke Ludwig 2017-07-19 14:54:33 +02:00
parent d39bbf19c0
commit 4bccf6fcb5
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -1173,7 +1173,7 @@ private struct ThreadLocalWaiter {
ThreadLocalWaiter* next; ThreadLocalWaiter* next;
NativeEventDriver m_driver; NativeEventDriver m_driver;
EventID m_event = EventID.invalid; EventID m_event = EventID.invalid;
Waiter* m_waiters; StackSList!Waiter m_waiters;
int m_refCount = 1; int m_refCount = 1;
} }
@ -1189,7 +1189,7 @@ private struct ThreadLocalWaiter {
m_driver.events.releaseRef(m_event); m_driver.events.releaseRef(m_event);
} }
@property bool unused() const @safe nothrow { return m_waiters is null; } @property bool unused() const @safe nothrow { return m_waiters.empty; }
void addRef() @safe nothrow { m_refCount++; } void addRef() @safe nothrow { m_refCount++; }
bool releaseRef() @safe nothrow { return --m_refCount > 0; } bool releaseRef() @safe nothrow { return --m_refCount > 0; }
@ -1201,8 +1201,19 @@ private struct ThreadLocalWaiter {
Waiter w; Waiter w;
Waiter* pw = () @trusted { return &w; } (); Waiter* pw = () @trusted { return &w; } ();
w.next = m_waiters;
m_waiters = pw;
debug void assertWaiterNotInQueue()
{
m_waiters.iterate((w) {
assert(w !is pw, "Thread local waiter already in queue!?");
return true;
});
}
m_waiters.add(pw);
scope (failure) m_waiters.remove(pw);
SysTime target_timeout, now; SysTime target_timeout, now;
if (timeout != Duration.max) { if (timeout != Duration.max) {
@ -1216,22 +1227,6 @@ private struct ThreadLocalWaiter {
cb => w.cancel(), cb => w.cancel(),
) waitable; ) waitable;
void removeWaiter()
@safe nothrow {
Waiter* piw = m_waiters, ppiw = null;
while (piw !is null) {
if (piw is pw) {
if (ppiw) ppiw.next = piw.next;
else m_waiters = piw.next;
break;
}
ppiw = piw;
piw = piw.next;
}
}
scope (failure) removeWaiter();
if (evt != EventID.invalid) { if (evt != EventID.invalid) {
Waitable!(EventCallback, Waitable!(EventCallback,
(cb) { (cb) {
@ -1251,45 +1246,41 @@ private struct ThreadLocalWaiter {
} }
if (waitable.cancelled) { if (waitable.cancelled) {
removeWaiter(); m_waiters.remove(pw);
return false; return false;
} else debug { } else debug assertWaiterNotInQueue();
Waiter* piw = m_waiters;
while (piw !is null) {
assert(piw !is pw, "Thread local waiter still in queue after it got notified!?");
piw = piw.next;
}
}
return true; return true;
} }
bool emit() bool emit()
@safe nothrow { @safe nothrow {
if (!m_waiters) return false; import std.algorithm.mutation : swap;
Waiter* waiters = m_waiters; if (m_waiters.empty) return false;
m_waiters = null;
while (waiters) { StackSList!Waiter waiters;
auto wnext = waiters.next; swap(m_waiters, waiters);
assert(wnext !is waiters);
if (waiters.notifier !is null) { // FIXME: during iteration, waiters could remove themselves, but the head element will always stay in the list!
logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr); waiters.iterate((w) {
waiters.notifier(); if (w.notifier !is null) {
waiters.notifier = null; logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
w.notifier();
w.notifier = null;
} else logTrace("notify callback is null"); } else logTrace("notify callback is null");
waiters = wnext; return true;
} });
return true; return true;
} }
bool emitSingle() bool emitSingle()
@safe nothrow { @safe nothrow {
if (!m_waiters) return false; if (m_waiters.empty) return false;
auto w = m_waiters; auto w = m_waiters.first;
m_waiters = w.next; m_waiters.remove(w);
if (w.notifier !is null) { if (w.notifier !is null) {
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
w.notifier(); w.notifier();
@ -1322,11 +1313,13 @@ private struct StackSList(T)
bool remove(T* elem) bool remove(T* elem)
{ {
debug uint counter = 0;
T* w = m_first, wp; T* w = m_first, wp;
while (w !is elem) { while (w !is elem) {
if (!w) return false; if (!w) return false;
wp = w; wp = w;
w = w.next; w = w.next;
debug assert(++counter < 1_000_000, "Cycle in linked list?");
} }
if (wp) wp.next = w.next; if (wp) wp.next = w.next;
else m_first = w.next; else m_first = w.next;
@ -1335,6 +1328,7 @@ private struct StackSList(T)
void filter(scope bool delegate(T* el) @safe nothrow del) void filter(scope bool delegate(T* el) @safe nothrow del)
{ {
debug uint counter = 0;
T* w = m_first, pw; T* w = m_first, pw;
while (w !is null) { while (w !is null) {
auto wnext = w.next; auto wnext = w.next;
@ -1343,16 +1337,19 @@ private struct StackSList(T)
else m_first = wnext; else m_first = wnext;
} else pw = w; } else pw = w;
w = wnext; w = wnext;
debug assert(++counter < 1_000_000, "Cycle in linked list?");
} }
} }
void iterate(scope bool delegate(T* el) @safe nothrow del) void iterate(scope bool delegate(T* el) @safe nothrow del)
{ {
debug uint counter = 0;
T* w = m_first; T* w = m_first;
while (w !is null) { while (w !is null) {
auto wnext = w.next; auto wnext = w.next;
if (!del(w)) break; if (!del(w)) break;
w = wnext; w = wnext;
debug assert(++counter < 1_000_000, "Cycle in linked list?");
} }
} }
} }
@ -2007,4 +2004,4 @@ class InterruptibleTaskReadWriteMutex
/** The policy with which the lock has been created. */ /** The policy with which the lock has been created. */
@property Policy policy() const { return m_state.policy; } @property Policy policy() const { return m_state.policy; }
} }