Deprecate stream based OutputStream.write overloads.

Replaced by a global pipe() function that supports non-class streams, too.
This commit is contained in:
Sönke Ludwig 2017-01-25 22:42:35 +01:00
parent 8985923e4d
commit 373f82c16b
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -24,6 +24,51 @@ import std.conv;
public import eventcore.driver : IOMode; public import eventcore.driver : IOMode;
/** Pipes an InputStream directly into this OutputStream.
The number of bytes written is either the whole input stream when `nbytes == 0`, or exactly
`nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an
exception is thrown.
*/
void pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes)
@blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, make, dispose;
static struct Buffer { ubyte[64*1024] bytes = void; }
auto bufferobj = theAllocator.make!Buffer();
scope (exit) theAllocator.dispose(bufferobj);
auto buffer = bufferobj.bytes;
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if (nbytes == ulong.max) {
while (!source.empty) {
size_t chunk = min(source.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
}
} else {
while (nbytes > 0) {
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
source.read(buffer[0 .. chunk]);
sink.write(buffer[0 .. chunk]);
nbytes -= chunk;
}
}
}
/// ditto
void pipe(InputStream, OutputStream)(InputStream source, OutputStream sink)
@blocking
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
pipe(source, sink, ulong.max);
}
/** Marks a function as blocking. /** Marks a function as blocking.
Blocking in this case means that it may contain an operation that needs to wait for Blocking in this case means that it may contain an operation that needs to wait for
@ -129,6 +174,18 @@ interface OutputStream {
/// ditto /// ditto
final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
/** Deprecated - writes the contents of an `InputStream` into this stream.
This function will be removed. Use `pipe` instead.
See_Also: `pipe`
*/
deprecated("Use s.pipe(this) instead.")
final void write(InputStream s) { s.pipe(this); }
/// ditto
deprecated("Use s.pipe(this, nbytes) instead.")
final void write(InputStream s, ulong nbytes) { s.pipe(this, nbytes); }
/** Flushes the stream and makes sure that all data is being written to the output device. /** Flushes the stream and makes sure that all data is being written to the output device.
*/ */
void flush() @blocking; void flush() @blocking;
@ -139,49 +196,6 @@ interface OutputStream {
call to finalize(). call to finalize().
*/ */
void finalize() @blocking; void finalize() @blocking;
/** Writes an array of chars to the stream.
*/
/** Pipes an InputStream directly into this OutputStream.
The number of bytes written is either the whole input stream when `nbytes == 0`, or exactly
`nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an
exception is thrown.
*/
void write(InputStream stream, ulong nbytes) @blocking;
/// ditto
final void write(InputStream stream) @blocking { write(stream, ulong.max); }
protected final void writeDefault(InputStream stream, ulong nbytes = 0)
@trusted @blocking // FreeListRef
{
import vibe.internal.allocator : theAllocator, make, dispose;
static struct Buffer { ubyte[64*1024] bytes = void; }
auto bufferobj = theAllocator.make!Buffer();
scope (exit) theAllocator.dispose(bufferobj);
auto buffer = bufferobj.bytes;
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if (nbytes == 0 || nbytes == ulong.max) {
while( !stream.empty ){
size_t chunk = min(stream.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
write(buffer[0 .. chunk]);
}
} else {
while( nbytes > 0 ){
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
write(buffer[0 .. chunk]);
nbytes -= chunk;
}
}
}
} }
/** /**
@ -268,10 +282,6 @@ interface RandomAccessStream : Stream {
*/ */
final class NullOutputStream : OutputStream { final class NullOutputStream : OutputStream {
size_t write(in ubyte[] bytes, IOMode) { return bytes.length; } size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
void write(InputStream stream, ulong nbytes)
{
writeDefault(stream, nbytes);
}
alias write = OutputStream.write; alias write = OutputStream.write;
void flush() {} void flush() {}
void finalize() {} void finalize() {}