2016-03-01 19:30:42 +00:00
|
|
|
module vibe.internal.async;
|
|
|
|
|
2017-11-11 15:45:44 +00:00
|
|
|
import std.traits : ParameterTypeTuple, ReturnType;
|
2016-03-01 19:30:42 +00:00
|
|
|
import std.typecons : tuple;
|
2016-06-14 06:01:03 +00:00
|
|
|
import vibe.core.core : hibernate, switchToTask;
|
|
|
|
import vibe.core.task : InterruptException, Task;
|
2016-03-01 19:30:42 +00:00
|
|
|
import vibe.core.log;
|
|
|
|
import core.time : Duration, seconds;
|
|
|
|
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Starts `action` with a completion callback of type `Callback` and waits,
	through `asyncAwaitAny!(true, ...)`, until that callback has been invoked.

	`cancel` is registered as the cancellation routine of the operation. Because
	the wait is performed in interruptible mode, an interruption of the waiting
	task surfaces as an `InterruptException`.

	Params:
		func = Name of the calling function; forwarded to `asyncAwaitAny`.

	Returns:
		A tuple holding the arguments the callback was invoked with.
*/
auto asyncAwait(Callback, alias action, alias cancel)(string func = __FUNCTION__)
	// NOTE(review): this constraint is always satisfied; presumably it only
	// exists to tell this overload apart from the timeout variant - confirm.
	if (!is(Object == Duration))
{
	alias Args = ParameterTypeTuple!Callback;

	// Filled in by the completion handler below.
	Args results;

	alias operation = Waitable!(
		Callback,
		action,
		cancel,
		(Args values) { results = values; }
	);

	asyncAwaitAny!(true, operation)(func);

	return tuple(results);
}
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Timeout-aware variant of `asyncAwait`.

	Starts `action` and waits interruptibly (via the timeout overload of
	`asyncAwaitAny`) until either the callback was invoked or the wait ended
	for another reason.

	Params:
		timeout = Maximum duration to wait; passed on to `asyncAwaitAny`.
		func = Name of the calling function; forwarded to `asyncAwaitAny`.

	Returns:
		A struct with two members: `completed`, which starts out as `true` and is
		set to `false` when the cancellation routine gets invoked, and `results`,
		which receives the arguments passed to the callback.
*/
auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
{
	static struct R {
		bool completed = true;
		ParameterTypeTuple!Callback results;
	}

	alias Args = ParameterTypeTuple!Callback;
	enum bool waitYieldsNothing = is(ReturnType!action == void);

	R ret;

	// The cancel wrapper records that the operation did not complete before
	// delegating to the user supplied `cancel`. Its arity depends on whether
	// `action` returns a value that has to be handed back to `cancel`.
	static if (waitYieldsNothing) {
		alias waitable = Waitable!(
			Callback,
			action,
			(handler) { ret.completed = false; cancel(handler); },
			(Args values) { ret.results = values; }
		);
	} else {
		alias waitable = Waitable!(
			Callback,
			action,
			(handler, waitResult) { ret.completed = false; cancel(handler, waitResult); },
			(Args values) { ret.results = values; }
		);
	}

	asyncAwaitAny!(true, waitable)(timeout, func);

	return ret;
}
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Starts `action` with a completion callback of type `Callback` and waits in
	uninterruptible mode (`asyncAwaitAny!(false, ...)`) until the callback has
	been invoked.

	No cancellation routine can be supplied for this overload. A stub is
	installed instead that fails with an assertion should it ever be called.

	Params:
		func = Name of the calling function; forwarded to `asyncAwaitAny`.

	Returns:
		A tuple holding the arguments the callback was invoked with.
*/
auto asyncAwaitUninterruptible(Callback, alias action)(string func = __FUNCTION__)
nothrow {
	// The stub's signature has to match what `Waitable` expects: one parameter
	// if `action` returns void, otherwise the callback plus the value returned
	// by `action`. Both variants carry the same explicit attributes so that
	// neither depends on attribute inference inside this `nothrow` function.
	static if (is(typeof(action(Callback.init)) == void))
		void cancel(Callback) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); }
	else
		void cancel(Callback, typeof(action(Callback.init))) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); }

	ParameterTypeTuple!Callback results;
	alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
	asyncAwaitAny!(false, waitable)(func);
	return tuple(results);
}
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Timeout-aware variant of `asyncAwaitUninterruptible`.

	Starts `action` and waits in uninterruptible mode through the timeout
	overload of `asyncAwaitAny`. `cancel` is registered as the cancellation
	routine of the operation.

	Params:
		timeout = Maximum duration to wait; passed on to `asyncAwaitAny`.
		func = Name of the calling function; forwarded to `asyncAwaitAny`.

	Returns:
		A tuple of the callback arguments. The values are only assigned when the
		callback is invoked; otherwise they keep their initial state.
*/
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
nothrow {
	alias Args = ParameterTypeTuple!Callback;

	Args results;

	alias operation = Waitable!(
		Callback,
		action,
		cancel,
		(Args values) { results = values; }
	);

	asyncAwaitAny!(false, operation)(timeout, func);

	return tuple(results);
}
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Bundles the pieces that describe one asynchronous operation.

	Params:
		CB = Type of the completion callback.
		WAIT = Callable that starts the operation; must accept a `CB`.
		CANCEL = Callable that cancels the operation. Takes a `CB` if `WAIT`
			returns `void`, otherwise a `CB` plus the value returned by `WAIT`.
		DONE = Callable that receives the callback arguments
			(`ParameterTypeTuple!CB`).

	The template exposes its arguments as `Callback`, `wait`, `cancel` and
	`done`. The call signatures are verified at compile time.
*/
template Waitable(CB, alias WAIT, alias CANCEL, alias DONE)
{
	import std.traits : ReturnType;

	alias Callback = CB;
	alias wait = WAIT;
	alias cancel = CANCEL;
	alias done = DONE;

	static assert(is(typeof(WAIT(CB.init))), "WAIT must be callable with a parameter of type "~CB.stringof);

	static if (!is(typeof(WAIT(CB.init)) == void)) {
		// WAIT produces a value, which CANCEL must accept as second argument.
		static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))),
			"CANCEL must be callable with parameters "~CB.stringof~
			" and "~typeof(WAIT(CB.init)).stringof);
	} else {
		static assert(is(typeof(CANCEL(CB.init))),
			"CANCEL must be callable with a parameter of type "~CB.stringof);
	}

	static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))),
		"DONE must be callable with types "~ParameterTypeTuple!CB.stringof);
}
|
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Waits for any of the given waitables, giving up after `timeout`.

	If `timeout` equals `Duration.max`, the call is forwarded unchanged to the
	overload without a timeout. Otherwise a timer is created through
	`eventDriver.timers`, and an extra waitable that waits on this timer is
	placed in front of the caller's waitables.

	Params:
		timeout = Maximum duration to wait, `Duration.max` to wait without limit.
		func = Name of the calling function; forwarded to the core overload.
*/
void asyncAwaitAny(bool interruptible, Waitables...)(Duration timeout, string func = __FUNCTION__)
{
	// No timer is needed when there is no time limit.
	if (timeout == Duration.max) {
		asyncAwaitAny!(interruptible, Waitables)(func);
		return;
	}

	import eventcore.core;

	auto timer = eventDriver.timers.create();
	// NOTE(review): the third argument (0.seconds) is presumably the repeat
	// interval, i.e. a one-shot timer - confirm against the eventcore API.
	eventDriver.timers.set(timer, timeout, 0.seconds);
	scope (exit) eventDriver.timers.releaseRef(timer);

	alias timeoutWaitable = Waitable!(TimerCallback,
		cb => eventDriver.timers.wait(timer, cb),
		cb => eventDriver.timers.cancelWait(timer),
		(tid) {}
	);

	asyncAwaitAny!(interruptible, timeoutWaitable, Waitables)(func);
}
|
2016-06-14 06:01:03 +00:00
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Starts the given waitables in order and blocks the calling task until at
	least one of them has invoked its callback.

	For every waitable, code is generated (see `waitableCode`) that
	$(UL
		$(LI creates a `scope` callback which marks the waitable as fired, passes
			the arguments on to the waitable's `done` and, once `t` has been set,
			switches back to the waiting task,)
		$(LI starts the operation by calling the waitable's `wait`,)
		$(LI registers a `scope (exit)` handler that calls the waitable's `cancel`
			if it has not fired by the time this function is left,)
		$(LI returns right away if any callback has already been invoked, in which
			case the remaining waitables are never started.)
	)

	If no callback fired while starting the operations, the task hibernates
	until one does. With `interruptible == true`, an interruption during
	hibernation results in an `InterruptException`; the `scope (exit)` handlers
	then cancel all operations that are still pending.

	Params:
		func = Name of the calling function, used only for diagnostic output.
*/
void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__)
	if (Waitables.length >= 1)
{
	import std.format : format;

	bool[Waitables.length] fired;
	bool any_fired = false;
	// Stays Task.init while the operations are being started, so that callbacks
	// invoked synchronously from within `wait` do not attempt a task switch.
	Task t;

	// Lets the callbacks detect notifications that arrive after this function
	// has already returned.
	bool still_inside = true;
	scope (exit) still_inside = false;

	debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", Waitables.length, func);

	// This diagnostic used to be emitted unconditionally; it is now gated like
	// every other diagnostic message in this function.
	debug(VibeAsyncLog) {
		() @trusted { logDebugV("si %x", &still_inside); } ();
	}

	// Generates the per-waitable code. Within the token string, %1$s is the
	// index of the waitable, %2$s the parameter declarations of the callback
	// and %3$s the matching parameter names.
	static string waitableCode()
	{
		string ret;
		foreach (i, W; Waitables) {
			ret ~= q{
				alias PT%1$s = ParameterTypeTuple!(Waitables[%1$s].Callback);
				scope callback_%1$s = (%2$s) @safe nothrow {
					// NOTE: this triggers DigitalMars/optlink#18
					//() @trusted { logDebugV("siw %%x", &still_inside); } ();
					debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", %1$s, func, t != Task.init);
					assert(still_inside, "Notification fired after asyncAwait had already returned!");
					fired[%1$s] = true;
					any_fired = true;
					Waitables[%1$s].done(%3$s);
					if (t != Task.init) switchToTask(t);
				};

				debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s);
				alias WR%1$s = typeof(Waitables[%1$s].wait(callback_%1$s));
				static if (is(WR%1$s == void)) Waitables[%1$s].wait(callback_%1$s);
				else auto wr%1$s = Waitables[%1$s].wait(callback_%1$s);

				scope (exit) {
					if (!fired[%1$s]) {
						debug(VibeAsyncLog) logDebugV("Cancelling operation %%s", %1$s);
						static if (is(WR%1$s == void)) Waitables[%1$s].cancel(callback_%1$s);
						else Waitables[%1$s].cancel(callback_%1$s, wr%1$s);
						any_fired = true;
						fired[%1$s] = true;
					}
				}

				if (any_fired) {
					debug(VibeAsyncLog) logDebugV("Returning to %%s without waiting.", func);
					return;
				}
			}.format(i, generateParamDecls!(CBDel!W)(format("PT%s", i)), generateParamNames!(CBDel!W));
		}
		return ret;
	}

	mixin(waitableCode());

	debug(VibeAsyncLog) logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");

	// From here on, callbacks resume this task when they fire.
	t = Task.getThis();

	debug (VibeAsyncLog) scope (failure) logDebugV("Aborting wait due to exception");

	// Keep hibernating until one of the callbacks has actually been invoked.
	do {
		static if (interruptible) {
			bool interrupted = false;
			hibernate(() @safe nothrow {
				debug(VibeAsyncLog) logDebugV("Got interrupted in %s.", func);
				interrupted = true;
			});
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s, interrupted=%s)", fired, interrupted);
			if (interrupted)
				throw new InterruptException;
		} else {
			hibernate();
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s)", fired);
		}
	} while (!any_fired);

	debug(VibeAsyncLog) logDebugV("Return result for %s.", func);
}
|
2016-06-14 07:26:12 +00:00
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/// Resolves to the `Callback` member of the given waitable.
private template CBDel(alias Waitable)
{
	alias CBDel = Waitable.Callback;
}
|
2016-06-15 16:21:04 +00:00
|
|
|
|
|
|
|
// An action that invokes its callback synchronously must return the callback
// argument without the action being run more than once.
@safe nothrow /*@nogc*/ unittest {
	alias Cb = void delegate(int) @safe nothrow;

	int invocations;
	auto result = asyncAwaitUninterruptible!(Cb, (cb) { ++invocations; cb(42); })();

	assert(invocations == 1);
	assert(result[0] == 42);
}
|
|
|
|
|
|
|
|
// Exercises asyncAwaitAny with waitables that fire synchronously and one that
// never fires, checking start/cancel bookkeeping and result delivery.
@safe nothrow /*@nogc*/ unittest {
	alias Cb = void delegate(int) @safe nothrow;

	int firstStarts;    // number of times the first waitable was started
	int secondStarts;   // number of times the second waitable was started
	int sharedCount;    // bumped by the second's cancel and by the third's start
	int firstResult, secondResult;

	// Fires right away with 42; must never be cancelled.
	alias immediate = Waitable!(
		Cb,
		(cb) { firstStarts++; cb(42); },
		(cb) { assert(false); },
		(value) { firstResult = value; }
	);

	// Never fires on its own and therefore has to be cancelled.
	alias pending = Waitable!(
		Cb,
		(cb) { secondStarts++; },
		(cb) { sharedCount++; },
		(value) { secondResult = value; }
	);

	// Fires right away and validates the delivered value itself.
	alias checked = Waitable!(
		Cb,
		(cb) { sharedCount++; cb(42); },
		(cb) { assert(false); },
		(int n) { assert(n == 42); }
	);

	// The first waitable fires during start-up, so the second is never started.
	asyncAwaitAny!(false, immediate, pending)();
	assert(firstResult == 42);
	assert(secondResult == 0);
	assert(firstStarts == 1);
	assert(secondStarts == 0);
	assert(sharedCount == 0);

	// With the order swapped, the pending waitable is started and then cancelled.
	asyncAwaitAny!(false, pending, immediate)();
	assert(firstResult == 42);
	assert(secondResult == 0);
	assert(firstStarts == 2);
	assert(secondStarts == 1);
	assert(sharedCount == 1);

	asyncAwaitAny!(false, checked)();
	assert(sharedCount == 2);
}
|
2017-01-15 19:59:36 +00:00
|
|
|
|
2017-07-20 23:43:13 +00:00
|
|
|
/**
	Builds a comma separated parameter declaration list for the callable type
	`Fun`, suitable for use in a string mixin.

	Each parameter is rendered as `<ptypes_name>[i] param_i`, prefixed by the
	`lazy`, `scope`, `out` and `ref` storage classes found on the corresponding
	parameter of `Fun`.

	This function may only be evaluated at compile time.

	Params:
		ptypes_name = Name of the type tuple that is indexed for the parameter types.

	Returns:
		The generated declaration list, empty if `Fun` takes no parameters.
*/
private string generateParamDecls(Fun)(string ptypes_name = "PTypes")
{
	import std.format : format;
	import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple;

	// Refuse to run outside of CTFE.
	if (!__ctfe) assert(false);

	alias StorageClasses = ParameterStorageClassTuple!Fun;

	string decls;
	foreach (idx, _; ParameterTypeTuple!Fun) {
		enum sc = StorageClasses[idx];

		static if (idx > 0) decls ~= ", ";

		static if (sc & ParameterStorageClass.lazy_) decls ~= "lazy ";
		static if (sc & ParameterStorageClass.scope_) decls ~= "scope ";
		static if (sc & ParameterStorageClass.out_) decls ~= "out ";
		static if (sc & ParameterStorageClass.ref_) decls ~= "ref ";

		decls ~= format("%1$s[%2$s] param_%2$s", ptypes_name, idx);
	}
	return decls;
}
|
|
|
|
|
|
|
|
/**
	Builds the comma separated list `param_0, param_1, ...` with one entry per
	parameter of the callable type `Fun`.

	This function may only be evaluated at compile time.

	Returns:
		The generated name list, empty if `Fun` takes no parameters.
*/
private string generateParamNames(Fun)()
{
	import std.format : format;

	// Refuse to run outside of CTFE.
	if (!__ctfe) assert(false);

	string names;
	foreach (idx, _; ParameterTypeTuple!Fun) {
		static if (idx > 0) names ~= ", ";
		names ~= format("param_%s", idx);
	}
	return names;
}
|
2017-01-15 23:20:35 +00:00
|
|
|
|
|
|
|
/**
	Evaluates to `true` if at least one parameter of the callable type
	`Callback` carries the `scope` storage class, `false` otherwise
	(including when `Callback` has no parameters).
*/
private template hasAnyScopeParameter(Callback)
{
	import std.algorithm.searching : any;
	import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

	alias SC = ParameterStorageClassTuple!Callback;

	static if (SC.length > 0) {
		enum hasAnyScopeParameter =
			[SC].any!(c => (c & ParameterStorageClass.scope_) != 0);
	} else {
		// An empty parameter list is handled separately from the array-literal path.
		enum hasAnyScopeParameter = false;
	}
}
|