Improve convenience of the Channel API.

- allows all methods to be called on a `shared(Channel!T)` instance.
- `Channel` pre-defines the `buffer_size` argument to 100, matching `createChannel`
This commit is contained in:
Sönke Ludwig 2019-01-22 10:49:57 +01:00
parent 9b00bd7b4b
commit 857be9459e

View file

@ -34,7 +34,7 @@ Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)()
The implementation supports multiple-reader-multiple-writer operation across The implementation supports multiple-reader-multiple-writer operation across
multiple tasks in multiple threads. multiple tasks in multiple threads.
*/ */
struct Channel(T, size_t buffer_size) { struct Channel(T, size_t buffer_size = 100) {
enum bufferSize = buffer_size; enum bufferSize = buffer_size;
private shared ChannelImpl!(T, buffer_size) m_impl; private shared ChannelImpl!(T, buffer_size) m_impl;
@ -51,6 +51,8 @@ struct Channel(T, size_t buffer_size) {
`tryConsumeOne` in a multiple-reader scenario instead. `tryConsumeOne` in a multiple-reader scenario instead.
*/ */
@property bool empty() { return m_impl.empty; } @property bool empty() { return m_impl.empty; }
/// ditto
@property bool empty() shared { return m_impl.empty; }
/** Closes the channel. /** Closes the channel.
@ -59,6 +61,8 @@ struct Channel(T, size_t buffer_size) {
been consumed. been consumed.
*/ */
void close() { m_impl.close(); } void close() { m_impl.close(); }
/// ditto
void close() shared { m_impl.close(); }
/** Consumes a single element off the queue. /** Consumes a single element off the queue.
@ -66,6 +70,8 @@ struct Channel(T, size_t buffer_size) {
property is `true`, an exception will be thrown. property is `true`, an exception will be thrown.
*/ */
T consumeOne() { return m_impl.consumeOne(); } T consumeOne() { return m_impl.consumeOne(); }
/// ditto
T consumeOne() shared { return m_impl.consumeOne(); }
/** Attempts to consume a single element. /** Attempts to consume a single element.
@ -73,6 +79,8 @@ struct Channel(T, size_t buffer_size) {
`false` is returned and `dst` is left untouched. `false` is returned and `dst` is left untouched.
*/ */
bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); }
/// ditto
bool tryConsumeOne(ref T dst) shared { return m_impl.tryConsumeOne(dst); }
/** Attempts to consume all elements currently in the queue. /** Attempts to consume all elements currently in the queue.
@ -86,12 +94,18 @@ struct Channel(T, size_t buffer_size) {
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
in { assert(dst.empty); } in { assert(dst.empty); }
body { return m_impl.consumeAll(dst); } body { return m_impl.consumeAll(dst); }
/// ditto
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared
in { assert(dst.empty); }
body { return m_impl.consumeAll(dst); }
/** Enqueues an element. /** Enqueues an element.
This function may block the the event that the internal buffer is full. This function may block the the event that the internal buffer is full.
*/ */
void put(T item) { m_impl.put(item.move); } void put(T item) { m_impl.put(item.move); }
/// ditto
void put(T item) shared { m_impl.put(item.move); }
} }
@ -235,3 +249,12 @@ unittest { // test basic operation and non-copyable struct compatiblity
assert(ch.empty); assert(ch.empty);
assert(!ch.tryConsumeOne(v)); assert(!ch.tryConsumeOne(v));
} }
unittest { // make sure shared(Channel!T) can also be used
shared ch = createChannel!int;
ch.put(1);
assert(!ch.empty);
assert(ch.consumeOne == 1);
ch.close();
assert(ch.empty);
}