From 55e786649744a57c584af4f77652e1537886b9af Mon Sep 17 00:00:00 2001 From: Boris-Barboris Date: Sat, 11 Nov 2017 01:24:42 +0000 Subject: [PATCH 1/3] implement tcp connect timeout --- source/vibe/core/net.d | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 7ee77d6..b849820 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -156,7 +156,8 @@ TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, /** Establishes a connection to the given host/port. */ -TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0) +TCPConnection connectTCP(string host, ushort port, string bind_interface = null, + ushort bind_port = 0, Duration timeout = Duration.max) { NetworkAddress addr = resolveHost(host); addr.port = port; @@ -173,10 +174,11 @@ TCPConnection connectTCP(string host, ushort port, string bind_interface = null, if (addr.family != AddressFamily.UNIX) bind_address.port = bind_port; - return connectTCP(addr, bind_address); + return connectTCP(addr, bind_address, timeout); } /// ditto -TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress) +TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress, + Duration timeout = Duration.max) { import std.conv : to; @@ -193,11 +195,15 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); - // FIXME: make this interruptible auto result = asyncAwaitUninterruptible!(ConnectCallback, - cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) - //cb => eventDriver.sockets.cancelConnect(cb) - ); + cb => eventDriver.sockets.connectStream(uaddr, baddr, cb), + (ConnectCallback cb, StreamSocketFD sock_fd) { + eventDriver.sockets.cancelConnectStream(sock_fd); + } + )(timeout); + + if (result[1] == ConnectStatus.cancelled) + result[1] = ConnectStatus.timeout; enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); return TCPConnection(result[0], uaddr); From 6634cbc64543cc100a5c545600b026060f309ffa Mon Sep 17 00:00:00 2001 From: Boris-Barboris Date: Sat, 11 Nov 2017 15:45:44 +0000 Subject: [PATCH 2/3] switch to asyncAwait --- source/vibe/core/net.d | 10 +++++----- source/vibe/internal/async.d | 26 ++++++++++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index b849820..f188187 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -195,18 +195,18 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); - auto result = asyncAwaitUninterruptible!(ConnectCallback, + auto result = asyncAwait!(ConnectCallback, cb => eventDriver.sockets.connectStream(uaddr, baddr, cb), (ConnectCallback cb, StreamSocketFD sock_fd) { eventDriver.sockets.cancelConnectStream(sock_fd); } )(timeout); - if (result[1] == ConnectStatus.cancelled) - result[1] = ConnectStatus.timeout; - enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); + enforce(result.completed, "Failed to connect to " ~ addr.toString() ~ + ": timeout"); + enforce(result.results[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result.results[1].to!string); - return TCPConnection(result[0], uaddr); + return TCPConnection(result.results[0], uaddr); } (); } diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index f151a6d..473ea08 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -1,6 +1,6 @@ module vibe.internal.async; -import std.traits : ParameterTypeTuple; +import std.traits : ParameterTypeTuple, ReturnType; import std.typecons : tuple; import vibe.core.core : hibernate, switchToTask; import vibe.core.task : InterruptException, Task; @@ -20,14 +20,23 @@ auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string f { static struct R { bool completed = true; - typeof(waitable.results) results; + ParameterTypeTuple!Callback results; } R ret; - alias waitable = Waitable!(Callback, - action, - (cb) { ret.completed = false; cancel(cb); }, - (ParameterTypeTuple!Callback r) { ret.results = r; } - ); + static if (is(ReturnType!action == void)) { + alias waitable = Waitable!(Callback, + action, + (cb) { ret.completed = false; cancel(cb); }, + (ParameterTypeTuple!Callback r) { ret.results = r; } + ); + } + else { + alias waitable = Waitable!(Callback, + action, + (cb, waitres) { ret.completed = false; cancel(cb, waitres); }, + (ParameterTypeTuple!Callback r) { ret.results = r; } + ); + } asyncAwaitAny!(true, waitable)(timeout, func); return ret; } @@ -60,7 +69,8 @@ template Waitable(CB, alias WAIT, alias CANCEL, alias DONE) "CANCEL must be callable with a parameter of type "~CB.stringof); else static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))), - "CANCEL must be callable with parameters ("~CB.stringof~", "~typeof(WAIT(CB.init)).stringof~")"); + "CANCEL must be callable with parameters "~CB.stringof~ + " and "~typeof(WAIT(CB.init)).stringof); static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))), "DONE must be callable with types "~ParameterTypeTuple!CB.stringof); From 2625397f91259aeccb1c13e56e8e078a781e2502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 22 Nov 2017 16:52:46 +0100 Subject: [PATCH 3/3] Use asyncAwaitAny instead of asyncAwait to work around linker error. --- source/vibe/core/net.d | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index f188187..e86855b 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -195,18 +195,26 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); - auto result = asyncAwait!(ConnectCallback, + bool cancelled; + StreamSocketFD sock; + ConnectStatus status; + + alias waiter = Waitable!(ConnectCallback, cb => eventDriver.sockets.connectStream(uaddr, baddr, cb), (ConnectCallback cb, StreamSocketFD sock_fd) { + cancelled = true; eventDriver.sockets.cancelConnectStream(sock_fd); - } - )(timeout); + }, + (fd, st) { sock = fd; status = st; } + ); - enforce(result.completed, "Failed to connect to " ~ addr.toString() ~ + asyncAwaitAny!(true, waiter)(timeout); + + enforce(!cancelled, "Failed to connect to " ~ addr.toString() ~ ": timeout"); - enforce(result.results[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result.results[1].to!string); + enforce(status == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~status.to!string); - return TCPConnection(result.results[0], uaddr); + return TCPConnection(sock, uaddr); } (); }