diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index b699811..c8bec6f 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -18,6 +18,8 @@ import core.sync.mutex; // TODO: implement a multi-channel wait, e.g. // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function +// NOTE: not using synchronized (m_mutex) because it is not nothrow + /** Creates a new channel suitable for cross-task and cross-thread communication. */ @@ -54,6 +56,14 @@ struct Channel(T, size_t buffer_size = 100) { /// ditto @property bool empty() shared { return m_impl.empty; } + /** Returns the current count of items in the buffer. + + This function is useful for diagnostic purposes. + */ + @property size_t bufferFill() { return m_impl.bufferFill; } + /// ditto + @property size_t bufferFill() shared { return m_impl.bufferFill; } + /** Closes the channel. A closed channel does not accept any new items enqueued using `put` and @@ -121,22 +131,39 @@ private final class ChannelImpl(T, size_t buffer_size) { } this() - shared { + shared @trusted { m_mutex = cast(shared)new Mutex; m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); } @property bool empty() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); return thisus.m_closed && thisus.m_items.empty; } } + @property size_t bufferFill() + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + return thisus.m_items.length; + } + } + void close() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); thisus.m_closed = true; thisus.m_condition.notifyAll(); @@ -144,11 +171,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool tryConsumeOne(ref T dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -169,7 +199,10 @@ private final class ChannelImpl(T, size_t buffer_size) { T ret; bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) throw new Exception("Attempt to consume from an empty channel."); thisus.m_condition.wait(); @@ -185,11 +218,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -209,7 +245,10 @@ private final class ChannelImpl(T, size_t buffer_size) { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool need_notify = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + enforce(!m_closed, "Sending on closed channel."); while (thisus.m_items.full) thisus.m_condition.wait(); @@ -221,7 +260,7 @@ private final class ChannelImpl(T, size_t buffer_size) { } } -unittest { // test basic operation and non-copyable struct compatiblity +@safe unittest { // test basic operation and non-copyable struct compatiblity static struct S { int i; @disable this(this); @@ -250,7 +289,7 @@ unittest { // test basic operation and non-copyable struct compatiblity assert(!ch.tryConsumeOne(v)); } -unittest { // make sure shared(Channel!T) can also be used +@safe unittest { // make sure shared(Channel!T) can also be used shared ch = createChannel!int; ch.put(1); assert(!ch.empty); @@ -258,3 +297,23 @@ unittest { // make sure shared(Channel!T) can also be used ch.close(); assert(ch.empty); } + +@safe unittest { // ensure nothrow'ness for throwing struct + static struct S { + this(this) { throw new Exception("meh!"); } + } + auto ch = createChannel!S; + ch.put(S.init); + ch.put(S.init); + + S s; + FixedRingBuffer!(S, 100, true) sb; + + () nothrow { + assert(ch.tryConsumeOne(s)); + assert(ch.consumeAll(sb)); + assert(sb.length == 1); + ch.close(); + assert(ch.empty); + } (); +} diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 724a769..dffc335 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -664,24 +664,34 @@ private void runMutexUnitTests(M)() Note that it is generally not safe to use a `TaskCondition` together with an interruptible mutex type. - See_Also: InterruptibleTaskCondition + See_Also: `InterruptibleTaskCondition` */ final class TaskCondition : core.sync.condition.Condition { @safe: private TaskConditionImpl!(false, Mutex) m_impl; - this(core.sync.mutex.Mutex mtx) { + this(core.sync.mutex.Mutex mtx) + { + assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo, + "TaskCondition can only be used with Mutex or TaskMutex"); + m_impl.setup(mtx); super(mtx); } - override @property Mutex mutex() { return m_impl.mutex; } - override void wait() { m_impl.wait(); } - override bool wait(Duration timeout) { return m_impl.wait(timeout); } - override void notify() { m_impl.notify(); } - override void notifyAll() { m_impl.notifyAll(); } + override @property Mutex mutex() nothrow { return m_impl.mutex; } + override void wait() nothrow { m_impl.wait(); } + override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); } + override void notify() nothrow { m_impl.notify(); } + override void notifyAll() nothrow { m_impl.notifyAll(); } } +unittest { + new TaskCondition(new Mutex); + new TaskCondition(new TaskMutex); +} + + /** This example shows the typical usage pattern using a `while` loop to make sure that the final condition is reached. */ @@ -1631,7 +1641,8 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { private { LOCKABLE m_mutex; - + static if (is(LOCKABLE == Mutex)) + TaskMutex m_taskMutex; shared(ManualEvent) m_signal; } @@ -1653,6 +1664,8 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { void setup(LOCKABLE mtx) { m_mutex = mtx; + static if (is(typeof(m_taskMutex))) + m_taskMutex = cast(TaskMutex)mtx; } @property LOCKABLE mutex() { return m_mutex; } @@ -1665,8 +1678,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { } auto refcount = m_signal.emitCount; - m_mutex.unlock(); - scope(exit) m_mutex.lock(); + + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.unlock(); + else m_mutex.unlock_nothrow(); + } else m_mutex.unlock(); + + scope(exit) { + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.lock(); + else m_mutex.lock_nothrow(); + } else m_mutex.lock(); + } static if (INTERRUPTIBLE) m_signal.wait(refcount); else m_signal.waitUninterruptible(refcount); } @@ -1680,8 +1703,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { } auto refcount = m_signal.emitCount; - m_mutex.unlock(); - scope(exit) m_mutex.lock(); + + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.unlock(); + else m_mutex.unlock_nothrow(); + } else m_mutex.unlock(); + + scope(exit) { + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.lock(); + else m_mutex.lock_nothrow(); + } else m_mutex.lock(); + } static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount; else return m_signal.waitUninterruptible(timeout, refcount) != refcount;