From 3e45456bf326574a7f9a61ca635d385224923be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 12 Jan 2021 14:40:36 +0100 Subject: [PATCH] Allow the buffer to grow for PipeMode.concurrent. Reduces overhead for fast streams by increasing the buffer size up to 4MB, as long as the latency stays below 100ms. --- source/vibe/core/stream.d | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index 34c5e93..31d2069 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -77,7 +77,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, case PipeMode.concurrent: { enum bufcount = 4; - enum bufsize = 64*1024; + enum bufsize = 4*1024*1024; static struct ConcurrentPipeState { InputStream source; @@ -94,6 +94,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, void readLoop() { + // gradually increased depending on read speed + size_t rbsize = 64*1024; + while (true) { ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; if (remaining == 0) break; @@ -101,9 +104,13 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, while (read_idx >= write_idx + buffers.length) evt.wait(); - size_t chunk = min(remaining, bufsize); + size_t chunk = min(remaining, rbsize); auto bi = read_idx % bufcount; + + auto tm = MonoTime.currTime; source.read(buffers[bi][0 .. chunk]); + if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs) + rbsize *= 2; if (nbytes != ulong.max) nbytes -= chunk; bytesWritten += chunk; bufferFill[bi] = chunk;