diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index 68e3385..34c5e93 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -84,7 +84,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, OutputStream sink; ulong nbytes; ubyte[][bufcount] buffers; - size_t[bufcount] buffer_fill; + size_t[bufcount] bufferFill; // buffer index that is being read/written size_t read_idx = 0, write_idx = 0; Exception readex; @@ -94,34 +94,21 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, void readLoop() { - if (nbytes == ulong.max) { - while (!source.empty) { - while (read_idx >= write_idx + buffers.length) { - evt.wait(); - } + while (true) { + ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; + if (remaining == 0) break; - size_t chunk = min(source.leastSize, bufsize); - auto bi = read_idx % bufcount; - source.read(buffers[bi][0 .. chunk]); - bytesWritten += chunk; - buffer_fill[bi] = chunk; - if (write_idx >= read_idx++) - evt.emit(); - } - } else { - while (nbytes > 0) { - while (read_idx >= write_idx + buffers.length) - evt.wait(); + while (read_idx >= write_idx + buffers.length) + evt.wait(); - size_t chunk = min(nbytes, bufsize); - auto bi = read_idx % bufcount; - source.read(buffers[bi][0 .. chunk]); - nbytes -= chunk; - bytesWritten += chunk; - buffer_fill[bi] = chunk; - if (write_idx >= read_idx++) - evt.emit(); - } + size_t chunk = min(remaining, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. chunk]); + if (nbytes != ulong.max) nbytes -= chunk; + bytesWritten += chunk; + bufferFill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); } } @@ -134,9 +121,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, } auto bi = write_idx % bufcount; - sink.write(buffers[bi][0 .. buffer_fill[bi]]); + sink.write(buffers[bi][0 .. bufferFill[bi]]); - // notify reader that we just freed a buffer + // notify reader that we just made a buffer available if (write_idx++ <= read_idx - buffers.length) evt.emit(); } @@ -167,7 +154,6 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, reader.joinUninterruptible(); } - // write loop state.writeLoop(); reader.join();