Merge pull request #130 from vibe-d/channel_improvements
Channel improvements
This commit is contained in:
commit
1dedff027e
|
@ -18,6 +18,8 @@ import core.sync.mutex;
|
||||||
// TODO: implement a multi-channel wait, e.g.
|
// TODO: implement a multi-channel wait, e.g.
|
||||||
// TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function
|
// TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function
|
||||||
|
|
||||||
|
// NOTE: not using synchronized (m_mutex) because it is not nothrow
|
||||||
|
|
||||||
|
|
||||||
/** Creates a new channel suitable for cross-task and cross-thread communication.
|
/** Creates a new channel suitable for cross-task and cross-thread communication.
|
||||||
*/
|
*/
|
||||||
|
@ -54,6 +56,14 @@ struct Channel(T, size_t buffer_size = 100) {
|
||||||
/// ditto
|
/// ditto
|
||||||
@property bool empty() shared { return m_impl.empty; }
|
@property bool empty() shared { return m_impl.empty; }
|
||||||
|
|
||||||
|
/** Returns the current count of items in the buffer.
|
||||||
|
|
||||||
|
This function is useful for diagnostic purposes.
|
||||||
|
*/
|
||||||
|
@property size_t bufferFill() { return m_impl.bufferFill; }
|
||||||
|
/// ditto
|
||||||
|
@property size_t bufferFill() shared { return m_impl.bufferFill; }
|
||||||
|
|
||||||
/** Closes the channel.
|
/** Closes the channel.
|
||||||
|
|
||||||
A closed channel does not accept any new items enqueued using `put` and
|
A closed channel does not accept any new items enqueued using `put` and
|
||||||
|
@ -121,22 +131,39 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
this()
|
this()
|
||||||
shared {
|
shared @trusted {
|
||||||
m_mutex = cast(shared)new Mutex;
|
m_mutex = cast(shared)new Mutex;
|
||||||
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
|
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@property bool empty()
|
@property bool empty()
|
||||||
shared {
|
shared nothrow {
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
return thisus.m_closed && thisus.m_items.empty;
|
return thisus.m_closed && thisus.m_items.empty;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@property size_t bufferFill()
|
||||||
|
shared nothrow {
|
||||||
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
|
return thisus.m_items.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void close()
|
void close()
|
||||||
shared {
|
shared nothrow {
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
thisus.m_closed = true;
|
thisus.m_closed = true;
|
||||||
thisus.m_condition.notifyAll();
|
thisus.m_condition.notifyAll();
|
||||||
|
@ -144,11 +171,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tryConsumeOne(ref T dst)
|
bool tryConsumeOne(ref T dst)
|
||||||
shared {
|
shared nothrow {
|
||||||
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
bool was_full = false;
|
bool was_full = false;
|
||||||
|
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
while (thisus.m_items.empty) {
|
while (thisus.m_items.empty) {
|
||||||
if (m_closed) return false;
|
if (m_closed) return false;
|
||||||
thisus.m_condition.wait();
|
thisus.m_condition.wait();
|
||||||
|
@ -169,7 +199,10 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
T ret;
|
T ret;
|
||||||
bool was_full = false;
|
bool was_full = false;
|
||||||
|
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
while (thisus.m_items.empty) {
|
while (thisus.m_items.empty) {
|
||||||
if (m_closed) throw new Exception("Attempt to consume from an empty channel.");
|
if (m_closed) throw new Exception("Attempt to consume from an empty channel.");
|
||||||
thisus.m_condition.wait();
|
thisus.m_condition.wait();
|
||||||
|
@ -185,11 +218,14 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
|
bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
|
||||||
shared {
|
shared nothrow {
|
||||||
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
bool was_full = false;
|
bool was_full = false;
|
||||||
|
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
while (thisus.m_items.empty) {
|
while (thisus.m_items.empty) {
|
||||||
if (m_closed) return false;
|
if (m_closed) return false;
|
||||||
thisus.m_condition.wait();
|
thisus.m_condition.wait();
|
||||||
|
@ -209,7 +245,10 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
|
||||||
bool need_notify = false;
|
bool need_notify = false;
|
||||||
|
|
||||||
synchronized (m_mutex) {
|
{
|
||||||
|
m_mutex.lock_nothrow();
|
||||||
|
scope (exit) m_mutex.unlock_nothrow();
|
||||||
|
|
||||||
enforce(!m_closed, "Sending on closed channel.");
|
enforce(!m_closed, "Sending on closed channel.");
|
||||||
while (thisus.m_items.full)
|
while (thisus.m_items.full)
|
||||||
thisus.m_condition.wait();
|
thisus.m_condition.wait();
|
||||||
|
@ -221,7 +260,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest { // test basic operation and non-copyable struct compatiblity
|
@safe unittest { // test basic operation and non-copyable struct compatiblity
|
||||||
static struct S {
|
static struct S {
|
||||||
int i;
|
int i;
|
||||||
@disable this(this);
|
@disable this(this);
|
||||||
|
@ -250,7 +289,7 @@ unittest { // test basic operation and non-copyable struct compatiblity
|
||||||
assert(!ch.tryConsumeOne(v));
|
assert(!ch.tryConsumeOne(v));
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest { // make sure shared(Channel!T) can also be used
|
@safe unittest { // make sure shared(Channel!T) can also be used
|
||||||
shared ch = createChannel!int;
|
shared ch = createChannel!int;
|
||||||
ch.put(1);
|
ch.put(1);
|
||||||
assert(!ch.empty);
|
assert(!ch.empty);
|
||||||
|
@ -258,3 +297,23 @@ unittest { // make sure shared(Channel!T) can also be used
|
||||||
ch.close();
|
ch.close();
|
||||||
assert(ch.empty);
|
assert(ch.empty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@safe unittest { // ensure nothrow'ness for throwing struct
|
||||||
|
static struct S {
|
||||||
|
this(this) { throw new Exception("meh!"); }
|
||||||
|
}
|
||||||
|
auto ch = createChannel!S;
|
||||||
|
ch.put(S.init);
|
||||||
|
ch.put(S.init);
|
||||||
|
|
||||||
|
S s;
|
||||||
|
FixedRingBuffer!(S, 100, true) sb;
|
||||||
|
|
||||||
|
() nothrow {
|
||||||
|
assert(ch.tryConsumeOne(s));
|
||||||
|
assert(ch.consumeAll(sb));
|
||||||
|
assert(sb.length == 1);
|
||||||
|
ch.close();
|
||||||
|
assert(ch.empty);
|
||||||
|
} ();
|
||||||
|
}
|
||||||
|
|
|
@ -664,24 +664,34 @@ private void runMutexUnitTests(M)()
|
||||||
Note that it is generally not safe to use a `TaskCondition` together with an
|
Note that it is generally not safe to use a `TaskCondition` together with an
|
||||||
interruptible mutex type.
|
interruptible mutex type.
|
||||||
|
|
||||||
See_Also: InterruptibleTaskCondition
|
See_Also: `InterruptibleTaskCondition`
|
||||||
*/
|
*/
|
||||||
final class TaskCondition : core.sync.condition.Condition {
|
final class TaskCondition : core.sync.condition.Condition {
|
||||||
@safe:
|
@safe:
|
||||||
|
|
||||||
private TaskConditionImpl!(false, Mutex) m_impl;
|
private TaskConditionImpl!(false, Mutex) m_impl;
|
||||||
|
|
||||||
this(core.sync.mutex.Mutex mtx) {
|
this(core.sync.mutex.Mutex mtx)
|
||||||
|
{
|
||||||
|
assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo,
|
||||||
|
"TaskCondition can only be used with Mutex or TaskMutex");
|
||||||
|
|
||||||
m_impl.setup(mtx);
|
m_impl.setup(mtx);
|
||||||
super(mtx);
|
super(mtx);
|
||||||
}
|
}
|
||||||
override @property Mutex mutex() { return m_impl.mutex; }
|
override @property Mutex mutex() nothrow { return m_impl.mutex; }
|
||||||
override void wait() { m_impl.wait(); }
|
override void wait() nothrow { m_impl.wait(); }
|
||||||
override bool wait(Duration timeout) { return m_impl.wait(timeout); }
|
override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); }
|
||||||
override void notify() { m_impl.notify(); }
|
override void notify() nothrow { m_impl.notify(); }
|
||||||
override void notifyAll() { m_impl.notifyAll(); }
|
override void notifyAll() nothrow { m_impl.notifyAll(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
new TaskCondition(new Mutex);
|
||||||
|
new TaskCondition(new TaskMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** This example shows the typical usage pattern using a `while` loop to make
|
/** This example shows the typical usage pattern using a `while` loop to make
|
||||||
sure that the final condition is reached.
|
sure that the final condition is reached.
|
||||||
*/
|
*/
|
||||||
|
@ -1631,7 +1641,8 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
|
||||||
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
||||||
private {
|
private {
|
||||||
LOCKABLE m_mutex;
|
LOCKABLE m_mutex;
|
||||||
|
static if (is(LOCKABLE == Mutex))
|
||||||
|
TaskMutex m_taskMutex;
|
||||||
shared(ManualEvent) m_signal;
|
shared(ManualEvent) m_signal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1653,6 +1664,8 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
||||||
void setup(LOCKABLE mtx)
|
void setup(LOCKABLE mtx)
|
||||||
{
|
{
|
||||||
m_mutex = mtx;
|
m_mutex = mtx;
|
||||||
|
static if (is(typeof(m_taskMutex)))
|
||||||
|
m_taskMutex = cast(TaskMutex)mtx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@property LOCKABLE mutex() { return m_mutex; }
|
@property LOCKABLE mutex() { return m_mutex; }
|
||||||
|
@ -1665,8 +1678,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto refcount = m_signal.emitCount;
|
auto refcount = m_signal.emitCount;
|
||||||
m_mutex.unlock();
|
|
||||||
scope(exit) m_mutex.lock();
|
static if (is(LOCKABLE == Mutex)) {
|
||||||
|
if (m_taskMutex) m_taskMutex.unlock();
|
||||||
|
else m_mutex.unlock_nothrow();
|
||||||
|
} else m_mutex.unlock();
|
||||||
|
|
||||||
|
scope(exit) {
|
||||||
|
static if (is(LOCKABLE == Mutex)) {
|
||||||
|
if (m_taskMutex) m_taskMutex.lock();
|
||||||
|
else m_mutex.lock_nothrow();
|
||||||
|
} else m_mutex.lock();
|
||||||
|
}
|
||||||
static if (INTERRUPTIBLE) m_signal.wait(refcount);
|
static if (INTERRUPTIBLE) m_signal.wait(refcount);
|
||||||
else m_signal.waitUninterruptible(refcount);
|
else m_signal.waitUninterruptible(refcount);
|
||||||
}
|
}
|
||||||
|
@ -1680,8 +1703,18 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto refcount = m_signal.emitCount;
|
auto refcount = m_signal.emitCount;
|
||||||
m_mutex.unlock();
|
|
||||||
scope(exit) m_mutex.lock();
|
static if (is(LOCKABLE == Mutex)) {
|
||||||
|
if (m_taskMutex) m_taskMutex.unlock();
|
||||||
|
else m_mutex.unlock_nothrow();
|
||||||
|
} else m_mutex.unlock();
|
||||||
|
|
||||||
|
scope(exit) {
|
||||||
|
static if (is(LOCKABLE == Mutex)) {
|
||||||
|
if (m_taskMutex) m_taskMutex.lock();
|
||||||
|
else m_mutex.lock_nothrow();
|
||||||
|
} else m_mutex.lock();
|
||||||
|
}
|
||||||
|
|
||||||
static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
|
static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
|
||||||
else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
|
else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
|
||||||
|
|
Loading…
Reference in a new issue