diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index 6995c28..c8bec6f 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -18,6 +18,8 @@ import core.sync.mutex; // TODO: implement a multi-channel wait, e.g. // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function +// NOTE: not using synchronized (m_mutex) because it is not nothrow + /** Creates a new channel suitable for cross-task and cross-thread communication. */ @@ -135,24 +137,33 @@ private final class ChannelImpl(T, size_t buffer_size) { } @property bool empty() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); return thisus.m_closed && thisus.m_items.empty; } } @property size_t bufferFill() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); return thisus.m_items.length; } } void close() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); thisus.m_closed = true; thisus.m_condition.notifyAll(); @@ -160,11 +171,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool tryConsumeOne(ref T dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -185,7 +199,10 @@ private final class ChannelImpl(T, size_t buffer_size) { T ret; bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) throw new Exception("Attempt to consume from an empty channel."); thisus.m_condition.wait(); @@ -201,11 +218,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -225,7 +245,10 @@ private final class ChannelImpl(T, size_t buffer_size) { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool need_notify = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + enforce(!m_closed, "Sending on closed channel."); while (thisus.m_items.full) thisus.m_condition.wait(); @@ -274,3 +297,23 @@ private final class ChannelImpl(T, size_t buffer_size) { ch.close(); assert(ch.empty); } + +@safe unittest { // ensure nothrow'ness for throwing struct + static struct S { + this(this) { throw new Exception("meh!"); } + } + auto ch = createChannel!S; + ch.put(S.init); + ch.put(S.init); + + S s; + FixedRingBuffer!(S, 100, true) sb; + + () nothrow { + assert(ch.tryConsumeOne(s)); + assert(ch.consumeAll(sb)); + assert(sb.length == 1); + ch.close(); + assert(ch.empty); + } (); +}