Merge pull request #23 from vibe-d/manual_event_fix
Fix double-free of ThreadLocalWaiter in ManualEvent. merged-on-behalf-of: Sönke Ludwig <s-ludwig@users.noreply.github.com>
This commit is contained in:
commit
9e390de9bf
|
@ -799,8 +799,8 @@ struct ManualEvent {
|
|||
private {
|
||||
int m_emitCount;
|
||||
static struct Waiters {
|
||||
StackSList!ThreadLocalWaiter active;
|
||||
StackSList!ThreadLocalWaiter free;
|
||||
StackSList!ThreadLocalWaiter active; // actively waiting
|
||||
StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs
|
||||
}
|
||||
Monitor!(Waiters, shared(SpinLock)) m_waiters;
|
||||
}
|
||||
|
@ -955,7 +955,7 @@ struct ManualEvent {
|
|||
|
||||
private void acquireThreadWaiter(DEL)(scope DEL del)
|
||||
shared {
|
||||
import vibe.internal.allocator : theAllocator, make;
|
||||
import vibe.internal.allocator : processAllocator, make;
|
||||
import core.memory : GC;
|
||||
|
||||
ThreadLocalWaiter* w;
|
||||
|
@ -965,6 +965,8 @@ struct ManualEvent {
|
|||
active.iterate((aw) {
|
||||
if (aw.m_driver is drv) {
|
||||
w = aw;
|
||||
w.addRef();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
@ -973,6 +975,7 @@ struct ManualEvent {
|
|||
free.filter((fw) {
|
||||
if (fw.m_driver is drv) {
|
||||
w = fw;
|
||||
w.addRef();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -981,7 +984,7 @@ struct ManualEvent {
|
|||
if (!w) {
|
||||
() @trusted {
|
||||
try {
|
||||
w = theAllocator.make!ThreadLocalWaiter;
|
||||
w = processAllocator.make!ThreadLocalWaiter;
|
||||
w.m_driver = drv;
|
||||
w.m_event = ms_threadEvent;
|
||||
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
|
||||
|
@ -996,9 +999,12 @@ struct ManualEvent {
|
|||
}
|
||||
|
||||
scope (exit) {
|
||||
if (w.unused) {
|
||||
if (!w.releaseRef()) {
|
||||
assert(w.m_driver is drv);
|
||||
assert(w.unused);
|
||||
with (m_waiters.lock) {
|
||||
active.remove(w);
|
||||
auto rmvd = active.remove(w);
|
||||
assert(rmvd);
|
||||
free.add(w);
|
||||
// TODO: cap size of m_freeWaiters
|
||||
}
|
||||
|
@ -1026,6 +1032,62 @@ unittest {
|
|||
runEventLoop();
|
||||
}
|
||||
|
||||
unittest {
|
||||
import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
|
||||
import vibe.core.taskpool : TaskPool;
|
||||
import core.time : msecs, usecs;
|
||||
import std.concurrency : send, receiveOnly;
|
||||
import std.random : uniform;
|
||||
|
||||
auto tpool = new shared TaskPool(4);
|
||||
scope (exit) tpool.terminate();
|
||||
|
||||
static void test(shared(ManualEvent)* evt, Task owner)
|
||||
{
|
||||
owner.tid.send(Task.getThis());
|
||||
|
||||
int ec = evt.emitCount;
|
||||
auto thist = Task.getThis();
|
||||
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
|
||||
scope (exit) tm.stop();
|
||||
while (ec < 5_000) {
|
||||
tm.rearm(500.msecs);
|
||||
sleep(uniform(0, 10_000).usecs);
|
||||
if (uniform(0, 10) == 0) evt.emit();
|
||||
auto ecn = evt.wait(ec);
|
||||
assert(ecn > ec);
|
||||
ec = ecn;
|
||||
}
|
||||
}
|
||||
|
||||
auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
|
||||
scope (exit) watchdog.stop();
|
||||
|
||||
auto e = createSharedManualEvent();
|
||||
Task[] tasks;
|
||||
|
||||
runTask({
|
||||
auto thist = Task.getThis();
|
||||
|
||||
// start 25 tasks in each thread
|
||||
foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
|
||||
// collect all task handles
|
||||
foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
|
||||
|
||||
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
|
||||
scope (exit) tm.stop();
|
||||
int pec = 0;
|
||||
while (e.emitCount < 5_000) {
|
||||
tm.rearm(500.msecs);
|
||||
sleep(50.usecs);
|
||||
e.emit();
|
||||
}
|
||||
|
||||
// wait for all worker tasks to finish
|
||||
foreach (t; tasks) t.join();
|
||||
}).join();
|
||||
}
|
||||
|
||||
package shared struct Monitor(T, M)
|
||||
{
|
||||
alias Mutex = M;
|
||||
|
@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter {
|
|||
NativeEventDriver m_driver;
|
||||
EventID m_event = EventID.invalid;
|
||||
Waiter* m_waiters;
|
||||
int m_refCount = 1;
|
||||
}
|
||||
|
||||
this(this)
|
||||
|
@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter {
|
|||
|
||||
@property bool unused() const @safe nothrow { return m_waiters is null; }
|
||||
|
||||
void addRef() @safe nothrow { m_refCount++; }
|
||||
bool releaseRef() @safe nothrow { return --m_refCount > 0; }
|
||||
|
||||
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
|
||||
@safe {
|
||||
import std.datetime : SysTime, Clock, UTC;
|
||||
|
@ -1249,6 +1315,7 @@ private struct StackSList(T)
|
|||
|
||||
void add(T* elem)
|
||||
{
|
||||
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
|
||||
elem.next = m_first;
|
||||
m_first = elem;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue