TCPConnection and async improvements.
- asyncAwaitAny now takes the callback type, so that parameter storage classes are handled correctly - Implement TCPConnection.remoteAddress/localAddress - Implement TCPConnection.tcpNoDelay, keepAlive and readTimeout - Implement timeout handling for TCPConnection.waitForData
This commit is contained in:
parent
d1fad2c98b
commit
f015662a94
3 changed files with 136 additions and 53 deletions
|
@ -10,14 +10,14 @@ import core.time : Duration, seconds;
|
|||
|
||||
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)()
|
||||
if (!is(Object == Duration)) {
|
||||
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
|
||||
Waitable!(action, cancel, Callback) waitable;
|
||||
asyncAwaitAny!(true, func)(waitable);
|
||||
return tuple(waitable.results);
|
||||
}
|
||||
|
||||
auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
|
||||
{
|
||||
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
|
||||
Waitable!(action, cancel, Callback) waitable;
|
||||
asyncAwaitAny!(true, func)(timeout, waitable);
|
||||
static struct R {
|
||||
bool completed;
|
||||
|
@ -30,23 +30,24 @@ auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION_
|
|||
nothrow {
|
||||
static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) { assert(false, "Action cannot be cancelled."); }
|
||||
else void cancel(Callback, typeof(action(Callback.init))) { assert(false, "Action cannot be cancelled."); }
|
||||
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
|
||||
Waitable!(action, cancel, Callback) waitable;
|
||||
asyncAwaitAny!(false, func)(waitable);
|
||||
return tuple(waitable.results);
|
||||
}
|
||||
|
||||
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout)
|
||||
nothrow {
|
||||
Waitable!(action, cancel, ParameterTypeTuple!Callback) waitable;
|
||||
Waitable!(action, cancel, Callback) waitable;
|
||||
asyncAwaitAny!(false, func)(timeout, waitable);
|
||||
return tuple(waitable.results);
|
||||
}
|
||||
|
||||
struct Waitable(alias wait, alias cancel, Results...) {
|
||||
struct Waitable(alias wait, alias cancel, CB) {
|
||||
import std.traits : ReturnType;
|
||||
|
||||
alias Callback = void delegate(Results) @safe nothrow;
|
||||
Results results;
|
||||
alias Callback = CB;
|
||||
|
||||
ParameterTypeTuple!Callback results;
|
||||
bool cancelled;
|
||||
auto waitCallback(Callback cb) nothrow { return wait(cb); }
|
||||
|
||||
|
@ -68,7 +69,7 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
|
|||
Waitable!(
|
||||
cb => eventDriver.timers.wait(tm, cb),
|
||||
cb => eventDriver.timers.cancelWait(tm),
|
||||
TimerID
|
||||
TimerCallback
|
||||
) timerwaitable;
|
||||
asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);
|
||||
}
|
||||
|
@ -79,6 +80,8 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
|
|||
{
|
||||
import std.meta : staticMap;
|
||||
import std.algorithm.searching : any;
|
||||
import std.format : format;
|
||||
import std.meta : AliasSeq;
|
||||
import std.traits : ReturnType;
|
||||
|
||||
/*scope*/ staticMap!(CBDel, Waitables) callbacks; // FIXME: avoid heap delegates
|
||||
|
@ -96,15 +99,17 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
|
|||
() @trusted { logDebugV("si %x", &still_inside); } ();
|
||||
|
||||
foreach (i, W; Waitables) {
|
||||
/*scope*/auto cb = (typeof(Waitables[i].results) results) @safe nothrow {
|
||||
() @trusted { logDebugV("siw %x", &still_inside); } ();
|
||||
debug(VibeAsyncLog) logDebugV("Waitable %s in %s fired (istask=%s).", i, func, t != Task.init);
|
||||
alias PTypes = ParameterTypeTuple!(CBDel!W);
|
||||
/*scope*/auto cb = mixin(q{(%s) @safe nothrow {
|
||||
() @trusted { logDebugV("siw %%x", &still_inside); } ();
|
||||
debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", i, func, t != Task.init);
|
||||
assert(still_inside, "Notification fired after asyncAwait had already returned!");
|
||||
fired[i] = true;
|
||||
any_fired = true;
|
||||
waitables[i].results = results;
|
||||
static if (PTypes.length)
|
||||
waitables[i].results = AliasSeq!(%s);
|
||||
if (t != Task.init) switchToTask(t);
|
||||
};
|
||||
}}.format(generateParamDecls!(CBDel!W), generateParamNames!(CBDel!W)));
|
||||
callbacks[i] = cb;
|
||||
|
||||
debug(VibeAsyncLog) logDebugV("Starting operation %s", i);
|
||||
|
@ -158,13 +163,13 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
|
|||
debug(VibeAsyncLog) logDebugV("Return result for %s.", func);
|
||||
}
|
||||
|
||||
private alias CBDel(Waitable) = void delegate(typeof(Waitable.results)) @safe nothrow;
|
||||
private alias CBDel(Waitable) = Waitable.Callback;
|
||||
|
||||
private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op !is null) op(); } }
|
||||
|
||||
@safe nothrow /*@nogc*/ unittest {
|
||||
int cnt = 0;
|
||||
auto ret = asyncAwaitUninterruptible!(void delegate(int), (cb) { cnt++; cb(42); });
|
||||
auto ret = asyncAwaitUninterruptible!(void delegate(int) @safe nothrow, (cb) { cnt++; cb(42); });
|
||||
assert(ret[0] == 42);
|
||||
assert(cnt == 1);
|
||||
}
|
||||
|
@ -174,12 +179,12 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
|
|||
Waitable!(
|
||||
(cb) { a++; cb(42); },
|
||||
(cb) { assert(false); },
|
||||
int
|
||||
void delegate(int) @safe nothrow
|
||||
) w1;
|
||||
Waitable!(
|
||||
(cb) { b++; },
|
||||
(cb) { c++; },
|
||||
int
|
||||
void delegate(int) @safe nothrow
|
||||
) w2;
|
||||
|
||||
asyncAwaitAny!false(w1, w2);
|
||||
|
@ -190,3 +195,34 @@ private struct ScopeGuard { @safe nothrow: void delegate() op; ~this() { if (op
|
|||
assert(w1.results[0] == 42 && w2.results[0] == 0);
|
||||
assert(a == 2 && b == 1 && c == 1);
|
||||
}
|
||||
|
||||
private string generateParamDecls(Fun)()
|
||||
{
|
||||
import std.format : format;
|
||||
import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple;
|
||||
|
||||
alias Types = ParameterTypeTuple!Fun;
|
||||
alias SClasses = ParameterStorageClassTuple!Fun;
|
||||
string ret;
|
||||
foreach (i, T; Types) {
|
||||
static if (i > 0) ret ~= ", ";
|
||||
static if (SClasses[i] & ParameterStorageClass.lazy_) ret ~= "lazy ";
|
||||
static if (SClasses[i] & ParameterStorageClass.scope_) ret ~= "scope ";
|
||||
static if (SClasses[i] & ParameterStorageClass.out_) ret ~= "out ";
|
||||
static if (SClasses[i] & ParameterStorageClass.ref_) ret ~= "ref ";
|
||||
ret ~= format("PTypes[%s] param_%s", i, i);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private string generateParamNames(Fun)()
|
||||
{
|
||||
import std.format : format;
|
||||
|
||||
string ret;
|
||||
foreach (i, T; ParameterTypeTuple!Fun) {
|
||||
static if (i > 0) ret ~= ", ";
|
||||
ret ~= format("param_%s", i);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue