From 2acc60934fc9f308914345abe5f28ef4b5cc3b07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 19 Jan 2017 00:36:32 +0100 Subject: [PATCH] Add IOMode parameters to stream read/write methods. --- dub.sdl | 2 +- source/vibe/core/file.d | 21 ++++++++-- source/vibe/core/net.d | 25 ++++++++---- source/vibe/core/stream.d | 83 ++++++++++++++++++++++++++++----------- 4 files changed, 95 insertions(+), 36 deletions(-) diff --git a/dub.sdl b/dub.sdl index 004b1b9..d9e1204 100644 --- a/dub.sdl +++ b/dub.sdl @@ -4,7 +4,7 @@ authors "Sönke Ludwig" copyright "Copyright © 2016, rejectedsoftware e.K." license "MIT" -dependency "eventcore" version="~>0.6.0" +dependency "eventcore" version="~>0.7.0" targetName "vibe_core" diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index ea0d463..cddcd52 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -458,25 +458,38 @@ struct FileStream { return null; } - void read(ubyte[] dst) + size_t read(ubyte[] dst, IOMode mode) { auto res = asyncAwait!(FileIOCallback, - cb => eventDriver.files.read(m_fd, ctx.ptr, dst, cb), + cb => eventDriver.files.read(m_fd, ctx.ptr, dst, mode, cb), cb => eventDriver.files.cancelRead(m_fd) ); enforce(res[1] == IOStatus.ok, "Failed to read data from disk."); + return res[2]; } - void write(in ubyte[] bytes) + void read(ubyte[] dst) + { + auto ret = read(dst, IOMode.all); + assert(ret == dst.length); + } + + size_t write(in ubyte[] bytes, IOMode mode) { auto res = asyncAwait!(FileIOCallback, - cb => eventDriver.files.write(m_fd, ctx.ptr, bytes, cb), + cb => eventDriver.files.write(m_fd, ctx.ptr, bytes, mode, cb), cb => eventDriver.files.cancelWrite(m_fd) ); ctx.ptr += res[2]; logDebug("Written %s", res[2]); if (ctx.ptr > ctx.size) ctx.size = ctx.ptr; enforce(res[1] == IOStatus.ok, "Failed to read data from disk."); + return res[2]; + } + + void write(in ubyte[] bytes) + { + write(bytes, IOMode.all); } void write(in char[] bytes) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index bc1fd75..2aa87d8 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -512,28 +512,37 @@ mixin(tracer); }); } - void read(ubyte[] dst) + size_t read(scope ubyte[] dst, IOMode mode) { mixin(tracer); import std.algorithm.comparison : min; - if (!dst.length) return; + if (!dst.length) return 0; + size_t nbytes = 0; m_context.readTimeout.loopWithTimeout!((remaining) { - enforce(waitForData(), "Reached end of stream while reading data."); + if (m_context.readBuffer.length == 0) { + if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) + return true; + enforce(waitForData(remaining), "Reached end of stream while reading data."); + } assert(m_context.readBuffer.length > 0); auto l = min(dst.length, m_context.readBuffer.length); m_context.readBuffer.read(dst[0 .. l]); dst = dst[l .. $]; + nbytes += l; return dst.length == 0; }); + return nbytes; } - void write(in ubyte[] bytes) + void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); } + + size_t write(in ubyte[] bytes, IOMode mode) { mixin(tracer); - if (bytes.length == 0) return; + if (bytes.length == 0) return 0; auto res = asyncAwait!(IOCallback, - cb => eventDriver.sockets.write(m_socket, bytes, IOMode.all, cb), + cb => eventDriver.sockets.write(m_socket, bytes, mode, cb), cb => eventDriver.sockets.cancelWrite(m_socket)); switch (res[1]) { @@ -541,10 +550,12 @@ mixin(tracer); throw new Exception("Error writing data to socket."); case IOStatus.ok: break; case IOStatus.disconnected: break; - } + + return res[2]; } + void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); } void write(InputStream stream) { write(stream, 0); } diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index e5264a6..d25b939 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -21,6 +21,19 @@ import core.time; import std.algorithm; import std.conv; +public import eventcore.driver : IOMode; + + +/** Marks a function as blocking. + + Blocking in this case means that it may contain an operation that needs to wait for + external events, such as I/O operations, and may result in other tasks in the same + threa being executed before it returns. + + Currently this attribute serves only as a documentation aid and is not enforced + or used for deducation in any way. +*/ +struct blocking {} /**************************************************************************************************/ /* Public functions */ @@ -50,16 +63,20 @@ interface InputStream { @safe: /** Returns true $(I iff) the end of the input stream has been reached. - */ - @property bool empty(); - /** Returns the maximum number of bytes that are known to remain in this stream until the end is - reached. After `leastSize()` bytes have been read, the stream will either have reached EOS - and `empty()` returns `true`, or `leastSize()` returns again a number greater than 0. + For connection oriented streams, this function will block until either + new data arrives or the connection got closed. */ - @property ulong leastSize(); + @property bool empty() @blocking; - /** Queries if there is data available for immediate, non-blocking read. + /** (Scheduled for deprecation) Returns the maximum number of bytes that are known to remain available for read. + + After `leastSize()` bytes have been read, the stream will either have reached EOS + and `empty()` returns `true`, or `leastSize()` returns again a number greater than `0`. + */ + @property ulong leastSize() @blocking; + + /** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read. */ @property bool dataAvailableForRead(); @@ -76,11 +93,28 @@ interface InputStream { /** Fills the preallocated array 'bytes' with data from the stream. + This function will continue read from the stream until the buffer has + been fully filled. + + Params: + dst = The buffer into which to write the data that was read + mode = Optional reading mode (defaults to `IOMode.all`). + + Return: + Returns the number of bytes read. The `dst` buffer will be filled up + to this index. The return value is guaranteed to be `dst.length` for + `IOMode.all`. + Throws: An exception if the operation reads past the end of the stream + + See_Also: `readOnce`, `tryRead` */ - void read(ubyte[] dst); + size_t read(scope ubyte[] dst, IOMode mode) @blocking; + /// ditto + final void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); } } + /** Interface for all classes implementing writeable streams. */ @@ -89,25 +123,25 @@ interface OutputStream { /** Writes an array of bytes to the stream. */ - void write(in ubyte[] bytes); + size_t write(in ubyte[] bytes, IOMode mode) @blocking; + /// ditto + final void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); } + /// ditto + final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } /** Flushes the stream and makes sure that all data is being written to the output device. */ - void flush(); + void flush() @blocking; /** Flushes and finalizes the stream. Finalize has to be called on certain types of streams. No writes are possible after a call to finalize(). */ - void finalize(); + void finalize() @blocking; /** Writes an array of chars to the stream. */ - final void write(in char[] bytes) - { - write(cast(const(ubyte)[])bytes); - } /** Pipes an InputStream directly into this OutputStream. @@ -115,12 +149,12 @@ interface OutputStream { `nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an exception is thrown. */ - void write(InputStream stream, ulong nbytes); + void write(InputStream stream, ulong nbytes) @blocking; /// ditto - final void write(InputStream stream) { write(stream, 0); } + final void write(InputStream stream) @blocking { write(stream, ulong.max); } protected final void writeDefault(InputStream stream, ulong nbytes = 0) - @trusted // FreeListRef + @trusted @blocking // FreeListRef { import vibe.internal.allocator : theAllocator, make, dispose; @@ -130,7 +164,7 @@ interface OutputStream { auto buffer = bufferobj.bytes; //logTrace("default write %d bytes, empty=%s", nbytes, stream.empty); - if( nbytes == 0 ){ + if (nbytes == 0 || nbytes == ulong.max) { while( !stream.empty ){ size_t chunk = min(stream.leastSize, buffer.length); assert(chunk > 0, "leastSize returned zero for non-empty stream."); @@ -184,7 +218,7 @@ interface ConnectionStream : Stream { Closing a connection implies a call to `finalize`, so that it doesn't need to be called explicitly (it will be a no-op in that case). */ - void close(); + void close() @blocking; /** Blocks until data becomes available for read. @@ -199,7 +233,7 @@ interface ConnectionStream : Stream { The function will return `true` if data becomes available before the timeout is reached. If the connection gets closed, or the timeout gets reached, `false` is returned instead. */ - bool waitForData(Duration timeout = Duration.max); + bool waitForData(Duration timeout = Duration.max) @blocking; } @@ -219,7 +253,7 @@ interface RandomAccessStream : Stream { @property bool writable() const nothrow; /// Seeks to a specific position in the file if supported by the stream. - void seek(ulong offset); + void seek(ulong offset) @blocking; /// Returns the current offset of the file pointer ulong tell() nothrow; @@ -233,11 +267,12 @@ interface RandomAccessStream : Stream { the output of a particular stream is not needed but the stream needs to be drained. */ final class NullOutputStream : OutputStream { - void write(in ubyte[] bytes) {} - void write(InputStream stream, ulong nbytes = 0) + size_t write(in ubyte[] bytes, IOMode) { return bytes.length; } + void write(InputStream stream, ulong nbytes) { writeDefault(stream, nbytes); } + alias write = OutputStream.write; void flush() {} void finalize() {} }