Refine the semantics of consumeAll.

This commit is contained in:
Sönke Ludwig 2019-01-20 11:57:16 +01:00
parent f31db98144
commit 3be1de2fdb

View file

@ -70,19 +70,25 @@ struct Channel(T, size_t buffer_size) {
/** Attempts to consume a single element. /** Attempts to consume a single element.
If no more elements are available and the channel has been closed, If no more elements are available and the channel has been closed,
a null value is returned. `false` is returned and `dst` is left untouched.
*/ */
bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); }
/** Attempts to consume all elements currently in the queue. /** Attempts to consume all elements currently in the queue.
This function will block if no elements are available. If the `empty` This function will block if no elements are available. Once at least one
property is `true`, an exception will be thrown. element is available, the contents of `dst` will be replaced with all
available elements.
If the `empty` property is or becomes `true` before data becomes
avaiable, `dst` will be left untouched and `false` is returned.
*/ */
void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
{ return m_impl.consumeAll(dst); } { return m_impl.consumeAll(dst); }
/** Enqueues an element. /** Enqueues an element.
This function may block the the event that the internal buffer is full.
*/ */
void put(T item) { m_impl.put(item.move); } void put(T item) { m_impl.put(item.move); }
} }
@ -163,14 +169,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
return ret.move; return ret.move;
} }
void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
shared { shared {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false; bool was_full = false;
synchronized (m_mutex) { synchronized (m_mutex) {
while (thisus.m_items.empty) { while (thisus.m_items.empty) {
if (m_closed) throw new Exception("Attempt to consume from an empty channel."); if (m_closed) return false;
thisus.m_condition.wait(); thisus.m_condition.wait();
} }
@ -179,6 +185,8 @@ private final class ChannelImpl(T, size_t buffer_size) {
} }
if (was_full) thisus.m_condition.notifyAll(); if (was_full) thisus.m_condition.notifyAll();
return true;
} }
void put(T item) void put(T item)