From bce39e512c722ad0152abb964f6279a5b2cb111c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 18 Jul 2017 23:10:08 +0200 Subject: [PATCH 1/5] Implement a simple thread-safe cross-task channel. --- source/vibe/core/channel.d | 75 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 source/vibe/core/channel.d diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d new file mode 100644 index 0000000..66eff08 --- /dev/null +++ b/source/vibe/core/channel.d @@ -0,0 +1,75 @@ +/** Implements a thread-safe, typed producer-consumer queue. + + Copyright: © 2017 RejectedSoftware e.K. + Authors: Sönke Ludwig + License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. +*/ +module vibe.core.channel; + +// multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable? +// currently always buffered - TODO: implement blocking non-buffered mode +// TODO: implement a multi-channel wait, e.g. +// TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function +// TODO: implement close() + +private final class Channel(T, size_t buffer_size = 100) { + import vibe.core.concurrency : isWeaklyIsolated; + //static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); + + Mutex m_mutex; + TaskCondition m_condition; + FixedRingBuffer!(T, buffer_size) m_items; + + this() + shared { + m_mutex = cast(shared)new Mutex; + m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); + } + + bool empty() + shared { + synchronized (m_mutex) + return (cast(Channel)this).m_items.empty; + } + + T consumeOne() + shared { + auto thisus = cast(Channel)this; + T ret; + bool was_full = false; + synchronized (m_mutex) { + while (thisus.m_items.empty) + thisus.m_condition.wait(); + was_full = thisus.m_items.full; + swap(thisus.m_items.front, ret); + } + if (was_full) thisus.m_condition.notifyAll(); + return ret.move; + } + + void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + shared { + auto thisus = cast(Channel)this; + bool was_full = false; + synchronized (m_mutex) { + while (thisus.m_items.empty) + thisus.m_condition.wait(); + was_full = thisus.m_items.full; + swap(thisus.m_items, dst); + } + if (was_full) thisus.m_condition.notifyAll(); + } + + void put(T item) + shared { + auto thisus = cast(Channel)this; + bool need_notify = false; + synchronized (m_mutex) { + while (thisus.m_items.full) + thisus.m_condition.wait(); + need_notify = thisus.m_items.empty; + thisus.m_items.put(item.move); + } + if (need_notify) thisus.m_condition.notifyAll(); + } +} From 82936041e446ce8290b94d92a7df48ca22735c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 18 Jul 2017 23:12:26 +0200 Subject: [PATCH 2/5] Add some more notes. --- source/vibe/core/channel.d | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index 66eff08..e66c295 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -11,10 +11,13 @@ module vibe.core.channel; // TODO: implement a multi-channel wait, e.g. // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function // TODO: implement close() +// TODO: fully support non-copyable types using swap/move where appropriate +// TODO: add unit tests +// Q: Should this be exposed as a class, or as an RC struct? private final class Channel(T, size_t buffer_size = 100) { import vibe.core.concurrency : isWeaklyIsolated; - //static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); + static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); Mutex m_mutex; TaskCondition m_condition; From dee54e505a60f1edb6c396a8c4080475ad7b5c6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 14 Jan 2019 22:15:38 +0100 Subject: [PATCH 3/5] Make FixedRingBuffer compatible with non-copyable structs. --- source/vibe/internal/array.d | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/source/vibe/internal/array.d b/source/vibe/internal/array.d index 44fb128..f537018 100644 --- a/source/vibe/internal/array.d +++ b/source/vibe/internal/array.d @@ -361,7 +361,7 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) { m_start = 0; } - void put()(T itm) { assert(m_fill < m_buffer.length); m_buffer[mod(m_start + m_fill++)] = itm; } + void put()(T itm) { assert(m_fill < m_buffer.length); move(itm, m_buffer[mod(m_start + m_fill++)]); } void put(TC : T)(scope TC[] itms) { if( !itms.length ) return; @@ -391,18 +391,18 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) { assert(r.m_start >= m_start && r.m_start < m_buffer.length || r.m_start < mod(m_start+m_fill)); if( r.m_start > m_start ){ foreach(i; r.m_start .. m_buffer.length-1) - m_buffer[i] = m_buffer[i+1]; - m_buffer[$-1] = m_buffer[0]; + move(m_buffer[i+1], m_buffer[i]); + move(m_buffer[0], m_buffer[$-1]); foreach(i; 0 .. mod(m_start + m_fill - 1)) - m_buffer[i] = m_buffer[i+1]; + move(m_buffer[i+1], m_buffer[i]); } else { foreach(i; r.m_start .. mod(m_start + m_fill - 1)) - m_buffer[i] = m_buffer[i+1]; + move(m_buffer[i+1], m_buffer[i]); } } else { assert(r.m_start >= m_start && r.m_start < m_start+m_fill); foreach(i; r.m_start .. m_start+m_fill-1) - m_buffer[i] = m_buffer[i+1]; + move(m_buffer[i+1], m_buffer[i]); } m_fill--; destroy(m_buffer[mod(m_start+m_fill)]); // TODO: only call destroy for non-POD T @@ -422,10 +422,20 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) { if( mod(m_start) >= mod(m_start+dst.length) ){ size_t chunk1 = m_buffer.length - m_start; size_t chunk2 = dst.length - chunk1; - dst[0 .. chunk1] = m_buffer[m_start .. $]; - dst[chunk1 .. $] = m_buffer[0 .. chunk2]; + static if (isCopyable!T) { + dst[0 .. chunk1] = m_buffer[m_start .. $]; + dst[chunk1 .. $] = m_buffer[0 .. chunk2]; + } else { + foreach (i; 0 .. chunk1) move(m_buffer[m_start+i], dst[i]); + foreach (i; chunk1 .. this.length) move(m_buffer[i-chunk1], dst[i]); + } } else { - dst[] = m_buffer[m_start .. m_start+dst.length]; + static if (isCopyable!T) { + dst[] = m_buffer[m_start .. m_start+dst.length]; + } else { + foreach (i; 0 .. dst.length) + move(m_buffer[m_start + i], dst[i]); + } } popFrontN(dst.length); } @@ -507,7 +517,7 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) { @property bool empty() const { return m_length == 0; } - @property inout(T) front() inout { assert(!empty); return m_buffer[m_start]; } + @property ref inout(T) front() inout { assert(!empty); return m_buffer[m_start]; } void popFront() { From f31db98144bb0338ddab5ece76ac6a192db1d541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 14 Jan 2019 22:16:45 +0100 Subject: [PATCH 4/5] Add documentation, basic unit test, createChannel(), close() and tryConsumeOne(). --- source/vibe/core/channel.d | 186 +++++++++++++++++++++++++++++++++---- 1 file changed, 168 insertions(+), 18 deletions(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index e66c295..cb7ed37 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -1,27 +1,103 @@ /** Implements a thread-safe, typed producer-consumer queue. - Copyright: © 2017 RejectedSoftware e.K. + Copyright: © 2017-2019 RejectedSoftware e.K. Authors: Sönke Ludwig License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. */ module vibe.core.channel; +import vibe.core.sync : TaskCondition; +import vibe.internal.array : FixedRingBuffer; + +import std.algorithm.mutation : move, swap; +import std.exception : enforce; +import core.sync.mutex; + // multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable? // currently always buffered - TODO: implement blocking non-buffered mode // TODO: implement a multi-channel wait, e.g. // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function -// TODO: implement close() -// TODO: fully support non-copyable types using swap/move where appropriate -// TODO: add unit tests -// Q: Should this be exposed as a class, or as an RC struct? -private final class Channel(T, size_t buffer_size = 100) { + +/** Creates a new channel suitable for cross-task and cross-thread communication. +*/ +Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)() +{ + Channel!(T, buffer_size) ret; + ret.m_impl = new shared ChannelImpl!(T, buffer_size); + return ret; +} + + +/** Thread-safe typed data channel implementation. + + The implementation supports multiple-reader-multiple-writer operation across + multiple tasks in multiple threads. +*/ +struct Channel(T, size_t buffer_size) { + enum bufferSize = buffer_size; + + private shared ChannelImpl!(T, buffer_size) m_impl; + + /** Determines whether there is more data to read. + + This property is empty $(I iff) no more elements are in the internal + buffer and `close()` has been called. Once the channel is empty, + subsequent calls to `consumeOne` or `consumeAll` will throw an + exception. + + Note that relying on the return value to determine whether another + element can be read is only safe in a single-reader scenario. Use + `tryConsumeOne` in a multiple-reader scenario instead. + */ + @property bool empty() { return m_impl.empty; } + + /** Closes the channel. + + A closed channel does not accept any new items enqueued using `put` and + causes `empty` to return `fals` as soon as all preceeding elements have + been consumed. + */ + void close() { m_impl.close(); } + + /** Consumes a single element off the queue. + + This function will block if no elements are available. If the `empty` + property is `true`, an exception will be thrown. + */ + T consumeOne() { return m_impl.consumeOne(); } + + /** Attempts to consume a single element. + + If no more elements are available and the channel has been closed, + a null value is returned. + */ + bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } + + /** Attempts to consume all elements currently in the queue. + + This function will block if no elements are available. If the `empty` + property is `true`, an exception will be thrown. + */ + void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + { return m_impl.consumeAll(dst); } + + /** Enqueues an element. + */ + void put(T item) { m_impl.put(item.move); } +} + + +private final class ChannelImpl(T, size_t buffer_size) { import vibe.core.concurrency : isWeaklyIsolated; static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); - Mutex m_mutex; - TaskCondition m_condition; - FixedRingBuffer!(T, buffer_size) m_items; + private { + Mutex m_mutex; + TaskCondition m_condition; + FixedRingBuffer!(T, buffer_size) m_items; + bool m_closed = false; + } this() shared { @@ -29,50 +105,124 @@ private final class Channel(T, size_t buffer_size = 100) { m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); } - bool empty() + @property bool empty() shared { - synchronized (m_mutex) - return (cast(Channel)this).m_items.empty; + synchronized (m_mutex) { + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + return thisus.m_closed && thisus.m_items.empty; + } + } + + void close() + shared { + synchronized (m_mutex) { + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + thisus.m_closed = true; + thisus.m_condition.notifyAll(); + } + } + + bool tryConsumeOne(ref T dst) + shared { + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + bool was_full = false; + + synchronized (m_mutex) { + while (thisus.m_items.empty) { + if (m_closed) return false; + thisus.m_condition.wait(); + } + was_full = thisus.m_items.full; + move(thisus.m_items.front, dst); + thisus.m_items.popFront(); + } + + if (was_full) thisus.m_condition.notifyAll(); + + return true; } T consumeOne() shared { - auto thisus = cast(Channel)this; + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); T ret; bool was_full = false; + synchronized (m_mutex) { - while (thisus.m_items.empty) + while (thisus.m_items.empty) { + if (m_closed) throw new Exception("Attempt to consume from an empty channel."); thisus.m_condition.wait(); + } was_full = thisus.m_items.full; - swap(thisus.m_items.front, ret); + move(thisus.m_items.front, ret); + thisus.m_items.popFront(); } + if (was_full) thisus.m_condition.notifyAll(); + return ret.move; } void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared { - auto thisus = cast(Channel)this; + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; + synchronized (m_mutex) { - while (thisus.m_items.empty) + while (thisus.m_items.empty) { + if (m_closed) throw new Exception("Attempt to consume from an empty channel."); thisus.m_condition.wait(); + } + was_full = thisus.m_items.full; swap(thisus.m_items, dst); } + if (was_full) thisus.m_condition.notifyAll(); } void put(T item) shared { - auto thisus = cast(Channel)this; + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool need_notify = false; + synchronized (m_mutex) { + enforce(!m_closed, "Sending on closed channel."); while (thisus.m_items.full) thisus.m_condition.wait(); need_notify = thisus.m_items.empty; thisus.m_items.put(item.move); } + if (need_notify) thisus.m_condition.notifyAll(); } } + +unittest { // test basic operation and non-copyable struct compatiblity + static struct S { + int i; + @disable this(this); + } + + auto ch = createChannel!S; + ch.put(S(1)); + assert(ch.consumeOne().i == 1); + ch.put(S(4)); + ch.put(S(5)); + { + FixedRingBuffer!(S, 100) buf; + ch.consumeAll(buf); + assert(buf.length == 2); + assert(buf[0].i == 4); + assert(buf[1].i == 5); + } + ch.put(S(2)); + assert(!ch.empty); + ch.close(); + assert(!ch.empty); + S v; + assert(ch.tryConsumeOne(v)); + assert(v.i == 2); + assert(ch.empty); + assert(!ch.tryConsumeOne(v)); +} From 3be1de2fdb0dbdcb695c29497f15a79ae9adf2ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 20 Jan 2019 11:57:16 +0100 Subject: [PATCH 5/5] Refine the semantics of consumeAll. --- source/vibe/core/channel.d | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index cb7ed37..3553b9e 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -70,19 +70,25 @@ struct Channel(T, size_t buffer_size) { /** Attempts to consume a single element. If no more elements are available and the channel has been closed, - a null value is returned. + `false` is returned and `dst` is left untouched. */ bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } /** Attempts to consume all elements currently in the queue. - This function will block if no elements are available. If the `empty` - property is `true`, an exception will be thrown. + This function will block if no elements are available. Once at least one + element is available, the contents of `dst` will be replaced with all + available elements. + + If the `empty` property is or becomes `true` before data becomes + avaiable, `dst` will be left untouched and `false` is returned. */ - void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) { return m_impl.consumeAll(dst); } /** Enqueues an element. + + This function may block the the event that the internal buffer is full. */ void put(T item) { m_impl.put(item.move); } } @@ -163,14 +169,14 @@ private final class ChannelImpl(T, size_t buffer_size) { return ret.move; } - void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); bool was_full = false; synchronized (m_mutex) { while (thisus.m_items.empty) { - if (m_closed) throw new Exception("Attempt to consume from an empty channel."); + if (m_closed) return false; thisus.m_condition.wait(); } @@ -179,6 +185,8 @@ private final class ChannelImpl(T, size_t buffer_size) { } if (was_full) thisus.m_condition.notifyAll(); + + return true; } void put(T item)