diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index 5471fce..ca97bfa 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -7,12 +7,11 @@ */ module vibe.core.file; -//public import vibe.core.stream; -//public import vibe.inet.url; import eventcore.core : eventDriver; import eventcore.driver; import vibe.core.log; import vibe.core.path; +import vibe.core.stream; import vibe.internal.async : asyncAwait; import core.stdc.stdio; @@ -411,7 +410,7 @@ struct FileStream { /// Determines if the file stream is still open @property bool isOpen() const { return m_fd != FileFD.init; } - @property ulong size() const { return m_size; } + @property ulong size() const nothrow { return m_size; } @property bool readable() const { return m_mode != FileMode.append; } @property bool writable() const { return m_mode != FileMode.read; } @@ -483,6 +482,9 @@ logDebug("Written %s", res[2]); } } +mixin validateRandomAccessStream!FileStream; + + private void writeDefault(OutputStream, InputStream)(ref OutputStream dst, InputStream stream, ulong nbytes = 0) { assert(false); diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 2e7943d..e2c1a00 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -502,6 +502,8 @@ mixin(tracer); } } +mixin validateConnectionStream!TCPConnection; + /** Represents a listening TCP socket. diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index 86fc122..5993948 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -1,16 +1,245 @@ +/** + Generic stream interface used by several stream-like classes. + + This module defines the basic (buffered) stream primitives. For concrete stream types, take a + look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional + high-level operations on streams, such as reading streams by line or as a whole. + + Copyright: © 2012-2016 RejectedSoftware e.K. + License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. + Authors: Sönke Ludwig +*/ module vibe.core.stream; -enum isInputStream(T) = __traits(compiles, { - T s; - ubyte[] buf; - if (!s.empty) - s.read(buf); - if (s.leastSize > 0) - s.read(buf); -}); +import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance; +import core.time; +import std.algorithm; +import std.conv; -enum isOutputStream(T) = __traits(compiles, { - T s; - const(ubyte)[] buf; - s.write(buf); -}); + +/**************************************************************************************************/ +/* Public functions */ +/**************************************************************************************************/ + +/** + Returns a `NullOutputStream` instance. + + The instance will only be created on the first request and gets reused for + all subsequent calls from the same thread. +*/ +NullOutputStream nullSink() @safe nothrow +{ + static NullOutputStream ret; + if (!ret) ret = new NullOutputStream; + return ret; +} + +/**************************************************************************************************/ +/* Public types */ +/**************************************************************************************************/ + +/** + Interface for all classes implementing readable streams. +*/ +interface InputStream { + @safe: + + /** Returns true $(I iff) the end of the input stream has been reached. + */ + @property bool empty(); + + /** Returns the maximum number of bytes that are known to remain in this stream until the end is + reached. After `leastSize()` bytes have been read, the stream will either have reached EOS + and `empty()` returns `true`, or `leastSize()` returns again a number greater than 0. + */ + @property ulong leastSize(); + + /** Queries if there is data available for immediate, non-blocking read. + */ + @property bool dataAvailableForRead(); + + /** Returns a temporary reference to the data that is currently buffered. + + The returned slice typically has the size `leastSize()` or `0` if `dataAvailableForRead()` + returns `false`. Streams that don't have an internal buffer will always return an empty + slice. + + Note that any method invocation on the same stream potentially invalidates the contents of + the returned buffer. + */ + const(ubyte)[] peek(); + + /** Fills the preallocated array 'bytes' with data from the stream. + + Throws: An exception if the operation reads past the end of the stream + */ + void read(ubyte[] dst); +} + +/** + Interface for all classes implementing writeable streams. +*/ +interface OutputStream { + @safe: + + /** Writes an array of bytes to the stream. + */ + void write(in ubyte[] bytes); + + /** Flushes the stream and makes sure that all data is being written to the output device. + */ + void flush(); + + /** Flushes and finalizes the stream. + + Finalize has to be called on certain types of streams. No writes are possible after a + call to finalize(). + */ + void finalize(); + + /** Writes an array of chars to the stream. + */ + final void write(in char[] bytes) + { + write(cast(const(ubyte)[])bytes); + } + + /** Pipes an InputStream directly into this OutputStream. + + The number of bytes written is either the whole input stream when `nbytes == 0`, or exactly + `nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an + exception is thrown. + */ + void write(InputStream stream, ulong nbytes = 0); + + protected final void writeDefault(InputStream stream, ulong nbytes = 0) + @trusted // FreeListRef + { + import vibe.internal.memory : FreeListRef; + + static struct Buffer { ubyte[64*1024] bytes = void; } + auto bufferobj = FreeListRef!(Buffer, false)(); + auto buffer = bufferobj.bytes[]; + + //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); + if( nbytes == 0 ){ + while( !stream.empty ){ + size_t chunk = min(stream.leastSize, buffer.length); + assert(chunk > 0, "leastSize returned zero for non-empty stream."); + //logTrace("read pipe chunk %d", chunk); + stream.read(buffer[0 .. chunk]); + write(buffer[0 .. chunk]); + } + } else { + while( nbytes > 0 ){ + size_t chunk = min(nbytes, buffer.length); + //logTrace("read pipe chunk %d", chunk); + stream.read(buffer[0 .. chunk]); + write(buffer[0 .. chunk]); + nbytes -= chunk; + } + } + } +} + +/** + Interface for all classes implementing readable and writable streams. +*/ +interface Stream : InputStream, OutputStream { +} + + +/** + Interface for streams based on a connection. + + Connection streams are based on streaming socket connections, pipes and similar end-to-end + streams. + + See_also: `vibe.core.net.TCPConnection` +*/ +interface ConnectionStream : Stream { + @safe: + + /** Determines The current connection status. + + If `connected` is `false`, writing to the connection will trigger an exception. Reading may + still succeed as long as there is data left in the input buffer. Use `InputStream.empty` + instead to determine when to stop reading. + */ + @property bool connected() const; + + /** Actively closes the connection and frees associated resources. + + Note that close must always be called, even if the remote has already closed the connection. + Failure to do so will result in resource and memory leakage. + + Closing a connection implies a call to `finalize`, so that it doesn't need to be called + explicitly (it will be a no-op in that case). + */ + void close(); + + /** Blocks until data becomes available for read. + + The maximum wait time can be customized with the `timeout` parameter. If there is already + data availabe for read, or if the connection is closed, the function will return immediately + without blocking. + + Params: + timeout = Optional timeout, the default value of `Duration.max` waits without a timeout. + + Returns: + The function will return `true` if data becomes available before the timeout is reached. + If the connection gets closed, or the timeout gets reached, `false` is returned instead. + */ + bool waitForData(Duration timeout = Duration.max); +} + + +/** + Interface for all streams supporting random access. +*/ +interface RandomAccessStream : Stream { + @safe: + + /// Returns the total size of the file. + @property ulong size() const nothrow; + + /// Determines if this stream is readable. + @property bool readable() const nothrow; + + /// Determines if this stream is writable. + @property bool writable() const nothrow; + + /// Seeks to a specific position in the file if supported by the stream. + void seek(ulong offset); + + /// Returns the current offset of the file pointer + ulong tell() nothrow; +} + + +/** + Stream implementation acting as a sink with no function. + + Any data written to the stream will be ignored and discarded. This stream type is useful if + the output of a particular stream is not needed but the stream needs to be drained. +*/ +final class NullOutputStream : OutputStream { + void write(in ubyte[] bytes) {} + void write(InputStream stream, ulong nbytes = 0) + { + writeDefault(stream, nbytes); + } + void flush() {} + void finalize() {} +} + +enum isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null; +enum isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null; +enum isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null; +enum isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null; + +mixin template validateInputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, InputStream); } +mixin template validateOutputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, OutputStream); } +mixin template validateConnectionStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, ConnectionStream); } +mixin template validateRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, RandomAccessStream); } diff --git a/source/vibe/internal/traits.d b/source/vibe/internal/traits.d index b2449e4..8d7f9a9 100644 --- a/source/vibe/internal/traits.d +++ b/source/vibe/internal/traits.d @@ -382,3 +382,109 @@ unittest { // DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704 import core.sync.mutex : Mutex; enum synchronizedIsNothrow = __traits(compiles, (Mutex m) nothrow { synchronized(m) {} }); + + +/// Mixin template that checks a particular aggregate type for conformance with a specific interface. +template validateInterfaceConformance(T, I) +{ + import vibe.internal.traits : checkInterfaceConformance; + static assert(checkInterfaceConformance!(T, I) is null, checkInterfaceConformance!(T, I)); +} + +/** Checks an aggregate type for conformance with a specific interface. + + The value of this template is either `null`, or an error message indicating the first method + of the interface that is not properly implemented by `T`. +*/ +template checkInterfaceConformance(T, I) { + import std.meta : AliasSeq; + import std.traits : FunctionAttribute, FunctionTypeOf, MemberFunctionsTuple, ParameterTypeTuple, ReturnType, functionAttributes; + + alias Members = AliasSeq!(__traits(allMembers, I)); + + template checkMemberConformance(string mem) { + alias Overloads = MemberFunctionsTuple!(I, mem); + template impl(size_t i) { + static if (i < Overloads.length) { + alias F = Overloads[i]; + alias FT = FunctionTypeOf!F; + alias PT = ParameterTypeTuple!F; + alias RT = ReturnType!F; + static if (functionAttributes!F & FunctionAttribute.property) { + static if (PT.length > 0) { + static if (!is(typeof({ T t; return mixin("t."~mem) = PT.init; } ()) : RT)) + enum impl = T.stringof ~ " does not implement property setter \"" ~ mem ~ "\" of type " ~ FT.stringof; + else enum string impl = impl!(i+1); + } else { + static if (!is(typeof({ T t; return mixin("t."~mem); }()) : RT)) + enum impl = T.stringof ~ " does not implement property getter \"" ~ mem ~ "\" of type " ~ FT.stringof; + else enum string impl = impl!(i+1); + } + } else { + //pragma(msg, typeof({ T t; PT p; return mixin("t."~mem)(p); } ())); + static if (!is(typeof({ T t; PT p; return mixin("t."~mem)(p); } ()) : RT)) + enum impl = T.stringof ~ " does not implement method \"" ~ mem ~ "\" of type " ~ FT.stringof; + else enum string impl = impl!(i+1); + } + } else enum string impl = null; + } + alias checkMemberConformance = impl!0; + } + + template impl(size_t i) { + static if (i < Members.length) { + enum mc = checkMemberConformance!(Members[i]); + static if (mc is null) enum impl = impl!(i+1); + else enum impl = mc; + } else enum string impl = null; + } + + static if (is(T == struct) || is(T == class) || is(T == interface)) + enum checkInterfaceConformance = impl!0; + else + enum checkInterfaceConformance = "Aggregate type expected, not " ~ T.stringof; +} + +unittest { + interface InputStream { + @safe: + @property bool empty() nothrow; + void read(ubyte[] dst); + } + + interface OutputStream { + @safe: + void write(in ubyte[] bytes); + void flush(); + void finalize(); + void write(InputStream stream, ulong nbytes = 0); + } + + static class OSClass : OutputStream { + override void write(in ubyte[] bytes) {} + override void flush() {} + override void finalize() {} + override void write(InputStream stream, ulong nbytes) {} + } + + mixin validateInterfaceConformance!(OSClass, OutputStream); + + static struct OSStruct { + void write(in ubyte[] bytes) {} + void flush() {} + void finalize() {} + void write(IS)(IS stream, ulong nbytes) {} + } + + mixin validateInterfaceConformance!(OSStruct, OutputStream); + + static struct NonOSStruct { + void write(in ubyte[] bytes) {} + void flush(bool) {} + void finalize() {} + void write(InputStream stream, ulong nbytes) {} + } + + static assert(checkInterfaceConformance!(NonOSStruct, OutputStream) == + "NonOSStruct does not implement method \"flush\" of type @safe void()"); +}