Draft proper shared semantics for ManualEvent.

This commit is contained in:
Sönke Ludwig 2016-06-15 18:21:33 +02:00
parent a74a89cab7
commit e8ba981263

View file

@ -126,13 +126,13 @@ class LocalTaskSemaphore
//import vibe.utils.memory; //import vibe.utils.memory;
private { private {
static struct Waiter { static struct ThreadWaiter {
ManualEvent signal; ManualEvent signal;
ubyte priority; ubyte priority;
uint seq; uint seq;
} }
BinaryHeap!(Array!Waiter, asc) m_waiters; BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
uint m_maxLocks; uint m_maxLocks;
uint m_locks; uint m_locks;
uint m_seq; uint m_seq;
@ -182,7 +182,7 @@ class LocalTaskSemaphore
if (tryLock()) if (tryLock())
return; return;
Waiter w; ThreadWaiter w;
w.signal = createManualEvent(); w.signal = createManualEvent();
w.priority = priority; w.priority = priority;
w.seq = min(0, m_seq - w.priority); w.seq = min(0, m_seq - w.priority);
@ -201,7 +201,7 @@ class LocalTaskSemaphore
{ {
m_locks--; m_locks--;
if (m_waiters.length > 0 && available > 0) { if (m_waiters.length > 0 && available > 0) {
Waiter w = m_waiters.front(); ThreadWaiter w = m_waiters.front();
w.signal.emit(); // resume one w.signal.emit(); // resume one
m_waiters.removeFront(); m_waiters.removeFront();
} }
@ -209,7 +209,7 @@ class LocalTaskSemaphore
// if true, a goes after b. ie. b comes out front() // if true, a goes after b. ie. b comes out front()
/// private /// private
static bool asc(ref Waiter a, ref Waiter b) static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
{ {
if (a.seq == b.seq) { if (a.seq == b.seq) {
if (a.priority == b.priority) { if (a.priority == b.priority) {
@ -225,7 +225,7 @@ class LocalTaskSemaphore
private void rewindSeq() private void rewindSeq()
{ {
Array!Waiter waiters = m_waiters.release(); Array!ThreadWaiter waiters = m_waiters.release();
ushort min_seq; ushort min_seq;
import std.algorithm : min; import std.algorithm : min;
foreach (ref waiter; waiters[]) foreach (ref waiter; waiters[])
@ -644,23 +644,24 @@ shared(ManualEvent) createSharedManualEvent()
*/ */
struct ManualEvent { struct ManualEvent {
import core.thread : Thread; import core.thread : Thread;
import vibe.internal.async : asyncAwait, asyncAwaitUninterruptible; import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
private { private {
static struct Waiter {
Waiter* next;
immutable EventID event;
immutable EventDriver driver;
immutable Thread thread;
StackSList!ThreadWaiter tasks;
}
static struct ThreadWaiter { static struct ThreadWaiter {
ThreadWaiter* next; ThreadWaiter* next;
/*immutable*/ EventID event;
/*immutable*/ EventDriver driver;
//immutable Thread thread;
StackSList!LocalWaiter tasks;
}
static struct LocalWaiter {
LocalWaiter* next;
Task task; Task task;
void delegate() @safe nothrow notifier; void delegate() @safe nothrow notifier;
bool cancelled = false;
void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; } void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; }
void cancel() @safe nothrow { notifier = null; } void cancel() @safe nothrow { cancelled = true; auto n = notifier; notifier = null; n(); }
void wait(void delegate() @safe nothrow del) void wait(void delegate() @safe nothrow del)
shared @safe nothrow { shared @safe nothrow {
@ -675,7 +676,7 @@ struct ManualEvent {
} }
} }
int m_emitCount; int m_emitCount;
Waiter* m_waiters; ThreadWaiter* m_waiters;
} }
@ -710,20 +711,22 @@ struct ManualEvent {
final switch (mode) { final switch (mode) {
case EmitMode.all: case EmitMode.all:
// FIXME: would be nice to have atomicSwap instead // FIXME: would be nice to have atomicSwap instead
auto w = cast(Waiter*)atomicLoad(m_waiters); auto w = cast(ThreadWaiter*)atomicLoad(m_waiters);
if (w !is null && !cas(&m_waiters, cast(shared(Waiter)*)w, cast(shared(Waiter)*)null)) if (w !is null && !cas(&m_waiters, cast(shared(ThreadWaiter)*)w, cast(shared(ThreadWaiter)*)null))
return ec; return ec;
while (w !is null) { while (w !is null) {
if (w.thread is thisthr) { if (w.driver is eventDriver) {
// Note: emitForThisThread can result in w getting deallocated at any // Note: emitForThisThread can result in w getting deallocated at any
// time, so we need to copy any fields first // time, so we need to copy any fields first
auto tasks = w.tasks; auto tasks = w.tasks;
w = w.next; w = w.next;
emitForThisThread(w.tasks.m_first, mode); emitForThisThread(w.tasks.m_first, mode);
} else { } else {
auto drv = w.driver;
auto evt = w.event; auto evt = w.event;
w = w.next; w = w.next;
eventDriver.triggerEvent(evt, true); if (evt != EventID.init)
drv.triggerEvent(evt, true);
} }
} }
break; break;
@ -742,7 +745,7 @@ struct ManualEvent {
auto w = m_waiters; auto w = m_waiters;
m_waiters = null; m_waiters = null;
if (w !is null) { if (w !is null) {
assert(w.thread is Thread.getThis(), "Unshared ManualEvent has waiters in foreign thread!"); assert(w.driver is eventDriver, "Unshared ManualEvent has waiters in foreign thread!");
assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!"); assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!");
emitForThisThread(w.tasks.m_first, EmitMode.all); emitForThisThread(w.tasks.m_first, EmitMode.all);
} }
@ -781,8 +784,8 @@ struct ManualEvent {
*/ */
int wait(Duration timeout, int emit_count) int wait(Duration timeout, int emit_count)
{ {
Waiter w; ThreadWaiter w;
ThreadWaiter tw; LocalWaiter tw;
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
@ -799,12 +802,13 @@ struct ManualEvent {
/// ditto /// ditto
int wait(Duration timeout, int emit_count) int wait(Duration timeout, int emit_count)
shared { shared {
shared(Waiter) w; shared(ThreadWaiter) w;
ThreadWaiter tw; LocalWaiter tw;
acquireWaiter(w, tw);
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
acquireWaiter(w, tw);
if (tw.next) { if (tw.next) {
// if we are not the first waiter for this thread, // if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread // wait for getting resumed by emitForThisThread
@ -816,14 +820,21 @@ struct ManualEvent {
} else { } else {
// if we are the first waiter for this thread, // if we are the first waiter for this thread,
// wait for the thread event to get emitted // wait for the thread event to get emitted
/*asyncAwait!(EventCallback, void delegate() @safe nothrow, Waitable!(
cb => eventDriver.waitForEvent(ms_threadEvent, cb), cb => eventDriver.waitForEvent(ms_threadEvent, cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb),
EventID
) eventwaiter;
Waitable!(
cb => tw.wait(cb), cb => tw.wait(cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent) cb => tw.cancel()
)(timeout); ) localwaiter;
emitForThisThread(w.waiters); asyncAwaitAny!true(timeout, eventwaiter, localwaiter);
ec = this.emitCount;*/
assert(false); ec = this.emitCount;
if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
else if (localwaiter.cancelled) break; // timeout
} }
} }
return ec; return ec;
@ -844,8 +855,8 @@ struct ManualEvent {
/// ditto /// ditto
int waitUninterruptible(Duration timeout, int emit_count) int waitUninterruptible(Duration timeout, int emit_count)
nothrow { nothrow {
Waiter w; ThreadWaiter w;
ThreadWaiter tw; LocalWaiter tw;
acquireWaiter(w, tw); acquireWaiter(w, tw);
int ec = this.emitCount; int ec = this.emitCount;
@ -861,23 +872,45 @@ struct ManualEvent {
/// ditto /// ditto
int waitUninterruptible(Duration timeout, int emit_count) int waitUninterruptible(Duration timeout, int emit_count)
shared nothrow { shared nothrow {
/*Waiter w; shared(ThreadWaiter) w;
ThreadWaiter tw; LocalWaiter tw;
auto event = acquireWaiter(w, tw);
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
asyncAwaitUninterruptible!(void delegate(), acquireWaiter(w, tw);
cb => tw.wait(cb),
cb => tw.cancel() if (tw.next) {
)(timeout); // if we are not the first waiter for this thread,
ec = this.emitCount; // wait for getting resumed by emitForThisThread
asyncAwaitUninterruptible!(void delegate() @safe nothrow,
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
} else {
// if we are the first waiter for this thread,
// wait for the thread event to get emitted
Waitable!(
cb => eventDriver.waitForEvent(ms_threadEvent, cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb),
EventID
) eventwaiter;
Waitable!(
cb => tw.wait(cb),
cb => tw.cancel()
) localwaiter;
asyncAwaitAny!false(timeout, eventwaiter, localwaiter);
ec = this.emitCount;
if (!eventwaiter.cancelled) emitForThisThread(cast(LocalWaiter*)w.tasks.m_first, EmitMode.all); // FIXME: use proper emit mode
else if (localwaiter.cancelled) break; // timeout
}
} }
return ec;*/ return ec;
assert(false);
} }
private static bool emitForThisThread(ThreadWaiter* waiters, EmitMode mode) private static bool emitForThisThread(LocalWaiter* waiters, EmitMode mode)
nothrow { nothrow {
if (!waiters) return false; if (!waiters) return false;
@ -896,9 +929,9 @@ struct ManualEvent {
return true; return true;
} }
private void acquireWaiter(ref Waiter w, ref ThreadWaiter tw) private void acquireWaiter(ref ThreadWaiter w, ref LocalWaiter tw)
nothrow { nothrow {
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the Waiter will be dangling // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the ThreadWaiter will be dangling
tw.task = Task.getThis(); tw.task = Task.getThis();
if (m_waiters) { if (m_waiters) {
@ -908,18 +941,28 @@ struct ManualEvent {
} }
} }
private void acquireWaiter(ref shared(Waiter) w, ref ThreadWaiter tw) private void acquireWaiter(ref shared(ThreadWaiter) w, ref LocalWaiter tw)
nothrow shared { nothrow shared {
tw.task = Task.getThis(); tw.task = Task.getThis();
if (ms_threadEvent == EventID.init) if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.createEvent(); ms_threadEvent = eventDriver.createEvent();
auto sdriver = cast(shared)eventDriver;
if (m_waiters) { if (m_waiters) {
//m_waiters.tasks.add(&tw); shared(ThreadWaiter)* pw = m_waiters;
assert(false); while (pw !is null) {
if (pw.driver is sdriver) {
(cast(ThreadWaiter*)pw).tasks.add(&tw);
break;
}
pw = atomicLoad(pw.next);
}
} else { } else {
m_waiters = &w; m_waiters = &w;
w.event = ms_threadEvent;
w.driver = sdriver;
} }
} }
} }