Merge pull request #233 from vibe-d/concurrent_pipe

Add a concurrent pipe() mode
This commit is contained in:
Sönke Ludwig 2020-10-24 16:38:18 +02:00 committed by GitHub
commit 65b921cc65
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 343 additions and 63 deletions

View file

@ -1,3 +1,11 @@
1.11.0 - 2020-10-24
===================
- Added a concurrent mode to `pipe()` using `PipeMode.concurrent` to improve throughput in I/O limited situations - [pull #233][issue233]
[issue233]: https://github.com/vibe-d/vibe-core/issues/233
1.10.3 - 2020-10-15
===================

View file

@ -1102,7 +1102,7 @@ void setTaskCreationCallback(TaskCreationCallback func)
/**
A version string representing the current vibe.d core version
*/
enum vibeVersionString = "1.10.3";
enum vibeVersionString = "1.11.0";
/**

View file

@ -242,7 +242,7 @@ void copyFile(NativePath from, NativePath to, bool overwrite = false)
auto dst = openFile(to, FileMode.createTrunc);
scope(exit) dst.close();
dst.truncate(src.size);
dst.write(src);
src.pipe(dst, PipeMode.concurrent);
}
// TODO: also retain creation time on windows
@ -651,7 +651,7 @@ struct FileStream {
void write(InputStream)(InputStream stream, ulong nbytes = ulong.max)
if (isInputStream!InputStream)
{
writeDefault(this, stream, nbytes);
pipe(stream, this, nbytes, PipeMode.concurrent);
}
void flush()
@ -670,38 +670,6 @@ struct FileStream {
mixin validateRandomAccessStream!FileStream;
private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, InputStream stream, ulong nbytes = ulong.max)
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, make, dispose;
import std.algorithm.comparison : min;
static struct Buffer { ubyte[64*1024] bytes = void; }
auto bufferobj = () @trusted { return theAllocator.make!Buffer(); } ();
scope (exit) () @trusted { theAllocator.dispose(bufferobj); } ();
auto buffer = bufferobj.bytes[];
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if (nbytes == ulong.max) {
while (!stream.empty) {
size_t chunk = min(stream.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
dst.write(buffer[0 .. chunk]);
}
} else {
while (nbytes > 0) {
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
dst.write(buffer[0 .. chunk]);
nbytes -= chunk;
}
}
}
/**
Interface for directory watcher implementations.

View file

@ -35,17 +35,23 @@ public import eventcore.driver : IOMode;
The actual number of bytes written is returned. If `nbytes` is given
and not equal to `ulong.max`, íts value will be returned.
*/
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes)
@blocking @trusted
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, makeArray, dispose;
import vibe.core.core : runTask;
import vibe.core.sync : LocalManualEvent, createManualEvent;
import vibe.core.task : InterruptException;
final switch (mode) {
case PipeMode.sequential:
{
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope (exit) theAllocator.dispose(buffer);
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
ulong ret = 0;
if (nbytes == ulong.max) {
while (!source.empty) {
size_t chunk = min(source.leastSize, buffer.length);
@ -65,14 +71,123 @@ ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulo
ret += chunk;
}
}
return ret;
}
case PipeMode.concurrent:
{
enum bufcount = 4;
enum bufsize = 64*1024;
static struct ConcurrentPipeState {
InputStream source;
OutputStream sink;
ulong nbytes;
ubyte[][bufcount] buffers;
size_t[bufcount] bufferFill;
// buffer index that is being read/written
size_t read_idx = 0, write_idx = 0;
Exception readex;
bool done = false;
LocalManualEvent evt;
size_t bytesWritten;
void readLoop()
{
while (true) {
ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
if (remaining == 0) break;
while (read_idx >= write_idx + buffers.length)
evt.wait();
size_t chunk = min(remaining, bufsize);
auto bi = read_idx % bufcount;
source.read(buffers[bi][0 .. chunk]);
if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk;
bufferFill[bi] = chunk;
if (write_idx >= read_idx++)
evt.emit();
}
}
void writeLoop()
{
while (read_idx > write_idx || !done) {
while (read_idx <= write_idx) {
if (done) return;
evt.wait();
}
auto bi = write_idx % bufcount;
sink.write(buffers[bi][0 .. bufferFill[bi]]);
// notify reader that we just made a buffer available
if (write_idx++ <= read_idx - buffers.length)
evt.emit();
}
}
}
scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize);
scope (exit) theAllocator.dispose(buffer);
ConcurrentPipeState state;
foreach (i; 0 .. bufcount)
state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
swap(state.source, source);
swap(state.sink, sink);
state.nbytes = nbytes;
state.evt = createManualEvent();
auto reader = runTask(function(ConcurrentPipeState* state) nothrow {
try state.readLoop();
catch (InterruptException e) {}
catch (Exception e) state.readex = e;
state.done = true;
state.evt.emit();
}, &state);
scope (failure) {
reader.interrupt();
reader.joinUninterruptible();
}
state.writeLoop();
reader.join();
if (state.readex) throw state.readex;
return state.bytesWritten;
}
}
}
/// ditto
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink)
@blocking
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
PipeMode mode = PipeMode.sequential) @blocking
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
return pipe(source, sink, ulong.max);
return pipe(source, sink, ulong.max, mode);
}
enum PipeMode {
/** Sequentially reads into a buffer and writes it out to the sink.
This mode reads and writes to the same buffer in a ping-pong fashion.
The memory overhead is low, but if the source does not support
read-ahead buffering, or the sink does not have an internal buffer that
is drained asynchronously, the total throghput will be reduced.
*/
sequential,
/** Uses a task to concurrently read and write.
This mode maximizes throughput at the expense of setting up a task and
associated sycnronization.
*/
concurrent
}

View file

@ -73,7 +73,10 @@ void runTest()
write(bar, null);
assert(!watcher.readChanges(changes, 100.msecs));
remove(bar);
assert(!watcher.readChanges(changes, 1500.msecs));
watcher = NativePath(dir).watchDirectory(Yes.recursive);
assert(!watcher.readChanges(changes, 1500.msecs));
write(foo, null);
sleep(sleepTime);
write(foo, [0, 1]);

View file

@ -0,0 +1,186 @@
/+ dub.sdl:
name "test"
dependency "vibe-core" path=".."
+/
module test;
import vibe.core.core;
import vibe.core.stream;
import std.algorithm : min;
import std.array : Appender, appender;
import std.exception;
import std.random;
import core.time : Duration, msecs;
void main()
{
auto datau = new uint[](2 * 1024 * 1024);
foreach (ref u; datau)
u = uniform!uint();
auto data = cast(ubyte[])datau;
test(data, 0, 0.msecs, 0, 0.msecs);
test(data, 32768, 1.msecs, 32768, 1.msecs);
test(data, 32768, 0.msecs, 0, 0.msecs);
test(data, 0, 0.msecs, 32768, 0.msecs);
test(data, 32768, 0.msecs, 32768, 0.msecs);
test(data, 1023*967, 10.msecs, 0, 0.msecs);
test(data, 0, 0.msecs, 1023*967, 20.msecs);
test(data, 1023*967, 10.msecs, 1023*967, 10.msecs);
test(data, 1023*967, 10.msecs, 32768, 0.msecs);
test(data, 32768, 0.msecs, 1023*967, 10.msecs);
test(data, 1023*967, 10.msecs, 65535, 0.msecs);
test(data, 65535, 0.msecs, 1023*967, 10.msecs);
}
void test(ubyte[] data, ulong read_sleep_freq, Duration read_sleep,
ulong write_sleep_freq, Duration write_sleep)
{
test(data, ulong.max, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, data.length * 2, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, data.length / 2, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 64 * 1024, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 64 * 1024 - 57, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 557, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
}
void test(ubyte[] data, ulong chunk_limit, ulong read_sleep_freq,
Duration read_sleep, ulong write_sleep_freq, Duration write_sleep)
{
test(data, ulong.max, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 8 * 1024 * 1024, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 8 * 1024 * 1024 - 37, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
test(data, 37, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
}
void test(ubyte[] data, ulong data_limit, ulong chunk_limit, ulong read_sleep_freq,
Duration read_sleep, ulong write_sleep_freq, Duration write_sleep)
{
import std.traits : EnumMembers;
foreach (m; EnumMembers!PipeMode)
test(data, m, data_limit, chunk_limit, read_sleep_freq, read_sleep, write_sleep_freq, write_sleep);
}
void test(ubyte[] data, PipeMode mode, ulong data_limit, ulong chunk_limit,
ulong read_sleep_freq, Duration read_sleep, ulong write_sleep_freq,
Duration write_sleep)
{
import vibe.core.log;
logInfo("test RF=%s RS=%sms WF=%s WS=%sms CL=%s DL=%s M=%s", read_sleep_freq, read_sleep.total!"msecs", write_sleep_freq, write_sleep.total!"msecs", chunk_limit, data_limit, mode);
auto input = TestInputStream(data, chunk_limit, read_sleep_freq, read_sleep);
auto output = TestOutputStream(write_sleep_freq, write_sleep);
auto datacmp = data[0 .. min(data.length, data_limit)];
input.pipe(output, data_limit, mode);
if (output.m_data.data != datacmp) {
logError("MISMATCH: %s b vs. %s b ([%(%s, %) ... %(%s, %)] vs. [%(%s, %) ... %(%s, %)])",
output.m_data.data.length, datacmp.length,
output.m_data.data[0 .. 6], output.m_data.data[$-6 .. $],
datacmp[0 .. 6], datacmp[$-6 .. $]);
assert(false);
}
// avoid leaking memory due to false pointers
output.freeData();
}
struct TestInputStream {
private {
const(ubyte)[] m_data;
ulong m_chunkLimit = size_t.max;
ulong m_sleepFrequency = 0;
Duration m_sleepAmount;
}
this(const(ubyte)[] data, ulong chunk_limit, ulong sleep_frequency, Duration sleep_amount)
{
m_data = data;
m_chunkLimit = chunk_limit;
m_sleepFrequency = sleep_frequency;
m_sleepAmount = sleep_amount;
}
@safe:
@property bool empty() @blocking { return m_data.length == 0; }
@property ulong leastSize() @blocking { return min(m_data.length, m_chunkLimit); }
@property bool dataAvailableForRead() { assert(false); }
const(ubyte)[] peek() { assert(false); } // currently not used by pipe()
size_t read(scope ubyte[] dst, IOMode mode)
@blocking {
assert(mode == IOMode.all || mode == IOMode.once);
if (mode == IOMode.once)
dst = dst[0 .. min($, m_chunkLimit)];
auto oldsleeps = m_sleepFrequency ? m_data.length / m_sleepFrequency : 0;
dst[] = m_data[0 .. dst.length];
m_data = m_data[dst.length .. $];
auto newsleeps = m_sleepFrequency ? m_data.length / m_sleepFrequency : 0;
if (oldsleeps != newsleeps) {
if (m_sleepAmount > 0.msecs)
sleep(m_sleepAmount * (oldsleeps - newsleeps));
else yield();
}
return dst.length;
}
void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
}
mixin validateInputStream!TestInputStream;
struct TestOutputStream {
private {
Appender!(ubyte[]) m_data;
ulong m_sleepFrequency = 0;
Duration m_sleepAmount;
}
this(ulong sleep_frequency, Duration sleep_amount)
{
m_data = appender!(ubyte[]);
m_data.reserve(2*1024*1024);
m_sleepFrequency = sleep_frequency;
m_sleepAmount = sleep_amount;
}
void freeData()
{
import core.memory : GC;
auto d = m_data.data;
m_data.clear();
GC.free(d.ptr);
}
@safe:
void finalize() @safe @blocking {}
void flush() @safe @blocking {}
size_t write(in ubyte[] bytes, IOMode mode)
@safe @blocking {
assert(mode == IOMode.all);
auto oldsleeps = m_sleepFrequency ? m_data.data.length / m_sleepFrequency : 0;
m_data.put(bytes);
auto newsleeps = m_sleepFrequency ? m_data.data.length / m_sleepFrequency : 0;
if (oldsleeps != newsleeps) {
if (m_sleepAmount > 0.msecs)
sleep(m_sleepAmount * (newsleeps - oldsleeps));
else yield();
}
return bytes.length;
}
void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
}
mixin validateOutputStream!TestOutputStream;