Make createChannel safe and add Channel.bufferFill.

This commit is contained in:
Sönke Ludwig 2019-01-23 15:19:29 +01:00
parent 5437b9ecb6
commit c6a29e1c3b

View file

@ -54,6 +54,14 @@ struct Channel(T, size_t buffer_size = 100) {
/// ditto
@property bool empty() shared { return m_impl.empty; }
/** Returns the current count of items in the buffer.
This function is useful for diagnostic purposes.
*/
@property size_t bufferFill() { return m_impl.bufferFill; }
/// ditto
@property size_t bufferFill() shared { return m_impl.bufferFill; }
/** Closes the channel.
A closed channel does not accept any new items enqueued using `put` and
@ -121,7 +129,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
}
this()
shared {
shared @trusted {
m_mutex = cast(shared)new Mutex;
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
}
@ -134,6 +142,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
}
}
@property size_t bufferFill()
shared {
synchronized (m_mutex) {
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
return thisus.m_items.length;
}
}
void close()
shared {
synchronized (m_mutex) {
@ -221,7 +237,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
}
}
unittest { // test basic operation and non-copyable struct compatiblity
@safe unittest { // test basic operation and non-copyable struct compatiblity
static struct S {
int i;
@disable this(this);
@ -250,7 +266,7 @@ unittest { // test basic operation and non-copyable struct compatiblity
assert(!ch.tryConsumeOne(v));
}
unittest { // make sure shared(Channel!T) can also be used
@safe unittest { // make sure shared(Channel!T) can also be used
shared ch = createChannel!int;
ch.put(1);
assert(!ch.empty);