Allow the buffer to grow for PipeMode.concurrent.

Reduces overhead for fast streams by increasing the buffer size up to 4MB, as long as the latency stays below 100ms.
This commit is contained in:
Sönke Ludwig 2021-01-12 14:40:36 +01:00
parent 0b6aa0d24c
commit 3e45456bf3

View file

@ -77,7 +77,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
case PipeMode.concurrent: case PipeMode.concurrent:
{ {
enum bufcount = 4; enum bufcount = 4;
enum bufsize = 64*1024; enum bufsize = 4*1024*1024;
static struct ConcurrentPipeState { static struct ConcurrentPipeState {
InputStream source; InputStream source;
@ -94,6 +94,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
void readLoop() void readLoop()
{ {
// gradually increased depending on read speed
size_t rbsize = 64*1024;
while (true) { while (true) {
ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
if (remaining == 0) break; if (remaining == 0) break;
@ -101,9 +104,13 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
while (read_idx >= write_idx + buffers.length) while (read_idx >= write_idx + buffers.length)
evt.wait(); evt.wait();
size_t chunk = min(remaining, bufsize); size_t chunk = min(remaining, rbsize);
auto bi = read_idx % bufcount; auto bi = read_idx % bufcount;
auto tm = MonoTime.currTime;
source.read(buffers[bi][0 .. chunk]); source.read(buffers[bi][0 .. chunk]);
if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
rbsize *= 2;
if (nbytes != ulong.max) nbytes -= chunk; if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk; bytesWritten += chunk;
bufferFill[bi] = chunk; bufferFill[bi] = chunk;