Merge pull request #243 from vibe-d/parallel_pipe_improvement

Allow the buffer to grow for PipeMode.concurrent.
This commit is contained in:
Leonid Kramer 2021-01-12 14:52:49 +01:00 committed by GitHub
commit ff4c9851c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -77,7 +77,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
case PipeMode.concurrent: case PipeMode.concurrent:
{ {
enum bufcount = 4; enum bufcount = 4;
enum bufsize = 64*1024; enum bufsize = 4*1024*1024;
static struct ConcurrentPipeState { static struct ConcurrentPipeState {
InputStream source; InputStream source;
@ -94,6 +94,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
void readLoop() void readLoop()
{ {
// gradually increased depending on read speed
size_t rbsize = 64*1024;
while (true) { while (true) {
ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
if (remaining == 0) break; if (remaining == 0) break;
@ -101,9 +104,13 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
while (read_idx >= write_idx + buffers.length) while (read_idx >= write_idx + buffers.length)
evt.wait(); evt.wait();
size_t chunk = min(remaining, bufsize); size_t chunk = min(remaining, rbsize);
auto bi = read_idx % bufcount; auto bi = read_idx % bufcount;
auto tm = MonoTime.currTime;
source.read(buffers[bi][0 .. chunk]); source.read(buffers[bi][0 .. chunk]);
if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
rbsize *= 2;
if (nbytes != ulong.max) nbytes -= chunk; if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk; bytesWritten += chunk;
bufferFill[bi] = chunk; bufferFill[bi] = chunk;