Introduce ChannelConfig/ChannelPriority.

Adds a low-overhead mode to Channel!T that causes the buffer to be fully processed before notifying waiting peers instead of notifying immediately once data/space is available. This heavily reduces the overhead of cross-task/thread notifications at the expense of introducing processing latency and requiring a call to close() to guarantee that all data has been processed.
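
For context, a minimal end-to-end sketch of how the new mode is meant to be used (illustrative only, not part of the diff below; it relies solely on the public API touched by this commit: createChannel, ChannelConfig, ChannelPriority, put, tryConsumeOne and close):

    import vibe.core.channel;
    import vibe.core.core : runTask;

    void lowOverheadExample()
    {
        auto ch = createChannel!int(ChannelConfig(ChannelPriority.overhead));

        runTask({
            foreach (i; 0 .. 1000)
                ch.put(i); // waiting readers are only woken once the buffer fills up
            ch.close();    // required so the final, partially filled batch gets delivered
        });

        int v;
        while (ch.tryConsumeOne(v)) {
            // process v
        }
    }
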
Author: Sönke Ludwig
Date:   2021-01-12 10:02:40 +01:00
Parent: 9980eae7a5
Commit: b6ed92e8b5

@@ -23,13 +23,36 @@ import core.sync.mutex;
/** Creates a new channel suitable for cross-task and cross-thread communication.
*/
Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)()
Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)(ChannelConfig config = ChannelConfig.init)
{
Channel!(T, buffer_size) ret;
ret.m_impl = new shared ChannelImpl!(T, buffer_size);
ret.m_impl = new shared ChannelImpl!(T, buffer_size)(config);
return ret;
}
struct ChannelConfig {
ChannelPriority priority = ChannelPriority.latency;
}
enum ChannelPriority {
/** Minimize latency
Triggers readers immediately once data is available and triggers writers
as soon as the queue has space.
*/
latency,
/** Minimize overhead.
Triggers readers once the queue is full and triggers writers once the
queue is empty in order to maximize batch sizes and minimize
synchronization overhead.
Note that in this mode it is necessary to close the channel to ensure
that the buffered data is fully processed.
*/
overhead
}
/** Thread-safe typed data channel implementation.
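
The factory and the new configuration types combine as follows (construction sketch, not part of the diff; the function name is illustrative):

    import vibe.core.channel;

    void constructionSketch()
    {
        // Defaults: a 100-element buffer and ChannelPriority.latency, so
        // existing createChannel!T() call sites keep their current behavior.
        auto a = createChannel!int();

        // Explicit buffer size plus the new low-overhead batching mode.
        auto b = createChannel!(string, 16)(ChannelConfig(ChannelPriority.overhead));
    }
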
@@ -132,12 +155,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
TaskCondition m_condition;
FixedRingBuffer!(T, buffer_size) m_items;
bool m_closed = false;
ChannelConfig m_config;
}
this()
this(ChannelConfig config)
shared @trusted {
m_mutex = cast(shared)new Mutex;
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
m_config = config;
}
@property bool empty()
@@ -183,7 +208,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
bool tryConsumeOne(ref T dst)
shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false;
bool need_notify = false;
{
m_mutex.lock_nothrow();
@@ -193,44 +218,39 @@ private final class ChannelImpl(T, size_t buffer_size) {
if (m_closed) return false;
thisus.m_condition.wait();
}
was_full = thisus.m_items.full;
if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.full;
move(thisus.m_items.front, dst);
thisus.m_items.popFront();
if (m_config.priority == ChannelPriority.overhead)
need_notify = thisus.m_items.empty;
}
if (was_full) thisus.m_condition.notify();
if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else
thisus.m_condition.notify();
}
return true;
}
T consumeOne()
shared {
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
T ret;
bool was_full = false;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
while (thisus.m_items.empty) {
if (m_closed) throw new Exception("Attempt to consume from an empty channel.");
thisus.m_condition.wait();
}
was_full = thisus.m_items.full;
move(thisus.m_items.front, ret);
thisus.m_items.popFront();
}
if (was_full) thisus.m_condition.notify();
return ret.move;
if (!tryConsumeOne(ret))
throw new Exception("Attempt to consume from an empty channel.");
return ret;
}
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false;
bool need_notify = false;
{
m_mutex.lock_nothrow();
@@ -241,11 +261,20 @@ private final class ChannelImpl(T, size_t buffer_size) {
thisus.m_condition.wait();
}
was_full = thisus.m_items.full;
if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.full;
swap(thisus.m_items, dst);
if (m_config.priority == ChannelPriority.overhead)
need_notify = true;
}
if (was_full) thisus.m_condition.notify();
if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else thisus.m_condition.notify();
}
return true;
}
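
A batch consumer built on consumeAll pairs naturally with ChannelPriority.overhead, since whole buffers are handed over at once. A sketch (the FixedRingBuffer import path and the forwarding of consumeAll through the public Channel struct are assumptions based on the implementation above):

    import vibe.core.channel;
    import vibe.internal.array : FixedRingBuffer; // assumed import path (internal module)

    void drainInBatches(Channel!int ch)
    {
        FixedRingBuffer!(int, 100) batch; // element type and size must match the channel
        // consumeAll blocks until data is available, then swaps out the whole
        // internal buffer at once; it returns false once the channel is closed
        // and fully drained.
        while (ch.consumeAll(batch)) {
            while (!batch.empty) {
                auto v = batch.front;
                batch.popFront();
                // process v
            }
        }
    }
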
@@ -262,11 +291,18 @@ private final class ChannelImpl(T, size_t buffer_size) {
enforce(!m_closed, "Sending on closed channel.");
while (thisus.m_items.full)
thisus.m_condition.wait();
need_notify = thisus.m_items.empty;
if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.empty;
thisus.m_items.put(item.move);
if (m_config.priority == ChannelPriority.overhead)
need_notify = thisus.m_items.full;
}
if (need_notify) thisus.m_condition.notify();
if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else thisus.m_condition.notify();
}
}
}
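
To make the put() logic above concrete, a sketch of when a blocked reader gets woken under each priority (assuming the default buffer_size of 100; the function name is illustrative):

    import vibe.core.channel;

    void notifyTimingSketch()
    {
        auto lat = createChannel!int(ChannelConfig(ChannelPriority.latency));
        lat.put(1); // queue was empty -> a waiting reader is notified right away

        auto ovh = createChannel!int(ChannelConfig(ChannelPriority.overhead));
        ovh.put(1); // no notification yet; waiting readers keep sleeping
        // ...further puts only trigger notifyAll() once all 100 slots are full...
        ovh.close(); // closing wakes the readers so the partial batch still gets processed
    }
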
@@ -327,3 +363,28 @@ private final class ChannelImpl(T, size_t buffer_size) {
assert(ch.empty);
} ();
}
unittest {
import std.traits : EnumMembers;
import vibe.core.core : runTask;
void test(ChannelPriority prio)
{
auto ch = createChannel!int(ChannelConfig(prio));
runTask({
ch.put(1);
ch.put(2);
ch.put(3);
ch.close();
});
int i;
assert(ch.tryConsumeOne(i) && i == 1);
assert(ch.tryConsumeOne(i) && i == 2);
assert(ch.tryConsumeOne(i) && i == 3);
assert(!ch.tryConsumeOne(i));
}
foreach (m; EnumMembers!ChannelPriority)
test(m);
}
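
Because the overhead mode wakes waiters via notifyAll(), several consumers can share one channel and all get scheduled when a batch becomes ready. An untested sketch along the lines of the unittest above (Task.join usage and all names are illustrative):

    import vibe.core.channel;
    import vibe.core.core : runTask;
    import vibe.core.task : Task;

    void multiConsumerSketch()
    {
        auto ch = createChannel!int(ChannelConfig(ChannelPriority.overhead));

        Task[2] consumers;
        foreach (ref t; consumers)
            t = runTask({
                int v;
                while (ch.tryConsumeOne(v)) {
                    // process v - consumers are woken together via notifyAll()
                    // once a full batch is available or the channel is closed
                }
            });

        foreach (i; 0 .. 200)
            ch.put(i); // blocks (and yields) whenever the 100-element buffer is full

        ch.close();    // lets the consumers drain the remaining elements and exit

        foreach (t; consumers)
            t.join();
    }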