From 0d6ba62f513a809906e729a622deb1ba86415328 Mon Sep 17 00:00:00 2001 From: Francesco Mecca Date: Wed, 21 Feb 2018 22:17:18 +0000 Subject: [PATCH 1/5] added TCPConnection.waitForDataAsync --- source/vibe/core/net.d | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 7c5c56e..fdca9d9 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -586,6 +586,31 @@ mixin(tracer); return m_context.readBuffer.length > 0; } + void waitForDataAsync(void delegate(bool) @safe read_callback, Duration timeout = Duration.max) + { +mixin(tracer); + import vibe.core.core : runTask; + + if (!m_context) { + runTask(read_callback, false); + return; + } + if (m_context.readBuffer.length > 0) { + runTask(read_callback, true); + return; + } + + + auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; + bool cancelled; + IOStatus status; + size_t nbytes; + eventDriver.sockets.waitForData(m_socket, + (sock, st, nb) { if(st != IOStatus.ok) runTask(read_callback, false); + else runTask(read_callback, true); + }); + } + const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } void skip(ulong count) From 20e32cf32722f100fbdb280aefd68c86fd97a685 Mon Sep 17 00:00:00 2001 From: Francesco Mecca Date: Fri, 23 Feb 2018 00:27:04 +0000 Subject: [PATCH 2/5] added Timer and enum return to TCPConnection.waitForDataAsync --- source/vibe/core/net.d | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index fdca9d9..f29d4b8 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -586,29 +586,39 @@ mixin(tracer); return m_context.readBuffer.length > 0; } - void waitForDataAsync(void delegate(bool) @safe read_callback, Duration timeout = Duration.max) + enum WaitForDataAsyncStatus { + noMoreData, + dataAvailable, + waiting + } + + WaitForDataAsyncStatus waitForDataAsync(void delegate(bool) @safe read_callback, Duration timeout = Duration.max) { mixin(tracer); - import vibe.core.core : runTask; + import vibe.core.core : runTask, setTimer, createTimer; if (!m_context) { runTask(read_callback, false); - return; + return WaitForDataAsyncStatus.noMoreData; } if (m_context.readBuffer.length > 0) { runTask(read_callback, true); - return; + return WaitForDataAsyncStatus.dataAvailable; } + if (timeout <= 0.seconds) { + auto rs = waitForData(0.seconds); + auto mode = rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData; + return mode; + } - auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; - bool cancelled; - IOStatus status; - size_t nbytes; - eventDriver.sockets.waitForData(m_socket, - (sock, st, nb) { if(st != IOStatus.ok) runTask(read_callback, false); + auto tm = setTimer(timeout, { eventDriver.sockets.cancelRead(m_socket); + runTask(read_callback, false); }); + eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once, + (sock, st, nb) { tm.stop(); if(st != IOStatus.ok) runTask(read_callback, false); else runTask(read_callback, true); }); + return WaitForDataAsyncStatus.waiting; } const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } From 99e2873cc01c2f66feb15a73d315ee48765b0c12 Mon Sep 17 00:00:00 2001 From: Francesco Mecca Date: Sat, 24 Feb 2018 21:28:35 +0100 Subject: [PATCH 3/5] assert and stronger condition on TCPConnection.waitForDataAsync assert and stronger condition on TCPConnection.waitForDataAsync --- source/vibe/core/net.d | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index f29d4b8..496b093 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -586,12 +586,6 @@ mixin(tracer); return m_context.readBuffer.length > 0; } - enum WaitForDataAsyncStatus { - noMoreData, - dataAvailable, - waiting - } - WaitForDataAsyncStatus waitForDataAsync(void delegate(bool) @safe read_callback, Duration timeout = Duration.max) { mixin(tracer); @@ -615,8 +609,9 @@ mixin(tracer); auto tm = setTimer(timeout, { eventDriver.sockets.cancelRead(m_socket); runTask(read_callback, false); }); eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once, - (sock, st, nb) { tm.stop(); if(st != IOStatus.ok) runTask(read_callback, false); - else runTask(read_callback, true); + (sock, st, nb) { tm.stop(); assert(m_context.readBuffer.length == 0); + m_context.readBuffer.putN(nb); + runTask(read_callback, m_context.readBuffer.length > 0); }); return WaitForDataAsyncStatus.waiting; } @@ -719,6 +714,16 @@ mixin(tracer); } } +/** Represents possible return values for + TCPConnection.waitForDataAsync. +*/ +enum WaitForDataAsyncStatus { + noMoreData, + dataAvailable, + waiting, +} + + mixin validateConnectionStream!TCPConnection; private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout) From e33cf567ecc1c21d5403f9567bdc9f673fcea9dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 26 Feb 2018 13:15:24 +0100 Subject: [PATCH 4/5] Change callback semantics of waitForDataAsync Also adds a documentation comment to specify the semantics. --- source/vibe/core/net.d | 57 ++++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 496b093..d8777fb 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -586,33 +586,58 @@ mixin(tracer); return m_context.readBuffer.length > 0; } - WaitForDataAsyncStatus waitForDataAsync(void delegate(bool) @safe read_callback, Duration timeout = Duration.max) + /** Waits asynchronously for new data to arrive. + + This function can be used to detach the `TCPConnection` from a + running task while waiting for data, so that the associated memory + resources are available for other operations. + + Note that `read_ready_callback` may be called from outside of a + task, so no blocking operations may be performed. Instead, an existing + task should be notified, or a new one started with `runTask`. + + Params: + read_ready_callback = A callback taking a `bool` parameter that + signals the read-readiness of the connection + timeout = Optional timeout to limit the maximum wait time + + Returns: + If the read readiness can be determined immediately, it will be + returned as WaitForDataAsyncStatus.sataAvailable` or + `WaitForDataAsyncStatus.noModeData` and the callback will not be + invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned + and the callback will be invoked once the status can be + determined or the specified timeout is reached. + */ + WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max) + if (is(typeof(CALLABLE(true))) { mixin(tracer); - import vibe.core.core : runTask, setTimer, createTimer; + import vibe.core.core : setTimer; - if (!m_context) { - runTask(read_callback, false); + if (!m_context) return WaitForDataAsyncStatus.noMoreData; - } - if (m_context.readBuffer.length > 0) { - runTask(read_callback, true); + + if (m_context.readBuffer.length > 0) return WaitForDataAsyncStatus.dataAvailable; - } if (timeout <= 0.seconds) { auto rs = waitForData(0.seconds); - auto mode = rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData; - return mode; + return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData; } - auto tm = setTimer(timeout, { eventDriver.sockets.cancelRead(m_socket); - runTask(read_callback, false); }); + auto tm = setTimer(timeout, { + eventDriver.sockets.cancelRead(m_socket); + read_ready_callback(false); + }); eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once, - (sock, st, nb) { tm.stop(); assert(m_context.readBuffer.length == 0); - m_context.readBuffer.putN(nb); - runTask(read_callback, m_context.readBuffer.length > 0); - }); + (sock, st, nb) { + tm.stop(); + assert(m_context.readBuffer.length == 0); + m_context.readBuffer.putN(nb); + read_ready_callback(m_context.readBuffer.length > 0); + }); + return WaitForDataAsyncStatus.waiting; } From 57d516a82b7b992798490ef72a8596e1d5844707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 26 Feb 2018 19:40:13 +0100 Subject: [PATCH 5/5] fixup --- source/vibe/core/net.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index d8777fb..cc842ad 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -610,7 +610,7 @@ mixin(tracer); determined or the specified timeout is reached. */ WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max) - if (is(typeof(CALLABLE(true))) + if (is(typeof(read_ready_callback(true)))) { mixin(tracer); import vibe.core.core : setTimer;