Merge pull request #44 from vibe-d/connect_timeout2

Rebased version of "tcp connect timeout" with Optlink fix
This commit is contained in:
Sönke Ludwig 2017-11-25 21:50:38 +01:00 committed by GitHub
commit d0d8b78568
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 18 deletions

View file

@ -156,7 +156,8 @@ TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback,
/** /**
Establishes a connection to the given host/port. Establishes a connection to the given host/port.
*/ */
TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0) TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
ushort bind_port = 0, Duration timeout = Duration.max)
{ {
NetworkAddress addr = resolveHost(host); NetworkAddress addr = resolveHost(host);
addr.port = port; addr.port = port;
@ -173,10 +174,11 @@ TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
if (addr.family != AddressFamily.UNIX) if (addr.family != AddressFamily.UNIX)
bind_address.port = bind_port; bind_address.port = bind_port;
return connectTCP(addr, bind_address); return connectTCP(addr, bind_address, timeout);
} }
/// ditto /// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress) TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress,
Duration timeout = Duration.max)
{ {
import std.conv : to; import std.conv : to;
@ -193,14 +195,26 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA
scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen);
scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
// FIXME: make this interruptible bool cancelled;
auto result = asyncAwaitUninterruptible!(ConnectCallback, StreamSocketFD sock;
cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) ConnectStatus status;
//cb => eventDriver.sockets.cancelConnect(cb)
);
enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string);
return TCPConnection(result[0], uaddr); alias waiter = Waitable!(ConnectCallback,
cb => eventDriver.sockets.connectStream(uaddr, baddr, cb),
(ConnectCallback cb, StreamSocketFD sock_fd) {
cancelled = true;
eventDriver.sockets.cancelConnectStream(sock_fd);
},
(fd, st) { sock = fd; status = st; }
);
asyncAwaitAny!(true, waiter)(timeout);
enforce(!cancelled, "Failed to connect to " ~ addr.toString() ~
": timeout");
enforce(status == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~status.to!string);
return TCPConnection(sock, uaddr);
} (); } ();
} }

View file

@ -1,6 +1,6 @@
module vibe.internal.async; module vibe.internal.async;
import std.traits : ParameterTypeTuple; import std.traits : ParameterTypeTuple, ReturnType;
import std.typecons : tuple; import std.typecons : tuple;
import vibe.core.core : hibernate, switchToTask; import vibe.core.core : hibernate, switchToTask;
import vibe.core.task : InterruptException, Task; import vibe.core.task : InterruptException, Task;
@ -20,14 +20,23 @@ auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string f
{ {
static struct R { static struct R {
bool completed = true; bool completed = true;
typeof(waitable.results) results; ParameterTypeTuple!Callback results;
} }
R ret; R ret;
alias waitable = Waitable!(Callback, static if (is(ReturnType!action == void)) {
action, alias waitable = Waitable!(Callback,
(cb) { ret.completed = false; cancel(cb); }, action,
(ParameterTypeTuple!Callback r) { ret.results = r; } (cb) { ret.completed = false; cancel(cb); },
); (ParameterTypeTuple!Callback r) { ret.results = r; }
);
}
else {
alias waitable = Waitable!(Callback,
action,
(cb, waitres) { ret.completed = false; cancel(cb, waitres); },
(ParameterTypeTuple!Callback r) { ret.results = r; }
);
}
asyncAwaitAny!(true, waitable)(timeout, func); asyncAwaitAny!(true, waitable)(timeout, func);
return ret; return ret;
} }
@ -60,7 +69,8 @@ template Waitable(CB, alias WAIT, alias CANCEL, alias DONE)
"CANCEL must be callable with a parameter of type "~CB.stringof); "CANCEL must be callable with a parameter of type "~CB.stringof);
else else
static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))), static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))),
"CANCEL must be callable with parameters ("~CB.stringof~", "~typeof(WAIT(CB.init)).stringof~")"); "CANCEL must be callable with parameters "~CB.stringof~
" and "~typeof(WAIT(CB.init)).stringof);
static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))), static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))),
"DONE must be callable with types "~ParameterTypeTuple!CB.stringof); "DONE must be callable with types "~ParameterTypeTuple!CB.stringof);