Merge pull request #26 from vibe-d/improve_threadlocalwaiter
Improve and fix (Local)ManualEvent
This commit is contained in:
commit
196325c0e4
|
@ -563,7 +563,7 @@ private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, Input
|
||||||
*/
|
*/
|
||||||
struct DirectoryWatcher { // TODO: avoid all those heap allocations!
|
struct DirectoryWatcher { // TODO: avoid all those heap allocations!
|
||||||
import std.array : Appender, appender;
|
import std.array : Appender, appender;
|
||||||
import vibe.core.sync : LocalManualEvent;
|
import vibe.core.sync : LocalManualEvent, createManualEvent;
|
||||||
|
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
|
@ -594,6 +594,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
|
||||||
private this(NativePath path, bool recursive)
|
private this(NativePath path, bool recursive)
|
||||||
{
|
{
|
||||||
m_context = new Context; // FIME: avoid GC allocation (use FD user data slot)
|
m_context = new Context; // FIME: avoid GC allocation (use FD user data slot)
|
||||||
|
m_context.changeEvent = createManualEvent();
|
||||||
m_watcher = eventDriver.watchers.watchDirectory(path.toNativeString, recursive, &m_context.onChange);
|
m_watcher = eventDriver.watchers.watchDirectory(path.toNativeString, recursive, &m_context.onChange);
|
||||||
m_context.path = path;
|
m_context.path = path;
|
||||||
m_context.recursive = recursive;
|
m_context.recursive = recursive;
|
||||||
|
|
|
@ -23,7 +23,9 @@ import std.traits : ReturnType;
|
||||||
*/
|
*/
|
||||||
LocalManualEvent createManualEvent()
|
LocalManualEvent createManualEvent()
|
||||||
@safe {
|
@safe {
|
||||||
return LocalManualEvent.init;
|
LocalManualEvent ret;
|
||||||
|
ret.initialize();
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
/// ditto
|
/// ditto
|
||||||
shared(ManualEvent) createSharedManualEvent()
|
shared(ManualEvent) createSharedManualEvent()
|
||||||
|
@ -319,7 +321,6 @@ unittest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
||||||
unittest { // test deferred throwing
|
unittest { // test deferred throwing
|
||||||
import vibe.core.core;
|
import vibe.core.core;
|
||||||
|
|
||||||
|
@ -357,7 +358,6 @@ unittest { // test deferred throwing
|
||||||
runEventLoop();
|
runEventLoop();
|
||||||
}
|
}
|
||||||
|
|
||||||
version (VibeLibevDriver) {} else // timers are not implemented for libev, yet
|
|
||||||
unittest {
|
unittest {
|
||||||
runMutexUnitTests!TaskMutex();
|
runMutexUnitTests!TaskMutex();
|
||||||
}
|
}
|
||||||
|
@ -679,28 +679,46 @@ struct LocalManualEvent {
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
private {
|
private {
|
||||||
int m_emitCount;
|
alias Waiter = ThreadLocalWaiter!false;
|
||||||
ThreadLocalWaiter m_waiter;
|
|
||||||
|
Waiter m_waiter;
|
||||||
}
|
}
|
||||||
|
|
||||||
// thread destructor in vibe.core.core will decrement the ref. count
|
// thread destructor in vibe.core.core will decrement the ref. count
|
||||||
package static EventID ms_threadEvent;
|
package static EventID ms_threadEvent;
|
||||||
|
|
||||||
//@disable this(this); // FIXME: commenting this out this is not a good idea...
|
private void initialize()
|
||||||
|
{
|
||||||
|
import vibe.internal.allocator : Mallocator, makeGCSafe;
|
||||||
|
m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
|
||||||
|
}
|
||||||
|
|
||||||
deprecated("LocalManualEvent is always non-null!")
|
this(this)
|
||||||
bool opCast() const nothrow { return true; }
|
{
|
||||||
|
if (m_waiter)
|
||||||
|
return m_waiter.addRef();
|
||||||
|
}
|
||||||
|
|
||||||
|
~this()
|
||||||
|
{
|
||||||
|
import vibe.internal.allocator : Mallocator, disposeGCSafe;
|
||||||
|
if (m_waiter) {
|
||||||
|
if (!m_waiter.releaseRef())
|
||||||
|
() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool opCast() const nothrow { return m_waiter !is null; }
|
||||||
|
|
||||||
/// A counter that is increased with every emit() call
|
/// A counter that is increased with every emit() call
|
||||||
int emitCount() const nothrow { return m_emitCount; }
|
int emitCount() const nothrow { return m_waiter.m_emitCount; }
|
||||||
/// ditto
|
|
||||||
int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
|
|
||||||
|
|
||||||
/// Emits the signal, waking up all owners of the signal.
|
/// Emits the signal, waking up all owners of the signal.
|
||||||
int emit()
|
int emit()
|
||||||
nothrow {
|
nothrow {
|
||||||
|
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
|
||||||
logTrace("unshared emit");
|
logTrace("unshared emit");
|
||||||
auto ec = m_emitCount++;
|
auto ec = m_waiter.m_emitCount++;
|
||||||
m_waiter.emit();
|
m_waiter.emit();
|
||||||
return ec;
|
return ec;
|
||||||
}
|
}
|
||||||
|
@ -708,8 +726,9 @@ struct LocalManualEvent {
|
||||||
/// Emits the signal, waking up a single owners of the signal.
|
/// Emits the signal, waking up a single owners of the signal.
|
||||||
int emitSingle()
|
int emitSingle()
|
||||||
nothrow {
|
nothrow {
|
||||||
|
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
|
||||||
logTrace("unshared single emit");
|
logTrace("unshared single emit");
|
||||||
auto ec = m_emitCount++;
|
auto ec = m_waiter.m_emitCount++;
|
||||||
m_waiter.emitSingle();
|
m_waiter.emitSingle();
|
||||||
return ec;
|
return ec;
|
||||||
}
|
}
|
||||||
|
@ -751,6 +770,8 @@ struct LocalManualEvent {
|
||||||
{
|
{
|
||||||
import std.datetime : Clock, SysTime, UTC;
|
import std.datetime : Clock, SysTime, UTC;
|
||||||
|
|
||||||
|
assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
|
||||||
|
|
||||||
SysTime target_timeout, now;
|
SysTime target_timeout, now;
|
||||||
if (timeout != Duration.max) {
|
if (timeout != Duration.max) {
|
||||||
try now = Clock.currTime(UTC());
|
try now = Clock.currTime(UTC());
|
||||||
|
@ -758,14 +779,14 @@ struct LocalManualEvent {
|
||||||
target_timeout = now + timeout;
|
target_timeout = now + timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (m_emitCount <= emit_count) {
|
while (m_waiter.m_emitCount <= emit_count) {
|
||||||
m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
|
m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
|
||||||
try now = Clock.currTime(UTC());
|
try now = Clock.currTime(UTC());
|
||||||
catch (Exception e) { assert(false, e.msg); }
|
catch (Exception e) { assert(false, e.msg); }
|
||||||
if (now >= target_timeout) break;
|
if (now >= target_timeout) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return m_emitCount;
|
return m_waiter.m_emitCount;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,6 +806,46 @@ unittest {
|
||||||
runEventLoop();
|
runEventLoop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
|
||||||
|
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
|
||||||
|
|
||||||
|
LocalManualEvent l = createManualEvent();
|
||||||
|
|
||||||
|
Task t2;
|
||||||
|
runTask({
|
||||||
|
l.wait();
|
||||||
|
t2.interrupt();
|
||||||
|
sleep(20.msecs);
|
||||||
|
exitEventLoop();
|
||||||
|
});
|
||||||
|
t2 = runTask({
|
||||||
|
try {
|
||||||
|
l.wait();
|
||||||
|
assert(false, "Shouldn't reach this.");
|
||||||
|
} catch (InterruptException e) {}
|
||||||
|
});
|
||||||
|
runTask({
|
||||||
|
l.emit();
|
||||||
|
});
|
||||||
|
runEventLoop();
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest { // ensure that LocalManualEvent behaves correctly after being copied
|
||||||
|
import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
|
||||||
|
|
||||||
|
LocalManualEvent l = createManualEvent();
|
||||||
|
runTask({
|
||||||
|
auto lc = l;
|
||||||
|
sleep(100.msecs);
|
||||||
|
lc.emit();
|
||||||
|
});
|
||||||
|
runTask({
|
||||||
|
assert(l.wait(1.seconds, l.emitCount));
|
||||||
|
exitEventLoop();
|
||||||
|
});
|
||||||
|
runEventLoop();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** A manually triggered multi threaded cross-task event.
|
/** A manually triggered multi threaded cross-task event.
|
||||||
|
|
||||||
|
@ -793,14 +854,17 @@ unittest {
|
||||||
struct ManualEvent {
|
struct ManualEvent {
|
||||||
import core.thread : Thread;
|
import core.thread : Thread;
|
||||||
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
|
import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
|
||||||
|
import vibe.internal.list : StackSList;
|
||||||
|
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
private {
|
private {
|
||||||
|
alias ThreadWaiter = ThreadLocalWaiter!true;
|
||||||
|
|
||||||
int m_emitCount;
|
int m_emitCount;
|
||||||
static struct Waiters {
|
static struct Waiters {
|
||||||
StackSList!ThreadLocalWaiter active; // actively waiting
|
StackSList!ThreadWaiter active; // actively waiting
|
||||||
StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs
|
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
|
||||||
}
|
}
|
||||||
Monitor!(Waiters, shared(SpinLock)) m_waiters;
|
Monitor!(Waiters, shared(SpinLock)) m_waiters;
|
||||||
}
|
}
|
||||||
|
@ -813,7 +877,7 @@ struct ManualEvent {
|
||||||
all
|
all
|
||||||
}
|
}
|
||||||
|
|
||||||
@disable this(this); // FIXME: commenting this out this is not a good idea...
|
@disable this(this);
|
||||||
|
|
||||||
deprecated("ManualEvent is always non-null!")
|
deprecated("ManualEvent is always non-null!")
|
||||||
bool opCast() const shared nothrow { return true; }
|
bool opCast() const shared nothrow { return true; }
|
||||||
|
@ -831,12 +895,14 @@ struct ManualEvent {
|
||||||
auto ec = atomicOp!"+="(m_emitCount, 1);
|
auto ec = atomicOp!"+="(m_emitCount, 1);
|
||||||
auto thisthr = Thread.getThis();
|
auto thisthr = Thread.getThis();
|
||||||
|
|
||||||
ThreadLocalWaiter* lw;
|
ThreadWaiter lw;
|
||||||
auto drv = eventDriver;
|
auto drv = eventDriver;
|
||||||
m_waiters.lock.active.filter((ThreadLocalWaiter* w) {
|
m_waiters.lock.active.filter((ThreadWaiter w) {
|
||||||
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
||||||
if (w.m_driver is drv) lw = w;
|
if (w.m_driver is drv) {
|
||||||
else {
|
lw = w;
|
||||||
|
lw.addRef();
|
||||||
|
} else {
|
||||||
try {
|
try {
|
||||||
assert(w.m_event != EventID.init);
|
assert(w.m_event != EventID.init);
|
||||||
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
||||||
|
@ -845,7 +911,10 @@ struct ManualEvent {
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
||||||
if (lw) lw.emit();
|
if (lw) {
|
||||||
|
lw.emit();
|
||||||
|
releaseWaiter(lw);
|
||||||
|
}
|
||||||
|
|
||||||
logTrace("emit shared done");
|
logTrace("emit shared done");
|
||||||
|
|
||||||
|
@ -862,13 +931,15 @@ struct ManualEvent {
|
||||||
auto ec = atomicOp!"+="(m_emitCount, 1);
|
auto ec = atomicOp!"+="(m_emitCount, 1);
|
||||||
auto thisthr = Thread.getThis();
|
auto thisthr = Thread.getThis();
|
||||||
|
|
||||||
ThreadLocalWaiter* lw;
|
ThreadWaiter lw;
|
||||||
auto drv = eventDriver;
|
auto drv = eventDriver;
|
||||||
m_waiters.lock.active.iterate((ThreadLocalWaiter* w) {
|
m_waiters.lock.active.iterate((ThreadWaiter w) {
|
||||||
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
() @trusted { logTrace("waiter %s", cast(void*)w); } ();
|
||||||
if (w.unused) return true;
|
if (w.unused) return true;
|
||||||
if (w.m_driver is drv) lw = w;
|
if (w.m_driver is drv) {
|
||||||
else {
|
lw = w;
|
||||||
|
lw.addRef();
|
||||||
|
} else {
|
||||||
try {
|
try {
|
||||||
assert(w.m_event != EventID.invalid);
|
assert(w.m_event != EventID.invalid);
|
||||||
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
|
||||||
|
@ -877,7 +948,10 @@ struct ManualEvent {
|
||||||
return false;
|
return false;
|
||||||
});
|
});
|
||||||
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
() @trusted { logTrace("lw %s", cast(void*)lw); } ();
|
||||||
if (lw) lw.emit();
|
if (lw) {
|
||||||
|
lw.emitSingle();
|
||||||
|
releaseWaiter(lw);
|
||||||
|
}
|
||||||
|
|
||||||
logTrace("emit shared done");
|
logTrace("emit shared done");
|
||||||
|
|
||||||
|
@ -937,7 +1011,7 @@ struct ManualEvent {
|
||||||
|
|
||||||
int ec = this.emitCount;
|
int ec = this.emitCount;
|
||||||
|
|
||||||
acquireThreadWaiter((ref ThreadLocalWaiter w) {
|
acquireThreadWaiter((scope ThreadWaiter w) {
|
||||||
while (ec <= emit_count) {
|
while (ec <= emit_count) {
|
||||||
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count);
|
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => this.emitCount > emit_count);
|
||||||
ec = this.emitCount;
|
ec = this.emitCount;
|
||||||
|
@ -955,10 +1029,9 @@ struct ManualEvent {
|
||||||
|
|
||||||
private void acquireThreadWaiter(DEL)(scope DEL del)
|
private void acquireThreadWaiter(DEL)(scope DEL del)
|
||||||
shared {
|
shared {
|
||||||
import vibe.internal.allocator : processAllocator, make;
|
import vibe.internal.allocator : processAllocator, makeGCSafe;
|
||||||
import core.memory : GC;
|
|
||||||
|
|
||||||
ThreadLocalWaiter* w;
|
ThreadWaiter w;
|
||||||
auto drv = eventDriver;
|
auto drv = eventDriver;
|
||||||
|
|
||||||
with (m_waiters.lock) {
|
with (m_waiters.lock) {
|
||||||
|
@ -984,35 +1057,38 @@ struct ManualEvent {
|
||||||
if (!w) {
|
if (!w) {
|
||||||
() @trusted {
|
() @trusted {
|
||||||
try {
|
try {
|
||||||
w = processAllocator.make!ThreadLocalWaiter;
|
w = processAllocator.makeGCSafe!ThreadWaiter;
|
||||||
w.m_driver = drv;
|
w.m_driver = drv;
|
||||||
w.m_event = ms_threadEvent;
|
w.m_event = ms_threadEvent;
|
||||||
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assert(false, "Failed to allocate thread waiter.");
|
assert(false, "Failed to allocate thread waiter.");
|
||||||
}
|
}
|
||||||
} ();
|
} ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(w.m_refCount == 1);
|
||||||
active.add(w);
|
active.add(w);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
scope (exit) {
|
scope (exit) releaseWaiter(w);
|
||||||
|
|
||||||
|
del(w);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseWaiter(ThreadWaiter w)
|
||||||
|
shared nothrow {
|
||||||
if (!w.releaseRef()) {
|
if (!w.releaseRef()) {
|
||||||
assert(w.m_driver is drv);
|
assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
|
||||||
assert(w.unused);
|
assert(w.unused, "Waiter still used, but not referenced!?");
|
||||||
with (m_waiters.lock) {
|
with (m_waiters.lock) {
|
||||||
auto rmvd = active.remove(w);
|
auto rmvd = active.remove(w);
|
||||||
assert(rmvd);
|
assert(rmvd, "Waiter not in active queue anymore!?");
|
||||||
free.add(w);
|
free.add(w);
|
||||||
// TODO: cap size of m_freeWaiters
|
// TODO: cap size of m_freeWaiters
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
del(*w);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest {
|
unittest {
|
||||||
|
@ -1155,10 +1231,12 @@ package struct SpinLock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private struct ThreadLocalWaiter {
|
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
|
||||||
|
import vibe.internal.list : CircularDList;
|
||||||
|
|
||||||
private {
|
private {
|
||||||
static struct Waiter {
|
static struct TaskWaiter {
|
||||||
Waiter* next;
|
TaskWaiter* prev, next;
|
||||||
Task task;
|
Task task;
|
||||||
void delegate() @safe nothrow notifier;
|
void delegate() @safe nothrow notifier;
|
||||||
bool cancelled;
|
bool cancelled;
|
||||||
|
@ -1170,39 +1248,52 @@ private struct ThreadLocalWaiter {
|
||||||
void cancel() @safe nothrow { cancelled = true; notifier = null; }
|
void cancel() @safe nothrow { cancelled = true; notifier = null; }
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadLocalWaiter* next;
|
static if (EVENT_TRIGGERED) {
|
||||||
|
package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
|
||||||
NativeEventDriver m_driver;
|
NativeEventDriver m_driver;
|
||||||
EventID m_event = EventID.invalid;
|
EventID m_event = EventID.invalid;
|
||||||
Waiter* m_waiters;
|
} else {
|
||||||
|
int m_emitCount = 0;
|
||||||
|
}
|
||||||
int m_refCount = 1;
|
int m_refCount = 1;
|
||||||
|
TaskWaiter m_pivot;
|
||||||
|
TaskWaiter m_emitPivot;
|
||||||
|
CircularDList!(TaskWaiter*) m_waiters;
|
||||||
}
|
}
|
||||||
|
|
||||||
this(this)
|
this()
|
||||||
@safe nothrow {
|
{
|
||||||
if (m_event != EventID.invalid)
|
m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
|
||||||
m_driver.events.addRef(m_event);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static if (EVENT_TRIGGERED) {
|
||||||
~this()
|
~this()
|
||||||
@safe nothrow {
|
{
|
||||||
if (m_event != EventID.invalid)
|
if (m_event != EventID.invalid)
|
||||||
m_driver.events.releaseRef(m_event);
|
eventDriver.events.releaseRef(m_event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@property bool unused() const @safe nothrow { return m_waiters is null; }
|
@property bool unused() const @safe nothrow { return m_waiters.empty; }
|
||||||
|
|
||||||
void addRef() @safe nothrow { m_refCount++; }
|
void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
|
||||||
bool releaseRef() @safe nothrow { return --m_refCount > 0; }
|
bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }
|
||||||
|
|
||||||
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
|
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
|
||||||
@safe {
|
@safe {
|
||||||
import std.datetime : SysTime, Clock, UTC;
|
import std.datetime : SysTime, Clock, UTC;
|
||||||
import vibe.internal.async : Waitable, asyncAwaitAny;
|
import vibe.internal.async : Waitable, asyncAwaitAny;
|
||||||
|
|
||||||
Waiter w;
|
TaskWaiter waiter_store;
|
||||||
Waiter* pw = () @trusted { return &w; } ();
|
TaskWaiter* waiter = () @trusted { return &waiter_store; } ();
|
||||||
w.next = m_waiters;
|
|
||||||
m_waiters = pw;
|
m_waiters.insertBack(waiter);
|
||||||
|
assert(waiter.next !is null);
|
||||||
|
scope (exit)
|
||||||
|
if (waiter.next !is null) {
|
||||||
|
m_waiters.remove(waiter);
|
||||||
|
assert(!waiter.next);
|
||||||
|
}
|
||||||
|
|
||||||
SysTime target_timeout, now;
|
SysTime target_timeout, now;
|
||||||
if (timeout != Duration.max) {
|
if (timeout != Duration.max) {
|
||||||
|
@ -1211,32 +1302,16 @@ private struct ThreadLocalWaiter {
|
||||||
target_timeout = now + timeout;
|
target_timeout = now + timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
Waitable!(typeof(Waiter.notifier),
|
Waitable!(typeof(TaskWaiter.notifier),
|
||||||
cb => w.wait(cb),
|
cb => waiter.wait(cb),
|
||||||
cb => w.cancel(),
|
cb => waiter.cancel(),
|
||||||
) waitable;
|
) waitable;
|
||||||
|
|
||||||
void removeWaiter()
|
|
||||||
@safe nothrow {
|
|
||||||
Waiter* piw = m_waiters, ppiw = null;
|
|
||||||
while (piw !is null) {
|
|
||||||
if (piw is pw) {
|
|
||||||
if (ppiw) ppiw.next = piw.next;
|
|
||||||
else m_waiters = piw.next;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ppiw = piw;
|
|
||||||
piw = piw.next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
scope (failure) removeWaiter();
|
|
||||||
|
|
||||||
if (evt != EventID.invalid) {
|
if (evt != EventID.invalid) {
|
||||||
Waitable!(EventCallback,
|
Waitable!(EventCallback,
|
||||||
(cb) {
|
(cb) {
|
||||||
eventDriver.events.wait(evt, cb);
|
eventDriver.events.wait(evt, cb);
|
||||||
// check for exit codition *after* starting to wait for the event
|
// check for exit condition *after* starting to wait for the event
|
||||||
// to avoid a race condition
|
// to avoid a race condition
|
||||||
if (exit_condition()) {
|
if (exit_condition()) {
|
||||||
eventDriver.events.cancelWait(evt, cb);
|
eventDriver.events.cancelWait(evt, cb);
|
||||||
|
@ -1251,109 +1326,57 @@ private struct ThreadLocalWaiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (waitable.cancelled) {
|
if (waitable.cancelled) {
|
||||||
removeWaiter();
|
assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
|
||||||
return false;
|
return false;
|
||||||
} else debug {
|
} else {
|
||||||
Waiter* piw = m_waiters;
|
assert(waiter.next is null, "Triggered waiter still in queue!?");
|
||||||
while (piw !is null) {
|
|
||||||
assert(piw !is pw, "Thread local waiter still in queue after it got notified!?");
|
|
||||||
piw = piw.next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool emit()
|
void emit()
|
||||||
@safe nothrow {
|
@safe nothrow {
|
||||||
if (!m_waiters) return false;
|
import std.algorithm.mutation : swap;
|
||||||
|
import vibe.core.core : yield;
|
||||||
|
|
||||||
Waiter* waiters = m_waiters;
|
if (m_waiters.empty) return;
|
||||||
m_waiters = null;
|
|
||||||
while (waiters) {
|
TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
|
||||||
auto wnext = waiters.next;
|
|
||||||
assert(wnext !is waiters);
|
if (pivot.next) { // another emit in progress?
|
||||||
if (waiters.notifier !is null) {
|
// shift pivot to the end, so that the other emit call will process all pending waiters
|
||||||
logTrace("notify task %s %s %s", cast(void*)waiters, () @trusted { return cast(void*)waiters.notifier.funcptr; } (), waiters.notifier.ptr);
|
if (pivot !is m_waiters.back) {
|
||||||
waiters.notifier();
|
m_waiters.remove(pivot);
|
||||||
waiters.notifier = null;
|
m_waiters.insertBack(pivot);
|
||||||
} else logTrace("notify callback is null");
|
}
|
||||||
waiters = wnext;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
m_waiters.insertBack(pivot);
|
||||||
|
scope (exit) m_waiters.remove(pivot);
|
||||||
|
|
||||||
|
foreach (w; m_waiters) {
|
||||||
|
if (w is pivot) break;
|
||||||
|
emitWaiter(w);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool emitSingle()
|
bool emitSingle()
|
||||||
@safe nothrow {
|
@safe nothrow {
|
||||||
if (!m_waiters) return false;
|
if (m_waiters.empty) return false;
|
||||||
|
emitWaiter(m_waiters.front);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emitWaiter(TaskWaiter* w)
|
||||||
|
@safe nothrow {
|
||||||
|
m_waiters.remove(w);
|
||||||
|
|
||||||
auto w = m_waiters;
|
|
||||||
m_waiters = w.next;
|
|
||||||
if (w.notifier !is null) {
|
if (w.notifier !is null) {
|
||||||
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
|
logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
|
||||||
w.notifier();
|
w.notifier();
|
||||||
w.notifier = null;
|
w.notifier = null;
|
||||||
} else logTrace("notify callback is null");
|
} else logTrace("notify callback is null");
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private struct StackSList(T)
|
|
||||||
{
|
|
||||||
@safe nothrow:
|
|
||||||
|
|
||||||
import core.atomic : cas;
|
|
||||||
|
|
||||||
private T* m_first;
|
|
||||||
|
|
||||||
@property T* first() { return m_first; }
|
|
||||||
@property shared(T)* first() shared @trusted { return atomicLoad(m_first); }
|
|
||||||
|
|
||||||
bool empty() const { return m_first is null; }
|
|
||||||
|
|
||||||
void add(T* elem)
|
|
||||||
{
|
|
||||||
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
|
|
||||||
elem.next = m_first;
|
|
||||||
m_first = elem;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool remove(T* elem)
|
|
||||||
{
|
|
||||||
T* w = m_first, wp;
|
|
||||||
while (w !is elem) {
|
|
||||||
if (!w) return false;
|
|
||||||
wp = w;
|
|
||||||
w = w.next;
|
|
||||||
}
|
|
||||||
if (wp) wp.next = w.next;
|
|
||||||
else m_first = w.next;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void filter(scope bool delegate(T* el) @safe nothrow del)
|
|
||||||
{
|
|
||||||
T* w = m_first, pw;
|
|
||||||
while (w !is null) {
|
|
||||||
auto wnext = w.next;
|
|
||||||
if (!del(w)) {
|
|
||||||
if (pw) pw.next = wnext;
|
|
||||||
else m_first = wnext;
|
|
||||||
} else pw = w;
|
|
||||||
w = wnext;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void iterate(scope bool delegate(T* el) @safe nothrow del)
|
|
||||||
{
|
|
||||||
T* w = m_first;
|
|
||||||
while (w !is null) {
|
|
||||||
auto wnext = w.next;
|
|
||||||
if (!del(w)) break;
|
|
||||||
w = wnext;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1369,7 +1392,6 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@trusted bool tryLock()
|
@trusted bool tryLock()
|
||||||
{
|
{
|
||||||
if (cas(&m_locked, false, true)) {
|
if (cas(&m_locked, false, true)) {
|
||||||
|
|
|
@ -809,7 +809,7 @@ package struct TaskScheduler {
|
||||||
Returns `true` $(I iff) there are more tasks left to process.
|
Returns `true` $(I iff) there are more tasks left to process.
|
||||||
*/
|
*/
|
||||||
ScheduleStatus schedule()
|
ScheduleStatus schedule()
|
||||||
{
|
nothrow {
|
||||||
if (m_taskQueue.empty)
|
if (m_taskQueue.empty)
|
||||||
return ScheduleStatus.idle;
|
return ScheduleStatus.idle;
|
||||||
|
|
||||||
|
@ -846,7 +846,7 @@ package struct TaskScheduler {
|
||||||
|
|
||||||
/// Resumes execution of a yielded task.
|
/// Resumes execution of a yielded task.
|
||||||
private void resumeTask(Task t)
|
private void resumeTask(Task t)
|
||||||
{
|
nothrow {
|
||||||
import std.encoding : sanitize;
|
import std.encoding : sanitize;
|
||||||
|
|
||||||
logTrace("task fiber resume");
|
logTrace("task fiber resume");
|
||||||
|
|
|
@ -16,3 +16,26 @@ public import std.experimental.allocator.mallocator;
|
||||||
s_threadAllocator = () @trusted { return allocatorObject(GCAllocator.instance); } ();
|
s_threadAllocator = () @trusted { return allocatorObject(GCAllocator.instance); } ();
|
||||||
return s_threadAllocator;
|
return s_threadAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto makeGCSafe(T, Allocator, A...)(Allocator allocator, A args)
|
||||||
|
{
|
||||||
|
import core.memory : GC;
|
||||||
|
import std.traits : hasIndirections;
|
||||||
|
|
||||||
|
auto ret = allocator.make!T(args);
|
||||||
|
static if (is (T == class)) enum tsize = __traits(classInstanceSize, T);
|
||||||
|
else enum tsize = T.sizeof;
|
||||||
|
static if (hasIndirections!T)
|
||||||
|
GC.addRange(cast(void*)ret, tsize, typeid(T));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void disposeGCSafe(T, Allocator)(Allocator allocator, T obj)
|
||||||
|
{
|
||||||
|
import core.memory : GC;
|
||||||
|
import std.traits : hasIndirections;
|
||||||
|
|
||||||
|
static if (hasIndirections!T)
|
||||||
|
GC.removeRange(cast(void*)obj);
|
||||||
|
allocator.dispose(obj);
|
||||||
|
}
|
||||||
|
|
192
source/vibe/internal/list.d
Normal file
192
source/vibe/internal/list.d
Normal file
|
@ -0,0 +1,192 @@
|
||||||
|
module vibe.internal.list;
|
||||||
|
|
||||||
|
import core.atomic;
|
||||||
|
|
||||||
|
struct CircularDList(T)
|
||||||
|
{
|
||||||
|
private {
|
||||||
|
T m_pivot;
|
||||||
|
}
|
||||||
|
|
||||||
|
this(T pivot)
|
||||||
|
{
|
||||||
|
assert(pivot.prev is null && pivot.next is null);
|
||||||
|
pivot.next = pivot.prev = pivot;
|
||||||
|
m_pivot = pivot;
|
||||||
|
assert(this.empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool empty() const { return m_pivot.next is m_pivot; }
|
||||||
|
|
||||||
|
|
||||||
|
@property T front() { return m_pivot.next is m_pivot ? null : m_pivot.next; }
|
||||||
|
@property T back() { return m_pivot.prev is m_pivot ? null : m_pivot.prev; }
|
||||||
|
|
||||||
|
void remove(T elem)
|
||||||
|
{
|
||||||
|
assert(elem !is m_pivot);
|
||||||
|
elem.prev.next = elem.next;
|
||||||
|
elem.next.prev = elem.prev;
|
||||||
|
elem.prev = elem.next = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void insertBefore(T elem, T pivot)
|
||||||
|
{
|
||||||
|
assert(elem.prev is null && elem.next is null);
|
||||||
|
elem.prev = pivot.prev;
|
||||||
|
elem.next = pivot;
|
||||||
|
pivot.prev.next = elem;
|
||||||
|
pivot.prev = elem;
|
||||||
|
}
|
||||||
|
|
||||||
|
void insertFront(T elem) { insertBefore(elem, m_pivot.next); }
|
||||||
|
|
||||||
|
void insertBack(T elem) { insertBefore(elem, m_pivot); }
|
||||||
|
|
||||||
|
// NOTE: allowed to remove the current element
|
||||||
|
int opApply(int delegate(T) @safe nothrow del)
|
||||||
|
@safe nothrow {
|
||||||
|
T prev = m_pivot;
|
||||||
|
debug size_t counter = 0;
|
||||||
|
while (prev.next !is m_pivot) {
|
||||||
|
auto el = prev.next;
|
||||||
|
if (auto ret = del(el))
|
||||||
|
return ret;
|
||||||
|
if (prev.next is el)
|
||||||
|
prev = prev.next;
|
||||||
|
debug assert (++counter < 1_000_000, "Cycle in list?");
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
static final class C {
|
||||||
|
C prev, next;
|
||||||
|
int i;
|
||||||
|
this(int i) { this.i = i; }
|
||||||
|
}
|
||||||
|
|
||||||
|
alias L = CircularDList!C;
|
||||||
|
auto l = L(new C(0));
|
||||||
|
assert(l.empty);
|
||||||
|
|
||||||
|
auto c = new C(1);
|
||||||
|
l.insertBack(c);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c);
|
||||||
|
assert(l.back is c);
|
||||||
|
foreach (c; l) assert(c.i == 1);
|
||||||
|
|
||||||
|
auto c2 = new C(2);
|
||||||
|
l.insertFront(c2);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c2);
|
||||||
|
assert(l.back is c);
|
||||||
|
foreach (c; l) assert(c.i == 1 || c.i == 2);
|
||||||
|
|
||||||
|
l.remove(c);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c2);
|
||||||
|
assert(l.back is c2);
|
||||||
|
foreach (c; l) assert(c.i == 2);
|
||||||
|
|
||||||
|
l.remove(c2);
|
||||||
|
assert(l.empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct StackSList(T)
|
||||||
|
{
|
||||||
|
@safe nothrow:
|
||||||
|
|
||||||
|
import core.atomic : cas;
|
||||||
|
|
||||||
|
private T m_first;
|
||||||
|
|
||||||
|
@property T first() { return m_first; }
|
||||||
|
@property T front() { return m_first; }
|
||||||
|
|
||||||
|
bool empty() const { return m_first is null; }
|
||||||
|
|
||||||
|
void add(T elem)
|
||||||
|
{
|
||||||
|
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
|
||||||
|
elem.next = m_first;
|
||||||
|
m_first = elem;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool remove(T elem)
|
||||||
|
{
|
||||||
|
debug uint counter = 0;
|
||||||
|
T w = m_first, wp;
|
||||||
|
while (w !is elem) {
|
||||||
|
if (!w) return false;
|
||||||
|
wp = w;
|
||||||
|
w = w.next;
|
||||||
|
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||||
|
}
|
||||||
|
if (wp) wp.next = w.next;
|
||||||
|
else m_first = w.next;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void filter(scope bool delegate(T el) @safe nothrow del)
|
||||||
|
{
|
||||||
|
debug uint counter = 0;
|
||||||
|
T w = m_first, pw;
|
||||||
|
while (w !is null) {
|
||||||
|
auto wnext = w.next;
|
||||||
|
if (!del(w)) {
|
||||||
|
if (pw) pw.next = wnext;
|
||||||
|
else m_first = wnext;
|
||||||
|
} else pw = w;
|
||||||
|
w = wnext;
|
||||||
|
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void iterate(scope bool delegate(T el) @safe nothrow del)
|
||||||
|
{
|
||||||
|
debug uint counter = 0;
|
||||||
|
T w = m_first;
|
||||||
|
while (w !is null) {
|
||||||
|
auto wnext = w.next;
|
||||||
|
if (!del(w)) break;
|
||||||
|
w = wnext;
|
||||||
|
debug assert(++counter < 1_000_000, "Cycle in linked list?");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
static final class C {
|
||||||
|
C next;
|
||||||
|
int i;
|
||||||
|
this(int i) { this.i = i; }
|
||||||
|
}
|
||||||
|
|
||||||
|
alias L = StackSList!C;
|
||||||
|
L l;
|
||||||
|
assert(l.empty);
|
||||||
|
|
||||||
|
auto c = new C(1);
|
||||||
|
l.add(c);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c);
|
||||||
|
l.iterate((el) { assert(el.i == 1); return true; });
|
||||||
|
|
||||||
|
auto c2 = new C(2);
|
||||||
|
l.add(c2);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c2);
|
||||||
|
l.iterate((el) { assert(el.i == 1 || el.i == 2); return true; });
|
||||||
|
|
||||||
|
l.remove(c);
|
||||||
|
assert(!l.empty);
|
||||||
|
assert(l.front is c2);
|
||||||
|
l.iterate((el) { assert(el.i == 2); return true; });
|
||||||
|
|
||||||
|
l.filter((el) => el.i == 0);
|
||||||
|
assert(l.empty);
|
||||||
|
}
|
Loading…
Reference in a new issue