Fix timeout handling in ManualEvent.

The timeout could was re-applied in every loop iteration of the wait routine, possibly causing an infinite loop.
This commit is contained in:
Sönke Ludwig 2016-12-19 20:19:58 +01:00
parent 79656a80df
commit 1b2c0f33d1

View file

@ -838,6 +838,15 @@ struct ManualEvent {
private int doWait(bool interruptible)(Duration timeout, int emit_count) private int doWait(bool interruptible)(Duration timeout, int emit_count)
{ {
import std.datetime : SysTime, Clock, UTC;
SysTime target_timeout, now;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
ThreadWaiter w; ThreadWaiter w;
@ -848,14 +857,29 @@ struct ManualEvent {
cb => tw.wait(cb), cb => tw.wait(cb),
cb => tw.cancel() cb => tw.cancel()
) waitable; ) waitable;
asyncAwaitAny!interruptible(timeout, waitable); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
ec = this.emitCount; ec = this.emitCount;
}
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
}
return ec; return ec;
} }
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count) private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared { shared {
import std.datetime : SysTime, Clock, UTC;
SysTime target_timeout, now;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
target_timeout = now + timeout;
}
int ec = this.emitCount; int ec = this.emitCount;
while (ec <= emit_count) { while (ec <= emit_count) {
shared(ThreadWaiter) w; shared(ThreadWaiter) w;
@ -870,7 +894,8 @@ struct ManualEvent {
cb => tw.wait(cb), cb => tw.wait(cb),
cb => tw.cancel() cb => tw.cancel()
) waitable; ) waitable;
asyncAwaitAny!interruptible(timeout, waitable); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, waitable);
if (waitable.cancelled) break; // timeout
} else { } else {
again: again:
// if we are the first waiter for this thread, // if we are the first waiter for this thread,
@ -885,7 +910,7 @@ struct ManualEvent {
cb => tw.cancel() cb => tw.cancel()
) localwaiter; ) localwaiter;
logDebugV("Wait on event %s", ms_threadEvent); logDebugV("Wait on event %s", ms_threadEvent);
asyncAwaitAny!interruptible(timeout, eventwaiter, localwaiter); asyncAwaitAny!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, eventwaiter, localwaiter);
if (!eventwaiter.cancelled) { if (!eventwaiter.cancelled) {
if (() @trusted { return atomicLoad(w.next); } () is null) if (() @trusted { return atomicLoad(w.next); } () is null)
@ -900,6 +925,12 @@ struct ManualEvent {
}(); }();
ec = this.emitCount; ec = this.emitCount;
if (timeout != Duration.max) {
try now = Clock.currTime(UTC());
catch (Exception e) { assert(false, e.msg); }
if (now >= target_timeout) break;
}
} }
return ec; return ec;
} }