From bce39e512c722ad0152abb964f6279a5b2cb111c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 18 Jul 2017 23:10:08 +0200 Subject: [PATCH] Implement a simple thread-safe cross-task channel. --- source/vibe/core/channel.d | 75 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 source/vibe/core/channel.d diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d new file mode 100644 index 0000000..66eff08 --- /dev/null +++ b/source/vibe/core/channel.d @@ -0,0 +1,75 @@ +/** Implements a thread-safe, typed producer-consumer queue. + + Copyright: © 2017 RejectedSoftware e.K. + Authors: Sönke Ludwig + License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. +*/ +module vibe.core.channel; + +// multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable? +// currently always buffered - TODO: implement blocking non-buffered mode +// TODO: implement a multi-channel wait, e.g. +// TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function +// TODO: implement close() + +private final class Channel(T, size_t buffer_size = 100) { + import vibe.core.concurrency : isWeaklyIsolated; + //static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); + + Mutex m_mutex; + TaskCondition m_condition; + FixedRingBuffer!(T, buffer_size) m_items; + + this() + shared { + m_mutex = cast(shared)new Mutex; + m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); + } + + bool empty() + shared { + synchronized (m_mutex) + return (cast(Channel)this).m_items.empty; + } + + T consumeOne() + shared { + auto thisus = cast(Channel)this; + T ret; + bool was_full = false; + synchronized (m_mutex) { + while (thisus.m_items.empty) + thisus.m_condition.wait(); + was_full = thisus.m_items.full; + swap(thisus.m_items.front, ret); + } + if (was_full) thisus.m_condition.notifyAll(); + return ret.move; + } + + void consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) + shared { + auto thisus = cast(Channel)this; + bool was_full = false; + synchronized (m_mutex) { + while (thisus.m_items.empty) + thisus.m_condition.wait(); + was_full = thisus.m_items.full; + swap(thisus.m_items, dst); + } + if (was_full) thisus.m_condition.notifyAll(); + } + + void put(T item) + shared { + auto thisus = cast(Channel)this; + bool need_notify = false; + synchronized (m_mutex) { + while (thisus.m_items.full) + thisus.m_condition.wait(); + need_notify = thisus.m_items.empty; + thisus.m_items.put(item.move); + } + if (need_notify) thisus.m_condition.notifyAll(); + } +}