Merge pull request #62 from FraMecca/master

TCPConnection.waitForDataAsync
This commit is contained in:
Sönke Ludwig 2018-02-26 20:33:53 +01:00 committed by GitHub
commit 6aa5cddf33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -586,6 +586,61 @@ mixin(tracer);
return m_context.readBuffer.length > 0; return m_context.readBuffer.length > 0;
} }
/** Waits asynchronously for new data to arrive.
This function can be used to detach the `TCPConnection` from a
running task while waiting for data, so that the associated memory
resources are available for other operations.
Note that `read_ready_callback` may be called from outside of a
task, so no blocking operations may be performed. Instead, an existing
task should be notified, or a new one started with `runTask`.
Params:
read_ready_callback = A callback taking a `bool` parameter that
signals the read-readiness of the connection
timeout = Optional timeout to limit the maximum wait time
Returns:
If the read readiness can be determined immediately, it will be
returned as WaitForDataAsyncStatus.sataAvailable` or
`WaitForDataAsyncStatus.noModeData` and the callback will not be
invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
and the callback will be invoked once the status can be
determined or the specified timeout is reached.
*/
WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
if (is(typeof(read_ready_callback(true))))
{
mixin(tracer);
import vibe.core.core : setTimer;
if (!m_context)
return WaitForDataAsyncStatus.noMoreData;
if (m_context.readBuffer.length > 0)
return WaitForDataAsyncStatus.dataAvailable;
if (timeout <= 0.seconds) {
auto rs = waitForData(0.seconds);
return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
}
auto tm = setTimer(timeout, {
eventDriver.sockets.cancelRead(m_socket);
read_ready_callback(false);
});
eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once,
(sock, st, nb) {
tm.stop();
assert(m_context.readBuffer.length == 0);
m_context.readBuffer.putN(nb);
read_ready_callback(m_context.readBuffer.length > 0);
});
return WaitForDataAsyncStatus.waiting;
}
const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }
void skip(ulong count) void skip(ulong count)
@ -684,6 +739,16 @@ mixin(tracer);
} }
} }
/** Represents possible return values for
TCPConnection.waitForDataAsync.
*/
enum WaitForDataAsyncStatus {
noMoreData,
dataAvailable,
waiting,
}
mixin validateConnectionStream!TCPConnection; mixin validateConnectionStream!TCPConnection;
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout) private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout)