Add vibe.d 0.7.x compatible stream definitions.

In contrast to 0.7.x, streams can now be either of class, struct or interface type.
This commit is contained in:
Sönke Ludwig 2016-10-26 13:11:28 +02:00
parent 77d70de805
commit 0ee42c4243
4 changed files with 355 additions and 16 deletions

View file

@ -7,12 +7,11 @@
*/
module vibe.core.file;
//public import vibe.core.stream;
//public import vibe.inet.url;
import eventcore.core : eventDriver;
import eventcore.driver;
import vibe.core.log;
import vibe.core.path;
import vibe.core.stream;
import vibe.internal.async : asyncAwait;
import core.stdc.stdio;
@ -411,7 +410,7 @@ struct FileStream {
/// Determines if the file stream is still open
@property bool isOpen() const { return m_fd != FileFD.init; }
@property ulong size() const { return m_size; }
@property ulong size() const nothrow { return m_size; }
@property bool readable() const { return m_mode != FileMode.append; }
@property bool writable() const { return m_mode != FileMode.read; }
@ -483,6 +482,9 @@ logDebug("Written %s", res[2]);
}
}
mixin validateRandomAccessStream!FileStream;
private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, InputStream stream, ulong nbytes = 0)
{
assert(false);

View file

@ -502,6 +502,8 @@ mixin(tracer);
}
}
mixin validateConnectionStream!TCPConnection;
/**
Represents a listening TCP socket.

View file

@ -1,16 +1,245 @@
/**
Generic stream interface used by several stream-like classes.
This module defines the basic (buffered) stream primitives. For concrete stream types, take a
look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional
high-level operations on streams, such as reading streams by line or as a whole.
Copyright: © 2012-2016 RejectedSoftware e.K.
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig
*/
module vibe.core.stream;
enum isInputStream(T) = __traits(compiles, {
T s;
ubyte[] buf;
if (!s.empty)
s.read(buf);
if (s.leastSize > 0)
s.read(buf);
});
import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance;
import core.time;
import std.algorithm;
import std.conv;
enum isOutputStream(T) = __traits(compiles, {
T s;
const(ubyte)[] buf;
s.write(buf);
});
/**************************************************************************************************/
/* Public functions */
/**************************************************************************************************/
/**
Returns a `NullOutputStream` instance.
The instance will only be created on the first request and gets reused for
all subsequent calls from the same thread.
*/
NullOutputStream nullSink() @safe nothrow
{
static NullOutputStream ret;
if (!ret) ret = new NullOutputStream;
return ret;
}
/**************************************************************************************************/
/* Public types */
/**************************************************************************************************/
/**
Interface for all classes implementing readable streams.
*/
interface InputStream {
@safe:
/** Returns true $(I iff) the end of the input stream has been reached.
*/
@property bool empty();
/** Returns the maximum number of bytes that are known to remain in this stream until the end is
reached. After `leastSize()` bytes have been read, the stream will either have reached EOS
and `empty()` returns `true`, or `leastSize()` returns again a number greater than 0.
*/
@property ulong leastSize();
/** Queries if there is data available for immediate, non-blocking read.
*/
@property bool dataAvailableForRead();
/** Returns a temporary reference to the data that is currently buffered.
The returned slice typically has the size `leastSize()` or `0` if `dataAvailableForRead()`
returns `false`. Streams that don't have an internal buffer will always return an empty
slice.
Note that any method invocation on the same stream potentially invalidates the contents of
the returned buffer.
*/
const(ubyte)[] peek();
/** Fills the preallocated array 'bytes' with data from the stream.
Throws: An exception if the operation reads past the end of the stream
*/
void read(ubyte[] dst);
}
/**
Interface for all classes implementing writeable streams.
*/
interface OutputStream {
@safe:
/** Writes an array of bytes to the stream.
*/
void write(in ubyte[] bytes);
/** Flushes the stream and makes sure that all data is being written to the output device.
*/
void flush();
/** Flushes and finalizes the stream.
Finalize has to be called on certain types of streams. No writes are possible after a
call to finalize().
*/
void finalize();
/** Writes an array of chars to the stream.
*/
final void write(in char[] bytes)
{
write(cast(const(ubyte)[])bytes);
}
/** Pipes an InputStream directly into this OutputStream.
The number of bytes written is either the whole input stream when `nbytes == 0`, or exactly
`nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an
exception is thrown.
*/
void write(InputStream stream, ulong nbytes = 0);
protected final void writeDefault(InputStream stream, ulong nbytes = 0)
@trusted // FreeListRef
{
import vibe.internal.memory : FreeListRef;
static struct Buffer { ubyte[64*1024] bytes = void; }
auto bufferobj = FreeListRef!(Buffer, false)();
auto buffer = bufferobj.bytes[];
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if( nbytes == 0 ){
while( !stream.empty ){
size_t chunk = min(stream.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
write(buffer[0 .. chunk]);
}
} else {
while( nbytes > 0 ){
size_t chunk = min(nbytes, buffer.length);
//logTrace("read pipe chunk %d", chunk);
stream.read(buffer[0 .. chunk]);
write(buffer[0 .. chunk]);
nbytes -= chunk;
}
}
}
}
/**
Interface for all classes implementing readable and writable streams.
*/
interface Stream : InputStream, OutputStream {
}
/**
Interface for streams based on a connection.
Connection streams are based on streaming socket connections, pipes and similar end-to-end
streams.
See_also: `vibe.core.net.TCPConnection`
*/
interface ConnectionStream : Stream {
@safe:
/** Determines The current connection status.
If `connected` is `false`, writing to the connection will trigger an exception. Reading may
still succeed as long as there is data left in the input buffer. Use `InputStream.empty`
instead to determine when to stop reading.
*/
@property bool connected() const;
/** Actively closes the connection and frees associated resources.
Note that close must always be called, even if the remote has already closed the connection.
Failure to do so will result in resource and memory leakage.
Closing a connection implies a call to `finalize`, so that it doesn't need to be called
explicitly (it will be a no-op in that case).
*/
void close();
/** Blocks until data becomes available for read.
The maximum wait time can be customized with the `timeout` parameter. If there is already
data availabe for read, or if the connection is closed, the function will return immediately
without blocking.
Params:
timeout = Optional timeout, the default value of `Duration.max` waits without a timeout.
Returns:
The function will return `true` if data becomes available before the timeout is reached.
If the connection gets closed, or the timeout gets reached, `false` is returned instead.
*/
bool waitForData(Duration timeout = Duration.max);
}
/**
Interface for all streams supporting random access.
*/
interface RandomAccessStream : Stream {
@safe:
/// Returns the total size of the file.
@property ulong size() const nothrow;
/// Determines if this stream is readable.
@property bool readable() const nothrow;
/// Determines if this stream is writable.
@property bool writable() const nothrow;
/// Seeks to a specific position in the file if supported by the stream.
void seek(ulong offset);
/// Returns the current offset of the file pointer
ulong tell() nothrow;
}
/**
Stream implementation acting as a sink with no function.
Any data written to the stream will be ignored and discarded. This stream type is useful if
the output of a particular stream is not needed but the stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
void write(in ubyte[] bytes) {}
void write(InputStream stream, ulong nbytes = 0)
{
writeDefault(stream, nbytes);
}
void flush() {}
void finalize() {}
}
enum isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null;
enum isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null;
enum isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null;
enum isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null;
mixin template validateInputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, InputStream); }
mixin template validateOutputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, OutputStream); }
mixin template validateConnectionStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, ConnectionStream); }
mixin template validateRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, RandomAccessStream); }

View file

@ -382,3 +382,109 @@ unittest {
// DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704
import core.sync.mutex : Mutex;
enum synchronizedIsNothrow = __traits(compiles, (Mutex m) nothrow { synchronized(m) {} });
/// Mixin template that checks a particular aggregate type for conformance with a specific interface.
template validateInterfaceConformance(T, I)
{
import vibe.internal.traits : checkInterfaceConformance;
static assert(checkInterfaceConformance!(T, I) is null, checkInterfaceConformance!(T, I));
}
/** Checks an aggregate type for conformance with a specific interface.
The value of this template is either `null`, or an error message indicating the first method
of the interface that is not properly implemented by `T`.
*/
template checkInterfaceConformance(T, I) {
import std.meta : AliasSeq;
import std.traits : FunctionAttribute, FunctionTypeOf, MemberFunctionsTuple, ParameterTypeTuple, ReturnType, functionAttributes;
alias Members = AliasSeq!(__traits(allMembers, I));
template checkMemberConformance(string mem) {
alias Overloads = MemberFunctionsTuple!(I, mem);
template impl(size_t i) {
static if (i < Overloads.length) {
alias F = Overloads[i];
alias FT = FunctionTypeOf!F;
alias PT = ParameterTypeTuple!F;
alias RT = ReturnType!F;
static if (functionAttributes!F & FunctionAttribute.property) {
static if (PT.length > 0) {
static if (!is(typeof({ T t; return mixin("t."~mem) = PT.init; } ()) : RT))
enum impl = T.stringof ~ " does not implement property setter \"" ~ mem ~ "\" of type " ~ FT.stringof;
else enum string impl = impl!(i+1);
} else {
static if (!is(typeof({ T t; return mixin("t."~mem); }()) : RT))
enum impl = T.stringof ~ " does not implement property getter \"" ~ mem ~ "\" of type " ~ FT.stringof;
else enum string impl = impl!(i+1);
}
} else {
//pragma(msg, typeof({ T t; PT p; return mixin("t."~mem)(p); } ()));
static if (!is(typeof({ T t; PT p; return mixin("t."~mem)(p); } ()) : RT))
enum impl = T.stringof ~ " does not implement method \"" ~ mem ~ "\" of type " ~ FT.stringof;
else enum string impl = impl!(i+1);
}
} else enum string impl = null;
}
alias checkMemberConformance = impl!0;
}
template impl(size_t i) {
static if (i < Members.length) {
enum mc = checkMemberConformance!(Members[i]);
static if (mc is null) enum impl = impl!(i+1);
else enum impl = mc;
} else enum string impl = null;
}
static if (is(T == struct) || is(T == class) || is(T == interface))
enum checkInterfaceConformance = impl!0;
else
enum checkInterfaceConformance = "Aggregate type expected, not " ~ T.stringof;
}
unittest {
interface InputStream {
@safe:
@property bool empty() nothrow;
void read(ubyte[] dst);
}
interface OutputStream {
@safe:
void write(in ubyte[] bytes);
void flush();
void finalize();
void write(InputStream stream, ulong nbytes = 0);
}
static class OSClass : OutputStream {
override void write(in ubyte[] bytes) {}
override void flush() {}
override void finalize() {}
override void write(InputStream stream, ulong nbytes) {}
}
mixin validateInterfaceConformance!(OSClass, OutputStream);
static struct OSStruct {
void write(in ubyte[] bytes) {}
void flush() {}
void finalize() {}
void write(IS)(IS stream, ulong nbytes) {}
}
mixin validateInterfaceConformance!(OSStruct, OutputStream);
static struct NonOSStruct {
void write(in ubyte[] bytes) {}
void flush(bool) {}
void finalize() {}
void write(InputStream stream, ulong nbytes) {}
}
static assert(checkInterfaceConformance!(NonOSStruct, OutputStream) ==
"NonOSStruct does not implement method \"flush\" of type @safe void()");
}