diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index cb7ed37..3553b9e 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -70,19 +70,25 @@ struct Channel(T, size_t buffer_size) { /** Attempts to consume a single element. If no more elements are available and the channel has been closed, - a null value is returned. + `false` is returned and `dst` is left untouched. */ bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } /** Attempts to consume all elements currently in the queue. - This function will block if no elements are available. If the `empty` - property is `true`, an exception will be thrown. + This function will block if no elements are available. Once at least one + element is available, the contents of `dst` will be replaced with all + available elements. + + If the `empty` property is or becomes `true` before data becomes + avaiable, `dst` will be left untouched and `false` is returned. */ - void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) { return m_impl.consumeAll(dst); } /** Enqueues an element. + + This function may block the the event that the internal buffer is full. */ void put(T item) { m_impl.put(item.move); } } @@ -163,14 +169,14 @@ private final class ChannelImpl(T, size_t buffer_size) { return ret.move; } - void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; synchronized (m_mutex) { while (thisus.m_items.empty) { - if (m_closed) throw new Exception("Attempt to consume from an empty channel."); + if (m_closed) return false; thisus.m_condition.wait(); } @@ -179,6 +185,8 @@ private final class ChannelImpl(T, size_t buffer_size) { } if (was_full) thisus.m_condition.notifyAll(); + + return true; } void put(T item)