Implement manual events in the PosixEventDriver and add waitTimer.

This commit is contained in:
Sönke Ludwig 2016-02-03 14:21:02 +01:00
parent 844e955cdb
commit 0b7adc993f
4 changed files with 40 additions and 24 deletions

View file

@ -67,8 +67,7 @@ interface EventDriver {
EventID createEvent(); EventID createEvent();
void triggerEvent(EventID event, bool notify_all = true); void triggerEvent(EventID event, bool notify_all = true);
void triggerEvent(EventID event, bool notify_all = true) shared; void triggerEvent(EventID event, bool notify_all = true) shared;
EventWaitID waitForEvent(EventID event, EventCallback on_event); void waitForEvent(EventID event, EventCallback on_event);
void stopWaitingForEvent(EventID event, EventWaitID wait_id);
// //
// Timers // Timers
@ -78,6 +77,7 @@ interface EventDriver {
void stopTimer(TimerID timer); void stopTimer(TimerID timer);
bool isTimerPending(TimerID timer); bool isTimerPending(TimerID timer);
bool isTimerPeriodic(TimerID timer); bool isTimerPeriodic(TimerID timer);
void waitTimer(TimerID timer, TimerCallback callback);
// //
// Resource ownership // Resource ownership
@ -98,7 +98,7 @@ interface EventDriver {
Decrements the reference count of the given resource. Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be Once the reference count reaches zero, all associated resources will be
freed and the descriptor gets invalidated. freed and the resource descriptor gets invalidated.
*/ */
void releaseRef(SocketFD descriptor); void releaseRef(SocketFD descriptor);
/// ditto /// ditto

View file

@ -449,36 +449,34 @@ abstract class PosixEventDriver : EventDriver {
final override void triggerEvent(EventID event, bool notify_all = true) final override void triggerEvent(EventID event, bool notify_all = true)
{ {
if (notify_all) {
foreach (w; m_fds[event].waiters.consume) foreach (w; m_fds[event].waiters.consume)
w(event); w(event);
} else {
if (!m_fds[event].waiters.empty)
m_fds[event].waiters.consumeOne();
}
} }
final override void triggerEvent(EventID event, bool notify_all = true) final override void triggerEvent(EventID event, bool notify_all = true)
shared { shared @trusted {
/*int one = 1; import core.atomic : atomicStore;
if (notify_all) atomicStore(m_fds[event].triggerAll, true); auto thisus = cast(PosixEventDriver)this;
() @trusted { write(event, &one, one.sizeof); } ();*/ int one = 1;
assert(false); if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true);
() @trusted { write(event, &one, one.sizeof); } ();
} }
final override EventWaitID waitForEvent(EventID event, EventCallback on_event) final override void waitForEvent(EventID event, EventCallback on_event)
{ {
//return m_fds[event].waiters.put(on_event); return m_fds[event].waiters.put(on_event);
assert(false);
}
final override void stopWaitingForEvent(EventID event, EventWaitID wait_id)
{
assert(false);
//m_fds[event].waiters.remove(wait_id);
} }
private void onEvent(FD event) private void onEvent(FD event)
{ @trusted {
assert(false); import core.atomic : cas;
/*auto all = atomicLoad(m_fds[event].triggerAll); auto all = cas(&m_fds[event].triggerAll, true, false);
atomicStore(m_fds[event].triggerAll, false); triggerEvent(cast(EventID)event, all);
triggerEvent(cast(EventID)event, all);*/
} }
final override void addRef(SocketFD fd) final override void addRef(SocketFD fd)
@ -621,6 +619,7 @@ private struct FDSlot {
ConnectCallback connectCallback; ConnectCallback connectCallback;
AcceptCallback acceptCallback; AcceptCallback acceptCallback;
ConsumableQueue!EventCallback waiters; ConsumableQueue!EventCallback waiters;
shared bool triggerAll;
@property EventMask eventMask() const nothrow { @property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0; EventMask ret = cast(EventMask)0;

View file

@ -124,6 +124,11 @@ mixin template DefaultTimerImpl() {
return m_timers[descriptor].repeatDuration > 0; return m_timers[descriptor].repeatDuration > 0;
} }
final override void waitTimer(TimerID timer, TimerCallback callback)
{
assert(false);
}
final override void addRef(TimerID descriptor) final override void addRef(TimerID descriptor)
{ {
m_timers[descriptor].refCount++; m_timers[descriptor].refCount++;

View file

@ -4,6 +4,7 @@ module eventcore.internal.consumablequeue;
*/ */
class ConsumableQueue(T) class ConsumableQueue(T)
{ {
@safe:
nothrow: nothrow:
private { private {
@ -20,6 +21,8 @@ class ConsumableQueue(T)
@property size_t length() const { return m_pendingCount; } @property size_t length() const { return m_pendingCount; }
@property bool empty() const { return length == 0; }
/** Inserts a single element into the queue. /** Inserts a single element into the queue.
*/ */
@safe void put(T element) @safe void put(T element)
@ -64,6 +67,15 @@ class ConsumableQueue(T)
return ConsumedRange(this, first, count); return ConsumedRange(this, first, count);
} }
T consumeOne()
{
assert(!empty);
auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value;
if (m_consumedCount) m_consumedCount++;
else m_first = (m_first + 1) & m_capacityMask;
return ret;
}
static struct ConsumedRange { static struct ConsumedRange {
nothrow: nothrow: