Merge pull request #241 from vibe-d/low_overhead_channel

Introduce ChannelConfig/ChannelPriority.
This commit is contained in:
Leonid Kramer 2021-01-12 11:30:03 +01:00 committed by GitHub
commit 6b6504b8c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 32 deletions

View file

@ -23,13 +23,36 @@ import core.sync.mutex;
/** Creates a new channel suitable for cross-task and cross-thread communication. /** Creates a new channel suitable for cross-task and cross-thread communication.
*/ */
Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)() Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)(ChannelConfig config = ChannelConfig.init)
{ {
Channel!(T, buffer_size) ret; Channel!(T, buffer_size) ret;
ret.m_impl = new shared ChannelImpl!(T, buffer_size); ret.m_impl = new shared ChannelImpl!(T, buffer_size)(config);
return ret; return ret;
} }
struct ChannelConfig {
ChannelPriority priority = ChannelPriority.latency;
}
enum ChannelPriority {
/** Minimize latency
Triggers readers immediately once data is available and triggers writers
as soon as the queue has space.
*/
latency,
/** Minimize overhead.
Triggers readers once the queue is full and triggers writers once the
queue is empty in order to maximize batch sizes and minimize
synchronization overhead.
Note that in this mode it is necessary to close the channel to ensure
that the buffered data is fully processed.
*/
overhead
}
/** Thread-safe typed data channel implementation. /** Thread-safe typed data channel implementation.
@ -132,12 +155,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
TaskCondition m_condition; TaskCondition m_condition;
FixedRingBuffer!(T, buffer_size) m_items; FixedRingBuffer!(T, buffer_size) m_items;
bool m_closed = false; bool m_closed = false;
ChannelConfig m_config;
} }
this() this(ChannelConfig config)
shared @trusted { shared @trusted {
m_mutex = cast(shared)new Mutex; m_mutex = cast(shared)new Mutex;
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
m_config = config;
} }
@property bool empty() @property bool empty()
@ -183,7 +208,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
bool tryConsumeOne(ref T dst) bool tryConsumeOne(ref T dst)
shared nothrow { shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false; bool need_notify = false;
{ {
m_mutex.lock_nothrow(); m_mutex.lock_nothrow();
@ -193,44 +218,39 @@ private final class ChannelImpl(T, size_t buffer_size) {
if (m_closed) return false; if (m_closed) return false;
thisus.m_condition.wait(); thisus.m_condition.wait();
} }
was_full = thisus.m_items.full;
if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.full;
move(thisus.m_items.front, dst); move(thisus.m_items.front, dst);
thisus.m_items.popFront(); thisus.m_items.popFront();
if (m_config.priority == ChannelPriority.overhead)
need_notify = thisus.m_items.empty;
} }
if (was_full) thisus.m_condition.notify(); if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else
thisus.m_condition.notify();
}
return true; return true;
} }
T consumeOne() T consumeOne()
shared { shared {
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
T ret; T ret;
bool was_full = false; if (!tryConsumeOne(ret))
throw new Exception("Attempt to consume from an empty channel.");
{ return ret;
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
while (thisus.m_items.empty) {
if (m_closed) throw new Exception("Attempt to consume from an empty channel.");
thisus.m_condition.wait();
}
was_full = thisus.m_items.full;
move(thisus.m_items.front, ret);
thisus.m_items.popFront();
}
if (was_full) thisus.m_condition.notify();
return ret.move;
} }
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
shared nothrow { shared nothrow {
auto thisus = () @trusted { return cast(ChannelImpl)this; } (); auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
bool was_full = false; bool need_notify = false;
{ {
m_mutex.lock_nothrow(); m_mutex.lock_nothrow();
@ -241,11 +261,20 @@ private final class ChannelImpl(T, size_t buffer_size) {
thisus.m_condition.wait(); thisus.m_condition.wait();
} }
was_full = thisus.m_items.full; if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.full;
swap(thisus.m_items, dst); swap(thisus.m_items, dst);
if (m_config.priority == ChannelPriority.overhead)
need_notify = true;
} }
if (was_full) thisus.m_condition.notify(); if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else thisus.m_condition.notify();
}
return true; return true;
} }
@ -262,11 +291,18 @@ private final class ChannelImpl(T, size_t buffer_size) {
enforce(!m_closed, "Sending on closed channel."); enforce(!m_closed, "Sending on closed channel.");
while (thisus.m_items.full) while (thisus.m_items.full)
thisus.m_condition.wait(); thisus.m_condition.wait();
need_notify = thisus.m_items.empty; if (m_config.priority == ChannelPriority.latency)
need_notify = thisus.m_items.empty;
thisus.m_items.put(item.move); thisus.m_items.put(item.move);
if (m_config.priority == ChannelPriority.overhead)
need_notify = thisus.m_items.full;
} }
if (need_notify) thisus.m_condition.notify(); if (need_notify) {
if (m_config.priority == ChannelPriority.overhead)
thisus.m_condition.notifyAll();
else thisus.m_condition.notify();
}
} }
} }
@ -327,3 +363,28 @@ private final class ChannelImpl(T, size_t buffer_size) {
assert(ch.empty); assert(ch.empty);
} (); } ();
} }
unittest {
import std.traits : EnumMembers;
import vibe.core.core : runTask;
void test(ChannelPriority prio)
{
auto ch = createChannel!int(ChannelConfig(prio));
runTask({
ch.put(1);
ch.put(2);
ch.put(3);
ch.close();
});
int i;
assert(ch.tryConsumeOne(i) && i == 1);
assert(ch.tryConsumeOne(i) && i == 2);
assert(ch.tryConsumeOne(i) && i == 3);
assert(!ch.tryConsumeOne(i));
}
foreach (m; EnumMembers!ChannelPriority)
test(m);
}

View file

@ -642,7 +642,7 @@ final class SyslogLogger(OutputStream) : Logger {
string m_hostName; string m_hostName;
string m_appName; string m_appName;
OutputStream m_ostream; OutputStream m_ostream;
Facility m_facility; SyslogFacility m_facility;
} }
deprecated("Use `SyslogFacility` instead.") deprecated("Use `SyslogFacility` instead.")
@ -680,7 +680,7 @@ final class SyslogLogger(OutputStream) : Logger {
Logger uses the stream's write function when it logs and would hence Logger uses the stream's write function when it logs and would hence
log forevermore. log forevermore.
*/ */
this(OutputStream stream, Facility facility, string appName = null, string hostName = hostName()) this(OutputStream stream, SyslogFacility facility, string appName = null, string hostName = hostName())
{ {
m_hostName = hostName != "" ? hostName : NILVALUE; m_hostName = hostName != "" ? hostName : NILVALUE;
m_appName = appName != "" ? appName : NILVALUE; m_appName = appName != "" ? appName : NILVALUE;
@ -839,6 +839,7 @@ unittest
logger.endLine(); logger.endLine();
} }
auto path = fstream.path; auto path = fstream.path;
destroy(logger);
fstream.close(); fstream.close();
import std.file; import std.file;