Make Channel.tryConsumeOne/consumeAll/close/empty/bufferFill nothrow.

This commit is contained in:
Sönke Ludwig 2019-01-23 16:14:57 +01:00
parent 1a463cafc9
commit 92bb067f4b

View file

@ -18,6 +18,8 @@ import core.sync.mutex;
// TODO: implement a multi-channel wait, e.g. // TODO: implement a multi-channel wait, e.g.
// TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function
// NOTE: not using synchronized (m_mutex) because it is not nothrow
/** Creates a new channel suitable for cross-task and cross-thread communication. /** Creates a new channel suitable for cross-task and cross-thread communication.
*/ */
@ -135,24 +137,33 @@ private final class ChannelImpl(T, size_t buffer_size) {
} }
@property bool empty() @property bool empty()
shared { shared nothrow {
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
return thisus.m_closed && thisus.m_items.empty; return thisus.m_closed && thisus.m_items.empty;
} }
} }
@property size_t bufferFill() @property size_t bufferFill()
shared { shared nothrow {
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
return thisus.m_items.length; return thisus.m_items.length;
} }
} }
void close() void close()
shared { shared nothrow {
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
thisus.m_closed = true; thisus.m_closed = true;
thisus.m_condition.notifyAll(); thisus.m_condition.notifyAll();
@ -160,11 +171,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
} }
bool tryConsumeOne(ref T dst) bool tryConsumeOne(ref T dst)
shared { shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false; bool was_full = false;
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
while (thisus.m_items.empty) { while (thisus.m_items.empty) {
if (m_closed) return false; if (m_closed) return false;
thisus.m_condition.wait(); thisus.m_condition.wait();
@ -185,7 +199,10 @@ private final class ChannelImpl(T, size_t buffer_size) {
T ret; T ret;
bool was_full = false; bool was_full = false;
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
while (thisus.m_items.empty) { while (thisus.m_items.empty) {
if (m_closed) throw new Exception("Attempt to consume from an empty channel."); if (m_closed) throw new Exception("Attempt to consume from an empty channel.");
thisus.m_condition.wait(); thisus.m_condition.wait();
@ -201,11 +218,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
} }
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
shared { shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false; bool was_full = false;
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
while (thisus.m_items.empty) { while (thisus.m_items.empty) {
if (m_closed) return false; if (m_closed) return false;
thisus.m_condition.wait(); thisus.m_condition.wait();
@ -225,7 +245,10 @@ private final class ChannelImpl(T, size_t buffer_size) {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool need_notify = false; bool need_notify = false;
synchronized (m_mutex) { {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
enforce(!m_closed, "Sending on closed channel."); enforce(!m_closed, "Sending on closed channel.");
while (thisus.m_items.full) while (thisus.m_items.full)
thisus.m_condition.wait(); thisus.m_condition.wait();
@ -274,3 +297,23 @@ private final class ChannelImpl(T, size_t buffer_size) {
ch.close(); ch.close();
assert(ch.empty); assert(ch.empty);
} }
@safe unittest { // ensure nothrow'ness for throwing struct
static struct S {
this(this) { throw new Exception("meh!"); }
}
auto ch = createChannel!S;
ch.put(S.init);
ch.put(S.init);
S s;
FixedRingBuffer!(S, 100, true) sb;
() nothrow {
assert(ch.tryConsumeOne(s));
assert(ch.consumeAll(sb));
assert(sb.length == 1);
ch.close();
assert(ch.empty);
} ();
}