From b6ed92e8b585d78280ebb86eb6b4e8eb1e13b08e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 12 Jan 2021 10:02:40 +0100 Subject: [PATCH] Introduce ChannelConfig/ChannelPriority. Adds a low-overhead mode to Channel!T that causes the buffer to be fully processed before notifying waiting peers instead of notifying immediately once data/space is available. This heavily reduces the overhead of cross-task/thread notifications at the expense of introducing processing latency and requiring a call to close() to guarantee that all data has been processed. --- source/vibe/core/channel.d | 121 ++++++++++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index de21fcf..1b6951d 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -23,13 +23,36 @@ import core.sync.mutex; /** Creates a new channel suitable for cross-task and cross-thread communication. */ -Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)() +Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)(ChannelConfig config = ChannelConfig.init) { Channel!(T, buffer_size) ret; - ret.m_impl = new shared ChannelImpl!(T, buffer_size); + ret.m_impl = new shared ChannelImpl!(T, buffer_size)(config); return ret; } +struct ChannelConfig { + ChannelPriority priority = ChannelPriority.latency; +} + +enum ChannelPriority { + /** Minimize latency + + Triggers readers immediately once data is available and triggers writers + as soon as the queue has space. + */ + latency, + + /** Minimize overhead. + + Triggers readers once the queue is full and triggers writers once the + queue is empty in order to maximize batch sizes and minimize + synchronization overhead. + + Note that in this mode it is necessary to close the channel to ensure + that the buffered data is fully processed. + */ + overhead +} /** Thread-safe typed data channel implementation. @@ -132,12 +155,14 @@ private final class ChannelImpl(T, size_t buffer_size) { TaskCondition m_condition; FixedRingBuffer!(T, buffer_size) m_items; bool m_closed = false; + ChannelConfig m_config; } - this() + this(ChannelConfig config) shared @trusted { m_mutex = cast(shared)new Mutex; m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); + m_config = config; } @property bool empty() @@ -183,7 +208,7 @@ private final class ChannelImpl(T, size_t buffer_size) { bool tryConsumeOne(ref T dst) shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); - bool was_full = false; + bool need_notify = false; { m_mutex.lock_nothrow(); @@ -193,44 +218,39 @@ private final class ChannelImpl(T, size_t buffer_size) { if (m_closed) return false; thisus.m_condition.wait(); } - was_full = thisus.m_items.full; + + if (m_config.priority == ChannelPriority.latency) + need_notify = thisus.m_items.full; + move(thisus.m_items.front, dst); thisus.m_items.popFront(); + + if (m_config.priority == ChannelPriority.overhead) + need_notify = thisus.m_items.empty; } - if (was_full) thisus.m_condition.notify(); + if (need_notify) { + if (m_config.priority == ChannelPriority.overhead) + thisus.m_condition.notifyAll(); + else + thisus.m_condition.notify(); + } return true; } T consumeOne() shared { - auto thisus = () @trusted { return cast(ChannelImpl)this; } (); T ret; - bool was_full = false; - - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - while (thisus.m_items.empty) { - if (m_closed) throw new Exception("Attempt to consume from an empty channel."); - thisus.m_condition.wait(); - } - was_full = thisus.m_items.full; - move(thisus.m_items.front, ret); - thisus.m_items.popFront(); - } - - if (was_full) thisus.m_condition.notify(); - - return ret.move; + if (!tryConsumeOne(ret)) + throw new Exception("Attempt to consume from an empty channel."); + return ret; } bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared nothrow { auto thisus = () @trusted { return cast(ChannelImpl)this; } (); - bool was_full = false; + bool need_notify = false; { m_mutex.lock_nothrow(); @@ -241,11 +261,20 @@ private final class ChannelImpl(T, size_t buffer_size) { thisus.m_condition.wait(); } - was_full = thisus.m_items.full; + if (m_config.priority == ChannelPriority.latency) + need_notify = thisus.m_items.full; + swap(thisus.m_items, dst); + + if (m_config.priority == ChannelPriority.overhead) + need_notify = true; } - if (was_full) thisus.m_condition.notify(); + if (need_notify) { + if (m_config.priority == ChannelPriority.overhead) + thisus.m_condition.notifyAll(); + else thisus.m_condition.notify(); + } return true; } @@ -262,11 +291,18 @@ private final class ChannelImpl(T, size_t buffer_size) { enforce(!m_closed, "Sending on closed channel."); while (thisus.m_items.full) thisus.m_condition.wait(); - need_notify = thisus.m_items.empty; + if (m_config.priority == ChannelPriority.latency) + need_notify = thisus.m_items.empty; thisus.m_items.put(item.move); + if (m_config.priority == ChannelPriority.overhead) + need_notify = thisus.m_items.full; } - if (need_notify) thisus.m_condition.notify(); + if (need_notify) { + if (m_config.priority == ChannelPriority.overhead) + thisus.m_condition.notifyAll(); + else thisus.m_condition.notify(); + } } } @@ -327,3 +363,28 @@ private final class ChannelImpl(T, size_t buffer_size) { assert(ch.empty); } (); } + +unittest { + import std.traits : EnumMembers; + import vibe.core.core : runTask; + + void test(ChannelPriority prio) + { + auto ch = createChannel!int(ChannelConfig(prio)); + runTask({ + ch.put(1); + ch.put(2); + ch.put(3); + ch.close(); + }); + + int i; + assert(ch.tryConsumeOne(i) && i == 1); + assert(ch.tryConsumeOne(i) && i == 2); + assert(ch.tryConsumeOne(i) && i == 3); + assert(!ch.tryConsumeOne(i)); + } + + foreach (m; EnumMembers!ChannelPriority) + test(m); +}