From c6a29e1c3bc41d6cdf7fa8a0400e3afd05ee06b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 23 Jan 2019 15:19:29 +0100 Subject: [PATCH 1/3] Make createChannel safe and add Channel.bufferFill. --- source/vibe/core/channel.d | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index b699811..6995c28 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -54,6 +54,14 @@ struct Channel(T, size_t buffer_size = 100) { /// ditto @property bool empty() shared { return m_impl.empty; } + /** Returns the current count of items in the buffer. + + This function is useful for diagnostic purposes. + */ + @property size_t bufferFill() { return m_impl.bufferFill; } + /// ditto + @property size_t bufferFill() shared { return m_impl.bufferFill; } + /** Closes the channel. A closed channel does not accept any new items enqueued using `put` and @@ -121,7 +129,7 @@ private final class ChannelImpl(T, size_t buffer_size) { } this() - shared { + shared @trusted { m_mutex = cast(shared)new Mutex; m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); } @@ -134,6 +142,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } } + @property size_t bufferFill() + shared { + synchronized (m_mutex) { + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + return thisus.m_items.length; + } + } + void close() shared { synchronized (m_mutex) { @@ -221,7 +237,7 @@ private final class ChannelImpl(T, size_t buffer_size) { } } -unittest { // test basic operation and non-copyable struct compatiblity +@safe unittest { // test basic operation and non-copyable struct compatiblity static struct S { int i; @disable this(this); @@ -250,7 +266,7 @@ unittest { // test basic operation and non-copyable struct compatiblity assert(!ch.tryConsumeOne(v)); } -unittest { // make sure shared(Channel!T) can also be used +@safe unittest { // make sure shared(Channel!T) can also be used shared ch = createChannel!int; ch.put(1); assert(!ch.empty); From 1a463cafc9341831bbf2aecf8cf9ba8b667e549b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 23 Jan 2019 16:11:03 +0100 Subject: [PATCH 2/3] Make TaskCondition.wait/notify/notifyAll nothrow. --- source/vibe/core/sync.d | 57 ++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 724a769..dffc335 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -664,24 +664,34 @@ private void runMutexUnitTests(M)() Note that it is generally not safe to use a `TaskCondition` together with an interruptible mutex type. - See_Also: InterruptibleTaskCondition + See_Also: `InterruptibleTaskCondition` */ final class TaskCondition : core.sync.condition.Condition { @safe: private TaskConditionImpl!(false, Mutex) m_impl; - this(core.sync.mutex.Mutex mtx) { + this(core.sync.mutex.Mutex mtx) + { + assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo, + "TaskCondition can only be used with Mutex or TaskMutex"); + m_impl.setup(mtx); super(mtx); } - override @property Mutex mutex() { return m_impl.mutex; } - override void wait() { m_impl.wait(); } - override bool wait(Duration timeout) { return m_impl.wait(timeout); } - override void notify() { m_impl.notify(); } - override void notifyAll() { m_impl.notifyAll(); } + override @property Mutex mutex() nothrow { return m_impl.mutex; } + override void wait() nothrow { m_impl.wait(); } + override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); } + override void notify() nothrow { m_impl.notify(); } + override void notifyAll() nothrow { m_impl.notifyAll(); } } +unittest { + new TaskCondition(new Mutex); + new TaskCondition(new TaskMutex); +} + + /** This example shows the typical usage pattern using a `while` loop to make sure that the final condition is reached. */ @@ -1631,7 +1641,8 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { private { LOCKABLE m_mutex; - + static if (is(LOCKABLE == Mutex)) + TaskMutex m_taskMutex; shared(ManualEvent) m_signal; } @@ -1653,6 +1664,8 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { void setup(LOCKABLE mtx) { m_mutex = mtx; + static if (is(typeof(m_taskMutex))) + m_taskMutex = cast(TaskMutex)mtx; } @property LOCKABLE mutex() { return m_mutex; } @@ -1665,8 +1678,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { } auto refcount = m_signal.emitCount; - m_mutex.unlock(); - scope(exit) m_mutex.lock(); + + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.unlock(); + else m_mutex.unlock_nothrow(); + } else m_mutex.unlock(); + + scope(exit) { + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.lock(); + else m_mutex.lock_nothrow(); + } else m_mutex.lock(); + } static if (INTERRUPTIBLE) m_signal.wait(refcount); else m_signal.waitUninterruptible(refcount); } @@ -1680,8 +1703,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { } auto refcount = m_signal.emitCount; - m_mutex.unlock(); - scope(exit) m_mutex.lock(); + + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.unlock(); + else m_mutex.unlock_nothrow(); + } else m_mutex.unlock(); + + scope(exit) { + static if (is(LOCKABLE == Mutex)) { + if (m_taskMutex) m_taskMutex.lock(); + else m_mutex.lock_nothrow(); + } else m_mutex.lock(); + } static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount; else return m_signal.waitUninterruptible(timeout, refcount) != refcount; From 92bb067f4b6257e17085a11668459b32cfb973e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 23 Jan 2019 16:14:57 +0100 Subject: [PATCH 3/3] Make Channel.tryConsumeOne/consumeAll/close/empty/bufferFill nothrow. --- source/vibe/core/channel.d | 67 +++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index 6995c28..c8bec6f 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -18,6 +18,8 @@ import core.sync.mutex; // TODO: implement a multi-channel wait, e.g. // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function +// NOTE: not using synchronized (m_mutex) because it is not nothrow + /** Creates a new channel suitable for cross-task and cross-thread communication. */ @@ -135,24 +137,33 @@ private final class ChannelImpl(T, size_t buffer_size) { } @property bool empty() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); return thisus.m_closed && thisus.m_items.empty; } } @property size_t bufferFill() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); return thisus.m_items.length; } } void close() - shared { - synchronized (m_mutex) { + shared nothrow { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); thisus.m_closed = true; thisus.m_condition.notifyAll(); @@ -160,11 +171,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool tryConsumeOne(ref T dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -185,7 +199,10 @@ private final class ChannelImpl(T, size_t buffer_size) { T ret; bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) throw new Exception("Attempt to consume from an empty channel."); thisus.m_condition.wait(); @@ -201,11 +218,14 @@ private final class ChannelImpl(T, size_t buffer_size) { } bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) - shared { + shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + while (thisus.m_items.empty) { if (m_closed) return false; thisus.m_condition.wait(); @@ -225,7 +245,10 @@ private final class ChannelImpl(T, size_t buffer_size) { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool need_notify = false; - synchronized (m_mutex) { + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + enforce(!m_closed, "Sending on closed channel."); while (thisus.m_items.full) thisus.m_condition.wait(); @@ -274,3 +297,23 @@ private final class ChannelImpl(T, size_t buffer_size) { ch.close(); assert(ch.empty); } + +@safe unittest { // ensure nothrow'ness for throwing struct + static struct S { + this(this) { throw new Exception("meh!"); } + } + auto ch = createChannel!S; + ch.put(S.init); + ch.put(S.init); + + S s; + FixedRingBuffer!(S, 100, true) sb; + + () nothrow { + assert(ch.tryConsumeOne(s)); + assert(ch.consumeAll(sb)); + assert(sb.length == 1); + ch.close(); + assert(ch.empty); + } (); +}