Convert Waitable from struct to pure template to avoid heap closures.

Also fixes a case where ThreadLocalWaiter.emitSingle overlaps a call to .emit.
This commit is contained in:
Sönke Ludwig 2017-07-21 01:43:13 +02:00
parent e32d818873
commit 9fe9783443
4 changed files with 186 additions and 153 deletions

View file

@ -8,85 +8,88 @@ import vibe.core.log;
import core.time : Duration, seconds;
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)()
auto asyncAwait(Callback, alias action, alias cancel)(string func = __FUNCTION__)
if (!is(Object == Duration)) {
Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(true, func)(waitable);
return tuple(waitable.results);
ParameterTypeTuple!Callback results;
alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
asyncAwaitAny!(true, waitable)(func);
return tuple(results);
}
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
{
Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(true, func)(timeout, waitable);
static struct R {
bool completed;
bool completed = true;
typeof(waitable.results) results;
}
return R(!waitable.cancelled, waitable.results);
R ret;
alias waitable = Waitable!(Callback,
action,
(cb) { ret.completed = false; cancel(cb); },
(ParameterTypeTuple!Callback r) { ret.results = r; }
);
asyncAwaitAny!(true, waitable)(timeout, func);
return ret;
}
auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)()
auto asyncAwaitUninterruptible(Callback, alias action)(string func = __FUNCTION__)
nothrow {
static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); }
else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); }
Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(false, func)(waitable);
return tuple(waitable.results);
else void cancel(Callback, typeof(action(Callback.init))) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); }
ParameterTypeTuple!Callback results;
alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
asyncAwaitAny!(false, waitable)(func);
return tuple(results);
}
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
nothrow {
Waitable!(Callback, action, cancel) waitable;
asyncAwaitAny!(false, func)(timeout, waitable);
return tuple(waitable.results);
ParameterTypeTuple!Callback results;
alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
asyncAwaitAny!(false, waitable)(timeout, func);
return tuple(results);
}
struct Waitable(CB, alias wait, alias cancel, on_result...)
if (on_result.length <= 1)
template Waitable(CB, alias WAIT, alias CANCEL, alias DONE)
{
import std.traits : ReturnType;
alias Callback = CB;
static if (on_result.length == 0) {
static assert(!hasAnyScopeParameter!Callback, "Need to retrieve results with a callback because of scoped parameter");
ParameterTypeTuple!Callback results;
void setResult(ref ParameterTypeTuple!Callback r) { this.results = r; }
} else {
import std.format : format;
alias PTypes = ParameterTypeTuple!Callback;
mixin(q{void setResult(%s) { on_result[0](%s); }}.format(generateParamDecls!Callback, generateParamNames!Callback));
}
bool cancelled;
auto waitCallback(Callback cb) nothrow { return wait(cb); }
static if (is(ReturnType!waitCallback == void))
void cancelCallback(Callback cb) nothrow { cancel(cb); }
static assert(is(typeof(WAIT(CB.init))), "WAIT must be callable with a parameter of type "~CB.stringof);
static if (is(typeof(WAIT(CB.init)) == void))
static assert(is(typeof(CANCEL(CB.init))),
"CANCEL must be callable with a parameter of type "~CB.stringof);
else
void cancelCallback(Callback cb, ReturnType!waitCallback r) nothrow { cancel(cb, r); }
static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))),
"CANCEL must be callable with parameters ("~CB.stringof~", "~typeof(WAIT(CB.init)).stringof~")");
static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))),
"DONE must be callable with types "~ParameterTypeTuple!CB.stringof);
alias Callback = CB;
alias wait = WAIT;
alias cancel = CANCEL;
alias done = DONE;
}
void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(Duration timeout, ref Waitables waitables)
void asyncAwaitAny(bool interruptible, Waitables...)(Duration timeout, string func = __FUNCTION__)
{
if (timeout == Duration.max) asyncAwaitAny!(interruptible, func)(waitables);
if (timeout == Duration.max) asyncAwaitAny!(interruptible, Waitables)(func);
else {
import eventcore.core;
auto tm = eventDriver.timers.create();
eventDriver.timers.set(tm, timeout, 0.seconds);
scope (exit) eventDriver.timers.releaseRef(tm);
Waitable!(TimerCallback,
alias timerwaitable = Waitable!(TimerCallback,
cb => eventDriver.timers.wait(tm, cb),
cb => eventDriver.timers.cancelWait(tm)
) timerwaitable;
asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);
cb => eventDriver.timers.cancelWait(tm),
(tid) {}
);
asyncAwaitAny!(interruptible, timerwaitable, Waitables)(func);
}
}
void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)(ref Waitables waitables)
if (Waitables.length >= 1 && !is(Waitables[0] == Duration))
void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__)
if (Waitables.length >= 1)
{
import std.meta : staticMap;
import std.algorithm.searching : any;
@ -94,60 +97,60 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
import std.meta : AliasSeq;
import std.traits : ReturnType;
/*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates
bool[Waitables.length] fired;
ScopeGuard[Waitables.length] scope_guards;
bool any_fired = false;
Task t;
bool still_inside = true;
scope (exit) still_inside = false;
debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", waitables.length, func);
debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", Waitables.length, func);
() @trusted { logDebugV("si %x", &still_inside); } ();
foreach (i, W; Waitables) {
alias PTypes = ParameterTypeTuple!(CBDel!W);
/*scope*/auto cb = mixin(q{(%s) @safe nothrow {
() @trusted { logDebugV("siw %%x", &still_inside); } ();
debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", i, func, t != Task.init);
assert(still_inside, "Notification fired after asyncAwait had already returned!");
fired[i] = true;
any_fired = true;
static if (PTypes.length)
waitables[i].setResult(%s);
if (t != Task.init) switchToTask(t);
}}.format(generateParamDecls!(CBDel!W), generateParamNames!(CBDel!W)));
callbacks[i] = cb;
static string waitableCode()
{
string ret;
foreach (i, W; Waitables) {
alias PTypes = ParameterTypeTuple!(CBDel!W);
ret ~= q{
alias PT%1$s = ParameterTypeTuple!(Waitables[%1$s].Callback);
scope callback_%1$s = (%2$s) @safe nothrow {
() @trusted { logDebugV("siw %%x", &still_inside); } ();
debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", %1$s, func, t != Task.init);
assert(still_inside, "Notification fired after asyncAwait had already returned!");
fired[%1$s] = true;
any_fired = true;
Waitables[%1$s].done(%3$s);
if (t != Task.init) switchToTask(t);
};
debug(VibeAsyncLog) logDebugV("Starting operation %s", i);
static if (is(ReturnType!(W.waitCallback) == void))
waitables[i].waitCallback(callbacks[i]);
else
auto wr = waitables[i].waitCallback(callbacks[i]);
debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s);
alias WR%1$s = typeof(Waitables[%1$s].wait(callback_%1$s));
static if (is(WR%1$s == void)) Waitables[%1$s].wait(callback_%1$s);
else auto wr%1$s = Waitables[%1$s].wait(callback_%1$s);
scope ccb = () @safe nothrow {
if (!fired[i]) {
debug(VibeAsyncLog) logDebugV("Cancelling operation %s", i);
static if (is(ReturnType!(W.waitCallback) == void))
waitables[i].cancelCallback(callbacks[i]);
else
waitables[i].cancelCallback(callbacks[i], wr);
waitables[i].cancelled = true;
any_fired = true;
fired[i] = true;
}
};
scope_guards[i] = ScopeGuard(ccb);
scope (exit) {
if (!fired[%1$s]) {
debug(VibeAsyncLog) logDebugV("Cancelling operation %%s", %1$s);
static if (is(WR%1$s == void)) Waitables[%1$s].cancel(callback_%1$s);
else Waitables[%1$s].cancel(callback_%1$s, wr%1$s);
any_fired = true;
fired[%1$s] = true;
}
}
if (any_fired) {
debug(VibeAsyncLog) logDebugV("Returning to %s without waiting.", func);
return;
if (any_fired) {
debug(VibeAsyncLog) logDebugV("Returning to %%s without waiting.", func);
return;
}
}.format(i, generateParamDecls!(CBDel!W)(format("PT%s", i)), generateParamNames!(CBDel!W));
}
return ret;
}
mixin(waitableCode());
debug(VibeAsyncLog) logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");
t = Task.getThis();
@ -173,9 +176,7 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
debug(VibeAsyncLog) logDebugV("Return result for %s.", func);
}
private alias CBDel(Waitable) = Waitable.Callback;
private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op !is null) op(); } }
private alias CBDel(alias Waitable) = Waitable.Callback;
@safe nothrow /*@nogc*/ unittest {
int cnt = 0;
@ -186,40 +187,45 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
@safe nothrow /*@nogc*/ unittest {
int a, b, c;
Waitable!(
int w1r, w2r;
alias w1 = Waitable!(
void delegate(int) @safe nothrow,
(cb) { a++; cb(42); },
(cb) { assert(false); }
) w1;
Waitable!(
(cb) { assert(false); },
(i) { w1r = i; }
);
alias w2 = Waitable!(
void delegate(int) @safe nothrow,
(cb) { b++; },
(cb) { c++; }
) w2;
Waitable!(
(cb) { c++; },
(i) { w2r = i; }
);
alias w3 = Waitable!(
void delegate(int) @safe nothrow,
(cb) { c++; cb(42); },
(cb) { assert(false); },
(int n) { assert(n == 42); }
) w3;
);
asyncAwaitAny!false(w1, w2);
assert(w1.results[0] == 42 && w2.results[0] == 0);
asyncAwaitAny!(false, w1, w2);
assert(w1r == 42 && w2r == 0);
assert(a == 1 && b == 0 && c == 0);
asyncAwaitAny!false(w2, w1);
assert(w1.results[0] == 42 && w2.results[0] == 0);
asyncAwaitAny!(false, w2, w1);
assert(w1r == 42 && w2r == 0);
assert(a == 2 && b == 1 && c == 1);
asyncAwaitAny!false(w3);
asyncAwaitAny!(false, w3);
assert(c == 2);
}
private string generateParamDecls(Fun)()
private string generateParamDecls(Fun)(string ptypes_name = "PTypes")
{
import std.format : format;
import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple;
if (!__ctfe) assert(false);
alias Types = ParameterTypeTuple!Fun;
alias SClasses = ParameterStorageClassTuple!Fun;
string ret;
@ -229,7 +235,7 @@ private string generateParamDecls(Fun)()
static if (SClasses[i] & ParameterStorageClass.scope_) ret ~= "scope ";
static if (SClasses[i] & ParameterStorageClass.out_) ret ~= "out ";
static if (SClasses[i] & ParameterStorageClass.ref_) ret ~= "ref ";
ret ~= format("PTypes[%s] param_%s", i, i);
ret ~= format("%s[%s] param_%s", ptypes_name, i, i);
}
return ret;
}
@ -237,6 +243,7 @@ private string generateParamDecls(Fun)()
private string generateParamNames(Fun)()
{
import std.format : format;
if (!__ctfe) assert(false);
string ret;
foreach (i, T; ParameterTypeTuple!Fun) {