Merge pull request #163 from vibe-d/issue157-empty-consumeOne-on-closed-channel

Fix empty-consumeOne channel usage pattern for a single consumer
This commit is contained in:
Leonid Kramer 2019-06-20 09:31:13 +02:00 committed by GitHub
commit c39fdb2208
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 3 deletions

View file

@ -41,7 +41,7 @@ struct Channel(T, size_t buffer_size = 100) {
private shared ChannelImpl!(T, buffer_size) m_impl; private shared ChannelImpl!(T, buffer_size) m_impl;
/** Determines whether there is more data to read. /** Determines whether there is more data to read in a single-reader scenario.
This property is empty $(I iff) no more elements are in the internal This property is empty $(I iff) no more elements are in the internal
buffer and `close()` has been called. Once the channel is empty, buffer and `close()` has been called. Once the channel is empty,
@ -49,8 +49,8 @@ struct Channel(T, size_t buffer_size = 100) {
exception. exception.
Note that relying on the return value to determine whether another Note that relying on the return value to determine whether another
element can be read is only safe in a single-reader scenario. Use element can be read is only safe in a single-reader scenario. It is
`tryConsumeOne` in a multiple-reader scenario instead. generally recommended to use `tryConsumeOne` instead.
*/ */
@property bool empty() { return m_impl.empty; } @property bool empty() { return m_impl.empty; }
/// ditto /// ditto
@ -78,6 +78,10 @@ struct Channel(T, size_t buffer_size = 100) {
This function will block if no elements are available. If the `empty` This function will block if no elements are available. If the `empty`
property is `true`, an exception will be thrown. property is `true`, an exception will be thrown.
Note that it is recommended to use `tryConsumeOne` instead of a
combination of `empty` and `consumeOne` due to being more efficient and
also being reliable in a multiple-reader scenario.
*/ */
T consumeOne() { return m_impl.consumeOne(); } T consumeOne() { return m_impl.consumeOne(); }
/// ditto /// ditto
@ -143,6 +147,12 @@ private final class ChannelImpl(T, size_t buffer_size) {
scope (exit) m_mutex.unlock_nothrow(); scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
// ensure that in a single-reader scenario !empty guarantees a
// successful call to consumeOne
while (!thisus.m_closed && thisus.m_items.empty)
thisus.m_condition.wait();
return thisus.m_closed && thisus.m_items.empty; return thisus.m_closed && thisus.m_items.empty;
} }
} }

View file

@ -0,0 +1,29 @@
/+ dub.sdl:
name "tests"
dependency "vibe-core" path=".."
+/
module tests;
import vibe.core.channel;
import vibe.core.core;
import core.time;
void main()
{
auto ch = createChannel!int();
auto p = runTask({
sleep(1.seconds);
ch.close();
});
auto c = runTask({
while (!ch.empty) {
try ch.consumeOne();
catch (Exception e) assert(false, e.msg);
}
});
p.join();
c.join();
}