diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 0c83c7c..1918075 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -67,8 +67,7 @@ interface EventDriver { EventID createEvent(); void triggerEvent(EventID event, bool notify_all = true); void triggerEvent(EventID event, bool notify_all = true) shared; - EventWaitID waitForEvent(EventID event, EventCallback on_event); - void stopWaitingForEvent(EventID event, EventWaitID wait_id); + void waitForEvent(EventID event, EventCallback on_event); // // Timers @@ -78,6 +77,7 @@ interface EventDriver { void stopTimer(TimerID timer); bool isTimerPending(TimerID timer); bool isTimerPeriodic(TimerID timer); + void waitTimer(TimerID timer, TimerCallback callback); // // Resource ownership @@ -98,7 +98,7 @@ interface EventDriver { Decrements the reference count of the given resource. Once the reference count reaches zero, all associated resources will be - freed and the descriptor gets invalidated. + freed and the resource descriptor gets invalidated. */ void releaseRef(SocketFD descriptor); /// ditto diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 275d550..c55e9d0 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -449,36 +449,34 @@ abstract class PosixEventDriver : EventDriver { final override void triggerEvent(EventID event, bool notify_all = true) { - foreach (w; m_fds[event].waiters.consume) - w(event); + if (notify_all) { + foreach (w; m_fds[event].waiters.consume) + w(event); + } else { + if (!m_fds[event].waiters.empty) + m_fds[event].waiters.consumeOne(); + } } final override void triggerEvent(EventID event, bool notify_all = true) - shared { - /*int one = 1; - if (notify_all) atomicStore(m_fds[event].triggerAll, true); - () @trusted { write(event, &one, one.sizeof); } ();*/ - assert(false); + shared @trusted { + import core.atomic : atomicStore; + auto thisus = cast(PosixEventDriver)this; + int one = 1; + if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true); + () @trusted { write(event, &one, one.sizeof); } (); } - final override EventWaitID waitForEvent(EventID event, EventCallback on_event) + final override void waitForEvent(EventID event, EventCallback on_event) { - //return m_fds[event].waiters.put(on_event); - assert(false); - } - - final override void stopWaitingForEvent(EventID event, EventWaitID wait_id) - { - assert(false); - //m_fds[event].waiters.remove(wait_id); + return m_fds[event].waiters.put(on_event); } private void onEvent(FD event) - { - assert(false); - /*auto all = atomicLoad(m_fds[event].triggerAll); - atomicStore(m_fds[event].triggerAll, false); - triggerEvent(cast(EventID)event, all);*/ + @trusted { + import core.atomic : cas; + auto all = cas(&m_fds[event].triggerAll, true, false); + triggerEvent(cast(EventID)event, all); } final override void addRef(SocketFD fd) @@ -621,6 +619,7 @@ private struct FDSlot { ConnectCallback connectCallback; AcceptCallback acceptCallback; ConsumableQueue!EventCallback waiters; + shared bool triggerAll; @property EventMask eventMask() const nothrow { EventMask ret = cast(EventMask)0; diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index a7c1333..929400f 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -124,6 +124,11 @@ mixin template DefaultTimerImpl() { return m_timers[descriptor].repeatDuration > 0; } + final override void waitTimer(TimerID timer, TimerCallback callback) + { + assert(false); + } + final override void addRef(TimerID descriptor) { m_timers[descriptor].refCount++; diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 2a9c636..ac381d7 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -4,6 +4,7 @@ module eventcore.internal.consumablequeue; */ class ConsumableQueue(T) { + @safe: nothrow: private { @@ -20,6 +21,8 @@ class ConsumableQueue(T) @property size_t length() const { return m_pendingCount; } + @property bool empty() const { return length == 0; } + /** Inserts a single element into the queue. */ @safe void put(T element) @@ -64,6 +67,15 @@ class ConsumableQueue(T) return ConsumedRange(this, first, count); } + T consumeOne() + { + assert(!empty); + auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value; + if (m_consumedCount) m_consumedCount++; + else m_first = (m_first + 1) & m_capacityMask; + return ret; + } + static struct ConsumedRange { nothrow: