Implement a concurrent mode for pipe().

This maximizes throughput for typical disk I/O loads.
This commit is contained in:
Sönke Ludwig 2020-10-21 16:57:15 +02:00
parent a736481467
commit f3f60ee870

View file

@ -35,44 +35,173 @@ public import eventcore.driver : IOMode;
The actual number of bytes written is returned. If `nbytes` is given
and not equal to `ulong.max`, íts value will be returned.
*/
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes)
@blocking @trusted
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, makeArray, dispose;
import vibe.core.core : runTask;
import vibe.core.sync : LocalManualEvent, createManualEvent;
import vibe.core.task : InterruptException;
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope (exit) theAllocator.dispose(buffer);
final switch (mode) {
case PipeMode.sequential:
{
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope (exit) theAllocator.dispose(buffer);
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
ulong ret = 0;
if (nbytes == ulong.max) {
while (!source.empty) {
size_t chunk = min(source.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
ret += chunk;
}
} else {
while (nbytes > 0) {
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
nbytes -= chunk;
ret += chunk;
}
ulong ret = 0;
if (nbytes == ulong.max) {
while (!source.empty) {
size_t chunk = min(source.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
ret += chunk;
}
} else {
while (nbytes > 0) {
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
nbytes -= chunk;
ret += chunk;
}
}
return ret;
}
case PipeMode.concurrent:
{
enum bufcount = 4;
enum bufsize = 64*1024;
static struct ConcurrentPipeState {
InputStream source;
OutputStream sink;
ulong nbytes;
ubyte[][bufcount] buffers;
size_t[bufcount] buffer_fill;
// buffer index that is being read/written
size_t read_idx = 0, write_idx = 0;
Exception readex;
bool done = false;
LocalManualEvent evt;
size_t bytesWritten;
void readLoop()
{
if (nbytes == ulong.max) {
while (!source.empty) {
while (read_idx >= write_idx + buffers.length) {
evt.wait();
}
size_t chunk = min(source.leastSize, bufsize);
auto bi = read_idx % bufcount;
source.read(buffers[bi][0 .. chunk]);
bytesWritten += chunk;
buffer_fill[bi] = chunk;
if (write_idx >= read_idx++)
evt.emit();
}
} else {
while (nbytes > 0) {
while (read_idx >= write_idx + buffers.length)
evt.wait();
size_t chunk = min(nbytes, bufsize);
auto bi = read_idx % bufcount;
source.read(buffers[bi][0 .. chunk]);
nbytes -= chunk;
bytesWritten += chunk;
buffer_fill[bi] = chunk;
if (write_idx >= read_idx++)
evt.emit();
}
}
}
void writeLoop()
{
while (read_idx > write_idx || !done) {
while (read_idx <= write_idx) {
if (done) return;
evt.wait();
}
auto bi = write_idx % bufcount;
sink.write(buffers[bi][0 .. buffer_fill[bi]]);
// notify reader that we just freed a buffer
if (write_idx++ <= read_idx - buffers.length)
evt.emit();
}
}
}
scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize);
scope (exit) theAllocator.dispose(buffer);
ConcurrentPipeState state;
foreach (i; 0 .. bufcount)
state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
swap(state.source, source);
swap(state.sink, sink);
state.nbytes = nbytes;
state.evt = createManualEvent();
auto reader = runTask(function(ConcurrentPipeState* state) nothrow {
try state.readLoop();
catch (InterruptException e) {}
catch (Exception e) state.readex = e;
state.done = true;
state.evt.emit();
}, &state);
scope (failure) {
reader.interrupt();
reader.joinUninterruptible();
}
// write loop
state.writeLoop();
reader.join();
if (state.readex) throw state.readex;
return state.bytesWritten;
}
}
return ret;
}
/// ditto
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink)
@blocking
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
PipeMode mode = PipeMode.sequential) @blocking
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
return pipe(source, sink, ulong.max);
return pipe(source, sink, ulong.max, mode);
}
enum PipeMode {
/** Sequentially reads into a buffer and writes it out to the sink.
This mode reads and writes to the same buffer in a ping-pong fashion.
The memory overhead is low, but if the source does not support
read-ahead buffering, or the sink does not have an internal buffer that
is drained asynchronously, the total throghput will be reduced.
*/
sequential,
/** Uses a task to concurrently read and write.
This mode maximizes throughput at the expense of setting up a task and
associated sycnronization.
*/
concurrent
}