From f3f60ee87067bd5df21ec6ae62b22098498fe188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Oct 2020 16:57:15 +0200 Subject: [PATCH] Implement a concurrent mode for pipe(). This maximizes throughput for typical disk I/O loads. --- source/vibe/core/stream.d | 185 ++++++++++++++++++++++++++++++++------ 1 file changed, 157 insertions(+), 28 deletions(-) diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index ab34f6a..68e3385 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -35,44 +35,173 @@ public import eventcore.driver : IOMode; The actual number of bytes written is returned. If `nbytes` is given and not equal to `ulong.max`, íts value will be returned. */ -ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes) - @blocking @trusted +ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, + ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted if (isOutputStream!OutputStream && isInputStream!InputStream) { import vibe.internal.allocator : theAllocator, makeArray, dispose; + import vibe.core.core : runTask; + import vibe.core.sync : LocalManualEvent, createManualEvent; + import vibe.core.task : InterruptException; - scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); - scope (exit) theAllocator.dispose(buffer); + final switch (mode) { + case PipeMode.sequential: + { + scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); + scope (exit) theAllocator.dispose(buffer); - //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); - ulong ret = 0; - if (nbytes == ulong.max) { - while (!source.empty) { - size_t chunk = min(source.leastSize, buffer.length); - assert(chunk > 0, "leastSize returned zero for non-empty stream."); - //logTrace("read pipe chunk %d", chunk); - source.read(buffer[0 .. chunk]); - sink.write(buffer[0 .. 
chunk]); - ret += chunk; - } - } else { - while (nbytes > 0) { - size_t chunk = min(nbytes, buffer.length); - //logTrace("read pipe chunk %d", chunk); - source.read(buffer[0 .. chunk]); - sink.write(buffer[0 .. chunk]); - nbytes -= chunk; - ret += chunk; - } + ulong ret = 0; + + if (nbytes == ulong.max) { + while (!source.empty) { + size_t chunk = min(source.leastSize, buffer.length); + assert(chunk > 0, "leastSize returned zero for non-empty stream."); + //logTrace("read pipe chunk %d", chunk); + source.read(buffer[0 .. chunk]); + sink.write(buffer[0 .. chunk]); + ret += chunk; + } + } else { + while (nbytes > 0) { + size_t chunk = min(nbytes, buffer.length); + //logTrace("read pipe chunk %d", chunk); + source.read(buffer[0 .. chunk]); + sink.write(buffer[0 .. chunk]); + nbytes -= chunk; + ret += chunk; + } + } + + return ret; + } + case PipeMode.concurrent: + { + enum bufcount = 4; + enum bufsize = 64*1024; + + static struct ConcurrentPipeState { + InputStream source; + OutputStream sink; + ulong nbytes; + ubyte[][bufcount] buffers; + size_t[bufcount] buffer_fill; + // buffer index that is being read/written + size_t read_idx = 0, write_idx = 0; + Exception readex; + bool done = false; + LocalManualEvent evt; + size_t bytesWritten; + + void readLoop() + { + if (nbytes == ulong.max) { + while (!source.empty) { + while (read_idx >= write_idx + buffers.length) { + evt.wait(); + } + + size_t chunk = min(source.leastSize, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. chunk]); + bytesWritten += chunk; + buffer_fill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); + } + } else { + while (nbytes > 0) { + while (read_idx >= write_idx + buffers.length) + evt.wait(); + + size_t chunk = min(nbytes, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. 
chunk]); + nbytes -= chunk; + bytesWritten += chunk; + buffer_fill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); + } + } + } + + void writeLoop() + { + while (read_idx > write_idx || !done) { + while (read_idx <= write_idx) { + if (done) return; + evt.wait(); + } + + auto bi = write_idx % bufcount; + sink.write(buffers[bi][0 .. buffer_fill[bi]]); + + // notify reader that we just freed a buffer + if (write_idx++ <= read_idx - buffers.length) + evt.emit(); + } + } + } + + scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize); + scope (exit) theAllocator.dispose(buffer); + + ConcurrentPipeState state; + foreach (i; 0 .. bufcount) + state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)]; + swap(state.source, source); + swap(state.sink, sink); + state.nbytes = nbytes; + state.evt = createManualEvent(); + + auto reader = runTask(function(ConcurrentPipeState* state) nothrow { + try state.readLoop(); + catch (InterruptException e) {} + catch (Exception e) state.readex = e; + state.done = true; + state.evt.emit(); + }, &state); + + scope (failure) { + reader.interrupt(); + reader.joinUninterruptible(); + } + + // write loop + state.writeLoop(); + + reader.join(); + + if (state.readex) throw state.readex; + + return state.bytesWritten; + } } - return ret; } /// ditto -ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink) - @blocking +ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, + PipeMode mode = PipeMode.sequential) @blocking if (isOutputStream!OutputStream && isInputStream!InputStream) { - return pipe(source, sink, ulong.max); + return pipe(source, sink, ulong.max, mode); +} + +enum PipeMode { + /** Sequentially reads into a buffer and writes it out to the sink. + + This mode reads and writes to the same buffer in a ping-pong fashion. 
+ The memory overhead is low, but if the source does not support + read-ahead buffering, or the sink does not have an internal buffer that + is drained asynchronously, the total throughput will be reduced. + */ + sequential, + + /** Uses a task to concurrently read and write. + + This mode maximizes throughput at the expense of setting up a task and + associated synchronization. + */ + concurrent }