Refactor read loop to avoid code duplication.

This commit is contained in:
Sönke Ludwig 2020-10-21 20:05:00 +02:00
parent eb183d5ab2
commit 2ea3a7ceb1

View file

@ -84,7 +84,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
OutputStream sink; OutputStream sink;
ulong nbytes; ulong nbytes;
ubyte[][bufcount] buffers; ubyte[][bufcount] buffers;
size_t[bufcount] buffer_fill; size_t[bufcount] bufferFill;
// buffer index that is being read/written // buffer index that is being read/written
size_t read_idx = 0, write_idx = 0; size_t read_idx = 0, write_idx = 0;
Exception readex; Exception readex;
@ -94,36 +94,23 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
void readLoop() void readLoop()
{ {
if (nbytes == ulong.max) { while (true) {
while (!source.empty) { ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
while (read_idx >= write_idx + buffers.length) { if (remaining == 0) break;
evt.wait();
}
size_t chunk = min(source.leastSize, bufsize);
auto bi = read_idx % bufcount;
source.read(buffers[bi][0 .. chunk]);
bytesWritten += chunk;
buffer_fill[bi] = chunk;
if (write_idx >= read_idx++)
evt.emit();
}
} else {
while (nbytes > 0) {
while (read_idx >= write_idx + buffers.length) while (read_idx >= write_idx + buffers.length)
evt.wait(); evt.wait();
size_t chunk = min(nbytes, bufsize); size_t chunk = min(remaining, bufsize);
auto bi = read_idx % bufcount; auto bi = read_idx % bufcount;
source.read(buffers[bi][0 .. chunk]); source.read(buffers[bi][0 .. chunk]);
nbytes -= chunk; if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk; bytesWritten += chunk;
buffer_fill[bi] = chunk; bufferFill[bi] = chunk;
if (write_idx >= read_idx++) if (write_idx >= read_idx++)
evt.emit(); evt.emit();
} }
} }
}
void writeLoop() void writeLoop()
{ {
@ -134,9 +121,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
} }
auto bi = write_idx % bufcount; auto bi = write_idx % bufcount;
sink.write(buffers[bi][0 .. buffer_fill[bi]]); sink.write(buffers[bi][0 .. bufferFill[bi]]);
// notify reader that we just freed a buffer // notify reader that we just made a buffer available
if (write_idx++ <= read_idx - buffers.length) if (write_idx++ <= read_idx - buffers.length)
evt.emit(); evt.emit();
} }
@ -167,7 +154,6 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
reader.joinUninterruptible(); reader.joinUninterruptible();
} }
// write loop
state.writeLoop(); state.writeLoop();
reader.join(); reader.join();