diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 4b76bbd..dfac097 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -799,8 +799,8 @@ struct ManualEvent { private { int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; - StackSList!ThreadLocalWaiter free; + StackSList!ThreadLocalWaiter active; // actively waiting + StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -955,7 +955,7 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : theAllocator, make; + import vibe.internal.allocator : processAllocator, make; import core.memory : GC; ThreadLocalWaiter* w; @@ -965,6 +965,8 @@ struct ManualEvent { active.iterate((aw) { if (aw.m_driver is drv) { w = aw; + w.addRef(); + return false; } return true; }); @@ -973,6 +975,7 @@ struct ManualEvent { free.filter((fw) { if (fw.m_driver is drv) { w = fw; + w.addRef(); return false; } return true; @@ -981,7 +984,7 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = theAllocator.make!ThreadLocalWaiter; + w = processAllocator.make!ThreadLocalWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); @@ -996,9 +999,12 @@ struct ManualEvent { } scope (exit) { - if (w.unused) { + if (!w.releaseRef()) { + assert(w.m_driver is drv); + assert(w.unused); with (m_waiters.lock) { - active.remove(w); + auto rmvd = active.remove(w); + assert(rmvd); free.add(w); // TODO: cap size of m_freeWaiters } @@ -1026,6 +1032,62 @@ unittest { runEventLoop(); } +unittest { + import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep; + import vibe.core.taskpool : TaskPool; + import core.time : msecs, usecs; + import std.concurrency : send, receiveOnly; + import std.random : uniform; + + auto tpool = new shared TaskPool(4); + scope (exit) tpool.terminate(); + + static void test(shared(ManualEvent)* evt, Task owner) + { + owner.tid.send(Task.getThis()); + + int ec = evt.emitCount; + auto thist = Task.getThis(); + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + while (ec < 5_000) { + tm.rearm(500.msecs); + sleep(uniform(0, 10_000).usecs); + if (uniform(0, 10) == 0) evt.emit(); + auto ecn = evt.wait(ec); + assert(ecn > ec); + ec = ecn; + } + } + + auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); }); + scope (exit) watchdog.stop(); + + auto e = createSharedManualEvent(); + Task[] tasks; + + runTask({ + auto thist = Task.getThis(); + + // start 25 tasks in each thread + foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist); + // collect all task handles + foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task; + + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + int pec = 0; + while (e.emitCount < 5_000) { + tm.rearm(500.msecs); + sleep(50.usecs); + e.emit(); + } + + // wait for all worker tasks to finish + foreach (t; tasks) t.join(); + }).join(); +} + package shared struct Monitor(T, M) { alias Mutex = M; @@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter { NativeEventDriver m_driver; EventID m_event = EventID.invalid; Waiter* m_waiters; + int m_refCount = 1; } this(this) @@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter { @property bool unused() const @safe nothrow { return m_waiters is null; } + void addRef() @safe nothrow { m_refCount++; } + bool releaseRef() @safe nothrow { return --m_refCount > 0; } + bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; @@ -1249,6 +1315,7 @@ private struct StackSList(T) void add(T* elem) { + debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); elem.next = m_first; m_first = elem; }