Add IOMode parameters to stream read/write methods.

This commit is contained in:
Sönke Ludwig 2017-01-19 00:36:32 +01:00
parent e80d7244bc
commit 2acc60934f
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
4 changed files with 95 additions and 36 deletions

View file

@ -4,7 +4,7 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016, rejectedsoftware e.K."
license "MIT"
dependency "eventcore" version="~>0.6.0"
dependency "eventcore" version="~>0.7.0"
targetName "vibe_core"

View file

@ -458,25 +458,38 @@ struct FileStream {
return null;
}
void read(ubyte[] dst)
size_t read(ubyte[] dst, IOMode mode)
{
auto res = asyncAwait!(FileIOCallback,
cb => eventDriver.files.read(m_fd, ctx.ptr, dst, cb),
cb => eventDriver.files.read(m_fd, ctx.ptr, dst, mode, cb),
cb => eventDriver.files.cancelRead(m_fd)
);
enforce(res[1] == IOStatus.ok, "Failed to read data from disk.");
return res[2];
}
void write(in ubyte[] bytes)
void read(ubyte[] dst)
{
auto ret = read(dst, IOMode.all);
assert(ret == dst.length);
}
size_t write(in ubyte[] bytes, IOMode mode)
{
auto res = asyncAwait!(FileIOCallback,
cb => eventDriver.files.write(m_fd, ctx.ptr, bytes, cb),
cb => eventDriver.files.write(m_fd, ctx.ptr, bytes, mode, cb),
cb => eventDriver.files.cancelWrite(m_fd)
);
ctx.ptr += res[2];
logDebug("Written %s", res[2]);
if (ctx.ptr > ctx.size) ctx.size = ctx.ptr;
enforce(res[1] == IOStatus.ok, "Failed to read data from disk.");
return res[2];
}
void write(in ubyte[] bytes)
{
write(bytes, IOMode.all);
}
void write(in char[] bytes)

View file

@ -512,28 +512,37 @@ mixin(tracer);
});
}
void read(ubyte[] dst)
size_t read(scope ubyte[] dst, IOMode mode)
{
mixin(tracer);
import std.algorithm.comparison : min;
if (!dst.length) return;
if (!dst.length) return 0;
size_t nbytes = 0;
m_context.readTimeout.loopWithTimeout!((remaining) {
enforce(waitForData(), "Reached end of stream while reading data.");
if (m_context.readBuffer.length == 0) {
if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
return true;
enforce(waitForData(remaining), "Reached end of stream while reading data.");
}
assert(m_context.readBuffer.length > 0);
auto l = min(dst.length, m_context.readBuffer.length);
m_context.readBuffer.read(dst[0 .. l]);
dst = dst[l .. $];
nbytes += l;
return dst.length == 0;
});
return nbytes;
}
void write(in ubyte[] bytes)
void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); }
size_t write(in ubyte[] bytes, IOMode mode)
{
mixin(tracer);
if (bytes.length == 0) return;
if (bytes.length == 0) return 0;
auto res = asyncAwait!(IOCallback,
cb => eventDriver.sockets.write(m_socket, bytes, IOMode.all, cb),
cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
cb => eventDriver.sockets.cancelWrite(m_socket));
switch (res[1]) {
@ -541,10 +550,12 @@ mixin(tracer);
throw new Exception("Error writing data to socket.");
case IOStatus.ok: break;
case IOStatus.disconnected: break;
}
return res[2];
}
void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
void write(InputStream stream) { write(stream, 0); }

View file

@ -21,6 +21,19 @@ import core.time;
import std.algorithm;
import std.conv;
public import eventcore.driver : IOMode;
/** Marks a function as blocking.
Blocking in this case means that it may contain an operation that needs to wait for
external events, such as I/O operations, and may result in other tasks in the same
threa being executed before it returns.
Currently this attribute serves only as a documentation aid and is not enforced
or used for deducation in any way.
*/
struct blocking {}
/**************************************************************************************************/
/* Public functions */
@ -50,16 +63,20 @@ interface InputStream {
@safe:
/** Returns true $(I iff) the end of the input stream has been reached.
*/
@property bool empty();
/** Returns the maximum number of bytes that are known to remain in this stream until the end is
reached. After `leastSize()` bytes have been read, the stream will either have reached EOS
and `empty()` returns `true`, or `leastSize()` returns again a number greater than 0.
For connection oriented streams, this function will block until either
new data arrives or the connection got closed.
*/
@property ulong leastSize();
@property bool empty() @blocking;
/** Queries if there is data available for immediate, non-blocking read.
/** (Scheduled for deprecation) Returns the maximum number of bytes that are known to remain available for read.
After `leastSize()` bytes have been read, the stream will either have reached EOS
and `empty()` returns `true`, or `leastSize()` returns again a number greater than `0`.
*/
@property ulong leastSize() @blocking;
/** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read.
*/
@property bool dataAvailableForRead();
@ -76,11 +93,28 @@ interface InputStream {
/** Fills the preallocated array 'bytes' with data from the stream.
This function will continue read from the stream until the buffer has
been fully filled.
Params:
dst = The buffer into which to write the data that was read
mode = Optional reading mode (defaults to `IOMode.all`).
Return:
Returns the number of bytes read. The `dst` buffer will be filled up
to this index. The return value is guaranteed to be `dst.length` for
`IOMode.all`.
Throws: An exception if the operation reads past the end of the stream
See_Also: `readOnce`, `tryRead`
*/
void read(ubyte[] dst);
size_t read(scope ubyte[] dst, IOMode mode) @blocking;
/// ditto
final void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
}
/**
Interface for all classes implementing writeable streams.
*/
@ -89,25 +123,25 @@ interface OutputStream {
/** Writes an array of bytes to the stream.
*/
void write(in ubyte[] bytes);
size_t write(in ubyte[] bytes, IOMode mode) @blocking;
/// ditto
final void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
/// ditto
final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
/** Flushes the stream and makes sure that all data is being written to the output device.
*/
void flush();
void flush() @blocking;
/** Flushes and finalizes the stream.
Finalize has to be called on certain types of streams. No writes are possible after a
call to finalize().
*/
void finalize();
void finalize() @blocking;
/** Writes an array of chars to the stream.
*/
final void write(in char[] bytes)
{
write(cast(const(ubyte)[])bytes);
}
/** Pipes an InputStream directly into this OutputStream.
@ -115,12 +149,12 @@ interface OutputStream {
`nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an
exception is thrown.
*/
void write(InputStream stream, ulong nbytes);
void write(InputStream stream, ulong nbytes) @blocking;
/// ditto
final void write(InputStream stream) { write(stream, 0); }
final void write(InputStream stream) @blocking { write(stream, ulong.max); }
protected final void writeDefault(InputStream stream, ulong nbytes = 0)
@trusted // FreeListRef
@trusted @blocking // FreeListRef
{
import vibe.internal.allocator : theAllocator, make, dispose;
@ -130,7 +164,7 @@ interface OutputStream {
auto buffer = bufferobj.bytes;
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if( nbytes == 0 ){
if (nbytes == 0 || nbytes == ulong.max) {
while( !stream.empty ){
size_t chunk = min(stream.leastSize, buffer.length);
assert(chunk > 0, "leastSize returned zero for non-empty stream.");
@ -184,7 +218,7 @@ interface ConnectionStream : Stream {
Closing a connection implies a call to `finalize`, so that it doesn't need to be called
explicitly (it will be a no-op in that case).
*/
void close();
void close() @blocking;
/** Blocks until data becomes available for read.
@ -199,7 +233,7 @@ interface ConnectionStream : Stream {
The function will return `true` if data becomes available before the timeout is reached.
If the connection gets closed, or the timeout gets reached, `false` is returned instead.
*/
bool waitForData(Duration timeout = Duration.max);
bool waitForData(Duration timeout = Duration.max) @blocking;
}
@ -219,7 +253,7 @@ interface RandomAccessStream : Stream {
@property bool writable() const nothrow;
/// Seeks to a specific position in the file if supported by the stream.
void seek(ulong offset);
void seek(ulong offset) @blocking;
/// Returns the current offset of the file pointer
ulong tell() nothrow;
@ -233,11 +267,12 @@ interface RandomAccessStream : Stream {
the output of a particular stream is not needed but the stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
void write(in ubyte[] bytes) {}
void write(InputStream stream, ulong nbytes = 0)
size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
void write(InputStream stream, ulong nbytes)
{
writeDefault(stream, nbytes);
}
alias write = OutputStream.write;
void flush() {}
void finalize() {}
}