From f3f60ee87067bd5df21ec6ae62b22098498fe188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Oct 2020 16:57:15 +0200 Subject: [PATCH 1/7] Implement a concurrent mode for pipe(). This maximizes throughput for typical disk I/O loads. --- source/vibe/core/stream.d | 185 ++++++++++++++++++++++++++++++++------ 1 file changed, 157 insertions(+), 28 deletions(-) diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index ab34f6a..68e3385 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -35,44 +35,173 @@ public import eventcore.driver : IOMode; The actual number of bytes written is returned. If `nbytes` is given and not equal to `ulong.max`, íts value will be returned. */ -ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes) - @blocking @trusted +ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, + ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted if (isOutputStream!OutputStream && isInputStream!InputStream) { import vibe.internal.allocator : theAllocator, makeArray, dispose; + import vibe.core.core : runTask; + import vibe.core.sync : LocalManualEvent, createManualEvent; + import vibe.core.task : InterruptException; - scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); - scope (exit) theAllocator.dispose(buffer); + final switch (mode) { + case PipeMode.sequential: + { + scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); + scope (exit) theAllocator.dispose(buffer); - //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); - ulong ret = 0; - if (nbytes == ulong.max) { - while (!source.empty) { - size_t chunk = min(source.leastSize, buffer.length); - assert(chunk > 0, "leastSize returned zero for non-empty stream."); - //logTrace("read pipe chunk %d", chunk); - source.read(buffer[0 .. chunk]); - sink.write(buffer[0 .. chunk]); - ret += chunk; - } - } else { - while (nbytes > 0) { - size_t chunk = min(nbytes, buffer.length); - //logTrace("read pipe chunk %d", chunk); - source.read(buffer[0 .. chunk]); - sink.write(buffer[0 .. chunk]); - nbytes -= chunk; - ret += chunk; - } + ulong ret = 0; + + if (nbytes == ulong.max) { + while (!source.empty) { + size_t chunk = min(source.leastSize, buffer.length); + assert(chunk > 0, "leastSize returned zero for non-empty stream."); + //logTrace("read pipe chunk %d", chunk); + source.read(buffer[0 .. chunk]); + sink.write(buffer[0 .. chunk]); + ret += chunk; + } + } else { + while (nbytes > 0) { + size_t chunk = min(nbytes, buffer.length); + //logTrace("read pipe chunk %d", chunk); + source.read(buffer[0 .. chunk]); + sink.write(buffer[0 .. chunk]); + nbytes -= chunk; + ret += chunk; + } + } + + return ret; + } + case PipeMode.concurrent: + { + enum bufcount = 4; + enum bufsize = 64*1024; + + static struct ConcurrentPipeState { + InputStream source; + OutputStream sink; + ulong nbytes; + ubyte[][bufcount] buffers; + size_t[bufcount] buffer_fill; + // buffer index that is being read/written + size_t read_idx = 0, write_idx = 0; + Exception readex; + bool done = false; + LocalManualEvent evt; + size_t bytesWritten; + + void readLoop() + { + if (nbytes == ulong.max) { + while (!source.empty) { + while (read_idx >= write_idx + buffers.length) { + evt.wait(); + } + + size_t chunk = min(source.leastSize, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. chunk]); + bytesWritten += chunk; + buffer_fill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); + } + } else { + while (nbytes > 0) { + while (read_idx >= write_idx + buffers.length) + evt.wait(); + + size_t chunk = min(nbytes, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. chunk]); + nbytes -= chunk; + bytesWritten += chunk; + buffer_fill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); + } + } + } + + void writeLoop() + { + while (read_idx > write_idx || !done) { + while (read_idx <= write_idx) { + if (done) return; + evt.wait(); + } + + auto bi = write_idx % bufcount; + sink.write(buffers[bi][0 .. buffer_fill[bi]]); + + // notify reader that we just freed a buffer + if (write_idx++ <= read_idx - buffers.length) + evt.emit(); + } + } + } + + scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize); + scope (exit) theAllocator.dispose(buffer); + + ConcurrentPipeState state; + foreach (i; 0 .. bufcount) + state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)]; + swap(state.source, source); + swap(state.sink, sink); + state.nbytes = nbytes; + state.evt = createManualEvent(); + + auto reader = runTask(function(ConcurrentPipeState* state) nothrow { + try state.readLoop(); + catch (InterruptException e) {} + catch (Exception e) state.readex = e; + state.done = true; + state.evt.emit(); + }, &state); + + scope (failure) { + reader.interrupt(); + reader.joinUninterruptible(); + } + + // write loop + state.writeLoop(); + + reader.join(); + + if (state.readex) throw state.readex; + + return state.bytesWritten; + } } - return ret; } /// ditto -ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink) - @blocking +ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, + PipeMode mode = PipeMode.sequential) @blocking if (isOutputStream!OutputStream && isInputStream!InputStream) { - return pipe(source, sink, ulong.max); + return pipe(source, sink, ulong.max, mode); +} + +enum PipeMode { + /** Sequentially reads into a buffer and writes it out to the sink. + + This mode reads and writes to the same buffer in a ping-pong fashion. + The memory overhead is low, but if the source does not support + read-ahead buffering, or the sink does not have an internal buffer that + is drained asynchronously, the total throghput will be reduced. + */ + sequential, + + /** Uses a task to concurrently read and write. + + This mode maximizes throughput at the expense of setting up a task and + associated sycnronization. + */ + concurrent } From 730b42bf316ea6f5f0da653bfc3af5758a639988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Oct 2020 16:57:47 +0200 Subject: [PATCH 2/7] Add a test for pipe(). --- tests/vibe.core.stream.pipe.d | 186 ++++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 tests/vibe.core.stream.pipe.d diff --git a/tests/vibe.core.stream.pipe.d b/tests/vibe.core.stream.pipe.d new file mode 100644 index 0000000..e818605 --- /dev/null +++ b/tests/vibe.core.stream.pipe.d @@ -0,0 +1,186 @@ +/+ dub.sdl: + name "test" + dependency "vibe-core" path=".." ++/ +module test; + +import vibe.core.core; +import vibe.core.stream; +import std.algorithm : min; +import std.array : Appender, appender; +import std.exception; +import std.random; +import core.time : Duration, msecs; + +void main() +{ + auto datau = new uint[](2 * 1024 * 1024); + foreach (ref u; datau) + u = uniform!uint(); + auto data = cast(ubyte[])datau; + + test(data, 0, 0.msecs, 0, 0.msecs); + test(data, 32768, 1.msecs, 32768, 1.msecs); + + test(data, 32768, 0.msecs, 0, 0.msecs); + test(data, 0, 0.msecs, 32768, 0.msecs); + test(data, 32768, 0.msecs, 32768, 0.msecs); + + test(data, 1023*967, 10.msecs, 0, 0.msecs); + test(data, 0, 0.msecs, 1023*967, 20.msecs); + test(data, 1023*967, 10.msecs, 1023*967, 10.msecs); + + test(data, 1023*967, 10.msecs, 32768, 0.msecs); + test(data, 32768, 0.msecs, 1023*967, 10.msecs); + + test(data, 1023*967, 10.msecs, 65535, 0.msecs); + test(data, 65535, 0.msecs, 1023*967, 10.msecs); +} + +void test(ubyte[] data, ulong read_sleep_freq, Duration read_sleep, + ulong write_sleep_freq, Duration write_sleep) +{ + test(data, ulong.max, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, data.length * 2, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, data.length / 2, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 64 * 1024, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 64 * 1024 - 57, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 557, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); +} + +void test(ubyte[] data, ulong chunk_limit, ulong read_sleep_freq, + Duration read_sleep, ulong write_sleep_freq, Duration write_sleep) +{ + test(data, ulong.max, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 8 * 1024 * 1024, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 8 * 1024 * 1024 - 37, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); + test(data, 37, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); +} + +void test(ubyte[] data, ulong data_limit, ulong chunk_limit, ulong read_sleep_freq, + Duration read_sleep, ulong write_sleep_freq, Duration write_sleep) +{ + import std.traits : EnumMembers; + foreach (m; EnumMembers!PipeMode) + test(data, m, data_limit, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep); +} + +void test(ubyte[] data, PipeMode mode, ulong data_limit, ulong chunk_limit, + ulong read_sleep_freq, Duration read_sleep, ulong write_sleep_freq, + Duration write_sleep) +{ + import vibe.core.log; + logInfo("test RF=%s RS=%sms WF=%s WS=%sms CL=%s DL=%s M=%s", read_sleep_freq, read_sleep.total!"msecs", write_sleep_freq, write_sleep.total!"msecs", chunk_limit, data_limit, mode); + + auto input = TestInputStream(data, chunk_limit, read_sleep_freq, read_sleep); + auto output = TestOutputStream(write_sleep_freq, write_sleep); + auto datacmp = data[0 .. min(data.length, data_limit)]; + + input.pipe(output, data_limit, mode); + if (output.m_data.data != datacmp) { + logError("MISMATCH: %s b vs. %s b ([%(%s, %) ... %(%s, %)] vs. [%(%s, %) ... %(%s, %)])", + output.m_data.data.length, datacmp.length, + output.m_data.data[0 .. 6], output.m_data.data[$-6 .. $], + datacmp[0 .. 6], datacmp[$-6 .. $]); + assert(false); + } + + // avoid leaking memory due to false pointers + output.freeData(); +} + + +struct TestInputStream { + private { + const(ubyte)[] m_data; + ulong m_chunkLimit = size_t.max; + ulong m_sleepFrequency = 0; + Duration m_sleepAmount; + } + + this(const(ubyte)[] data, ulong chunk_limit, ulong sleep_frequency, Duration sleep_amount) + { + m_data = data; + m_chunkLimit = chunk_limit; + m_sleepFrequency = sleep_frequency; + m_sleepAmount = sleep_amount; + } + + @safe: + + @property bool empty() @blocking { return m_data.length == 0; } + + @property ulong leastSize() @blocking { return min(m_data.length, m_chunkLimit); } + + @property bool dataAvailableForRead() { assert(false); } + + const(ubyte)[] peek() { assert(false); } // currently not used by pipe() + + size_t read(scope ubyte[] dst, IOMode mode) + @blocking { + assert(mode == IOMode.all || mode == IOMode.once); + if (mode == IOMode.once) + dst = dst[0 .. min($, m_chunkLimit)]; + auto oldsleeps = m_sleepFrequency ? m_data.length / m_sleepFrequency : 0; + dst[] = m_data[0 .. dst.length]; + m_data = m_data[dst.length .. $]; + auto newsleeps = m_sleepFrequency ? m_data.length / m_sleepFrequency : 0; + if (oldsleeps != newsleeps) { + if (m_sleepAmount > 0.msecs) + sleep(m_sleepAmount * (oldsleeps - newsleeps)); + else yield(); + } + return dst.length; + } + void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); } +} + +mixin validateInputStream!TestInputStream; + +struct TestOutputStream { + private { + Appender!(ubyte[]) m_data; + ulong m_sleepFrequency = 0; + Duration m_sleepAmount; + } + + this(ulong sleep_frequency, Duration sleep_amount) + { + m_data = appender!(ubyte[]); + m_data.reserve(2*1024*1024); + m_sleepFrequency = sleep_frequency; + m_sleepAmount = sleep_amount; + } + + void freeData() + { + import core.memory : GC; + auto d = m_data.data; + m_data.clear(); + GC.free(d.ptr); + } + + @safe: + + void finalize() @safe @blocking {} + void flush() @safe @blocking {} + + size_t write(in ubyte[] bytes, IOMode mode) + @safe @blocking { + assert(mode == IOMode.all); + + auto oldsleeps = m_sleepFrequency ? m_data.data.length / m_sleepFrequency : 0; + m_data.put(bytes); + auto newsleeps = m_sleepFrequency ? m_data.data.length / m_sleepFrequency : 0; + if (oldsleeps != newsleeps) { + if (m_sleepAmount > 0.msecs) + sleep(m_sleepAmount * (newsleeps - oldsleeps)); + else yield(); + } + return bytes.length; + } + void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); } + void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } +} + +mixin validateOutputStream!TestOutputStream; From eb183d5ab2461eda4be03e1a9a081f25226c2acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Oct 2020 16:58:03 +0200 Subject: [PATCH 3/7] Use concurrent pipe() in copyFile(). --- source/vibe/core/file.d | 36 ++---------------------------------- 1 file changed, 2 insertions(+), 34 deletions(-) diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index d537127..430db57 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -242,7 +242,7 @@ void copyFile(NativePath from, NativePath to, bool overwrite = false) auto dst = openFile(to, FileMode.createTrunc); scope(exit) dst.close(); dst.truncate(src.size); - dst.write(src); + src.pipe(dst, PipeMode.concurrent); } // TODO: also retain creation time on windows @@ -651,7 +651,7 @@ struct FileStream { void write(InputStream)(InputStream stream, ulong nbytes = ulong.max) if (isInputStream!InputStream) { - writeDefault(this, stream, nbytes); + pipe(stream, this, nbytes, PipeMode.concurrent); } void flush() @@ -670,38 +670,6 @@ struct FileStream { mixin validateRandomAccessStream!FileStream; -private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, InputStream stream, ulong nbytes = ulong.max) - if (isOutputStream!OutputStream && isInputStream!InputStream) -{ - import vibe.internal.allocator : theAllocator, make, dispose; - import std.algorithm.comparison : min; - - static struct Buffer { ubyte[64*1024] bytes = void; } - auto bufferobj = () @trusted { return theAllocator.make!Buffer(); } (); - scope (exit) () @trusted { theAllocator.dispose(bufferobj); } (); - auto buffer = bufferobj.bytes[]; - - //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); - if (nbytes == ulong.max) { - while (!stream.empty) { - size_t chunk = min(stream.leastSize, buffer.length); - assert(chunk > 0, "leastSize returned zero for non-empty stream."); - //logTrace("read pipe chunk %d", chunk); - stream.read(buffer[0 .. chunk]); - dst.write(buffer[0 .. chunk]); - } - } else { - while (nbytes > 0) { - size_t chunk = min(nbytes, buffer.length); - //logTrace("read pipe chunk %d", chunk); - stream.read(buffer[0 .. chunk]); - dst.write(buffer[0 .. chunk]); - nbytes -= chunk; - } - } -} - - /** Interface for directory watcher implementations. From 2ea3a7ceb1e3bf63c158e16cf6dd497710474834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 21 Oct 2020 20:05:00 +0200 Subject: [PATCH 4/7] Refactor read loop to avoid code duplication. --- source/vibe/core/stream.d | 46 ++++++++++++++------------------------- 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index 68e3385..34c5e93 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -84,7 +84,7 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, OutputStream sink; ulong nbytes; ubyte[][bufcount] buffers; - size_t[bufcount] buffer_fill; + size_t[bufcount] bufferFill; // buffer index that is being read/written size_t read_idx = 0, write_idx = 0; Exception readex; @@ -94,34 +94,21 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, void readLoop() { - if (nbytes == ulong.max) { - while (!source.empty) { - while (read_idx >= write_idx + buffers.length) { - evt.wait(); - } + while (true) { + ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; + if (remaining == 0) break; - size_t chunk = min(source.leastSize, bufsize); - auto bi = read_idx % bufcount; - source.read(buffers[bi][0 .. chunk]); - bytesWritten += chunk; - buffer_fill[bi] = chunk; - if (write_idx >= read_idx++) - evt.emit(); - } - } else { - while (nbytes > 0) { - while (read_idx >= write_idx + buffers.length) - evt.wait(); + while (read_idx >= write_idx + buffers.length) + evt.wait(); - size_t chunk = min(nbytes, bufsize); - auto bi = read_idx % bufcount; - source.read(buffers[bi][0 .. chunk]); - nbytes -= chunk; - bytesWritten += chunk; - buffer_fill[bi] = chunk; - if (write_idx >= read_idx++) - evt.emit(); - } + size_t chunk = min(remaining, bufsize); + auto bi = read_idx % bufcount; + source.read(buffers[bi][0 .. chunk]); + if (nbytes != ulong.max) nbytes -= chunk; + bytesWritten += chunk; + bufferFill[bi] = chunk; + if (write_idx >= read_idx++) + evt.emit(); } } @@ -134,9 +121,9 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, } auto bi = write_idx % bufcount; - sink.write(buffers[bi][0 .. buffer_fill[bi]]); + sink.write(buffers[bi][0 .. bufferFill[bi]]); - // notify reader that we just freed a buffer + // notify reader that we just made a buffer available if (write_idx++ <= read_idx - buffers.length) evt.emit(); } @@ -167,7 +154,6 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, reader.joinUninterruptible(); } - // write loop state.writeLoop(); reader.join(); From 1ec93b5336090113e198c06b1e1da5aa5b5d9b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 24 Oct 2020 10:53:50 +0200 Subject: [PATCH 5/7] Fix test transition between different directory watchers. --- tests/dirwatcher.d | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/dirwatcher.d b/tests/dirwatcher.d index 2cf17a5..d8778df 100644 --- a/tests/dirwatcher.d +++ b/tests/dirwatcher.d @@ -73,7 +73,10 @@ void runTest() write(bar, null); assert(!watcher.readChanges(changes, 100.msecs)); remove(bar); + assert(!watcher.readChanges(changes, 1500.msecs)); + watcher = NativePath(dir).watchDirectory(Yes.recursive); + assert(!watcher.readChanges(changes, 1500.msecs)); write(foo, null); sleep(sleepTime); write(foo, [0, 1]); From 4168d40b2bec73ca7c9512b7f04a382b1506c73b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 23 Oct 2020 22:01:27 +0200 Subject: [PATCH 6/7] Update change log. --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 929cc0f..7320a6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +1.11.0 - 2020-10-24 +=================== + +- Added a concurrent mode to `pipe()` using `PipeMode.concurrent` to improve throughput in I/O limited situations - [pull #233][issue233] + +[issue233]: https://github.com/vibe-d/vibe-core/issues/233 + + 1.10.3 - 2020-10-15 =================== From 131f7f5e540727199a7246e028f9ba6a5010b5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 23 Oct 2020 22:01:44 +0200 Subject: [PATCH 7/7] Bump version to 1.11.0 --- source/vibe/core/core.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 0b2506e..10eb326 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1102,7 +1102,7 @@ void setTaskCreationCallback(TaskCreationCallback func) /** A version string representing the current vibe.d core version */ -enum vibeVersionString = "1.10.3"; +enum vibeVersionString = "1.11.0"; /**