Fix double-free of ThreadLocalWaiter in ManualEvent.

Also adds assertions and a randomized multi-thread test to rule out similar issues with a higher confidence.
This commit is contained in:
Sönke Ludwig 2017-07-15 18:16:13 +02:00
parent 254d91dcdf
commit 7efb496208
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -799,8 +799,8 @@ struct ManualEvent {
private {
int m_emitCount;
static struct Waiters {
StackSList!ThreadLocalWaiter active;
StackSList!ThreadLocalWaiter free;
StackSList!ThreadLocalWaiter active; // actively waiting
StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs
}
Monitor!(Waiters, shared(SpinLock)) m_waiters;
}
@ -955,7 +955,7 @@ struct ManualEvent {
private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : theAllocator, make;
import vibe.internal.allocator : processAllocator, make;
import core.memory : GC;
ThreadLocalWaiter* w;
@ -965,6 +965,8 @@ struct ManualEvent {
active.iterate((aw) {
if (aw.m_driver is drv) {
w = aw;
w.addRef();
return false;
}
return true;
});
@ -973,6 +975,7 @@ struct ManualEvent {
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
@ -981,7 +984,7 @@ struct ManualEvent {
if (!w) {
() @trusted {
try {
w = theAllocator.make!ThreadLocalWaiter;
w = processAllocator.make!ThreadLocalWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
@ -996,9 +999,12 @@ struct ManualEvent {
}
scope (exit) {
if (w.unused) {
if (!w.releaseRef()) {
assert(w.m_driver is drv);
assert(w.unused);
with (m_waiters.lock) {
active.remove(w);
auto rmvd = active.remove(w);
assert(rmvd);
free.add(w);
// TODO: cap size of m_freeWaiters
}
@ -1026,6 +1032,62 @@ unittest {
runEventLoop();
}
unittest {
import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
import vibe.core.taskpool : TaskPool;
import core.time : msecs, usecs;
import std.concurrency : send, receiveOnly;
import std.random : uniform;
auto tpool = new shared TaskPool(4);
scope (exit) tpool.terminate();
static void test(shared(ManualEvent)* evt, Task owner)
{
owner.tid.send(Task.getThis());
int ec = evt.emitCount;
auto thist = Task.getThis();
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
while (ec < 5_000) {
tm.rearm(500.msecs);
sleep(uniform(0, 10_000).usecs);
if (uniform(0, 10) == 0) evt.emit();
auto ecn = evt.wait(ec);
assert(ecn > ec);
ec = ecn;
}
}
auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
scope (exit) watchdog.stop();
auto e = createSharedManualEvent();
Task[] tasks;
runTask({
auto thist = Task.getThis();
// start 25 tasks in each thread
foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
// collect all task handles
foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
int pec = 0;
while (e.emitCount < 5_000) {
tm.rearm(500.msecs);
sleep(50.usecs);
e.emit();
}
// wait for all worker tasks to finish
foreach (t; tasks) t.join();
}).join();
}
package shared struct Monitor(T, M)
{
alias Mutex = M;
@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter {
NativeEventDriver m_driver;
EventID m_event = EventID.invalid;
Waiter* m_waiters;
int m_refCount = 1;
}
this(this)
@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter {
@property bool unused() const @safe nothrow { return m_waiters is null; }
void addRef() @safe nothrow { m_refCount++; }
bool releaseRef() @safe nothrow { return --m_refCount > 0; }
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
@safe {
import std.datetime : SysTime, Clock, UTC;
@ -1249,6 +1315,7 @@ private struct StackSList(T)
void add(T* elem)
{
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
elem.next = m_first;
m_first = elem;
}