From f808f89e7c0c2a480f6fd70c31e15fe8353ac5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 15 Jun 2016 18:19:22 +0200 Subject: [PATCH] Fix issues for posix events and cleanup cancel semantics. Cancelling an operation now guarantees that the callback won't be called. --- source/eventcore/driver.d | 2 +- source/eventcore/drivers/posix.d | 40 ++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 41f7a93..4b7c41e 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -76,6 +76,7 @@ interface EventDriver { void triggerEvent(EventID event, bool notify_all = true); void triggerEvent(EventID event, bool notify_all = true) shared; void waitForEvent(EventID event, EventCallback on_event); + void cancelWaitForEvent(EventID event, EventCallback on_event); // // Timers @@ -174,7 +175,6 @@ enum IOMode { enum IOStatus { ok, /// The data has been transferred normally disconnected, /// The connection was closed before all data could be transterred - cancelled, /// The operation was cancelled manually error, /// An error occured while transferring the data wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable } diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 44491ec..04a4e66 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -8,6 +8,7 @@ module eventcore.drivers.posix; public import eventcore.driver; import eventcore.drivers.timer; +import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.utils; import std.socket : Address, AddressFamily, UnknownAddress; @@ -282,7 +283,6 @@ abstract class PosixEventDriver : EventDriver { setNotifyCallback!(EventType.read)(socket, null); with (m_fds[socket]) { readBuffer = null; - readCallback(socket, IOStatus.cancelled, bytesRead); } } @@ -378,7 +378,6 @@ abstract class PosixEventDriver : EventDriver { setNotifyCallback!(EventType.write)(socket, null); with (m_fds[socket]) { writeBuffer = null; - writeCallback(socket, IOStatus.cancelled, bytesWritten); } } @@ -482,6 +481,7 @@ abstract class PosixEventDriver : EventDriver { { auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); initFD(id); + m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation registerFD(id, EventMask.read); startNotify!(EventType.read)(id, &onEvent); return id; @@ -489,6 +489,7 @@ abstract class PosixEventDriver : EventDriver { final override void triggerEvent(EventID event, bool notify_all = true) { + assert(event < m_fds.length, "Invalid event ID passed to triggerEvent."); if (notify_all) { foreach (w; m_fds[event].waiters.consume) w(event); @@ -502,6 +503,7 @@ abstract class PosixEventDriver : EventDriver { shared @trusted { import core.atomic : atomicStore; auto thisus = cast(PosixEventDriver)this; + assert(event < thisus.m_fds.length, "Invalid event ID passed to shared triggerEvent."); int one = 1; if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true); () @trusted { write(event, &one, one.sizeof); } (); @@ -509,9 +511,19 @@ abstract class PosixEventDriver : EventDriver { final override void waitForEvent(EventID event, EventCallback on_event) { + assert(event < m_fds.length, "Invalid event ID passed to waitForEvent."); return m_fds[event].waiters.put(on_event); } + final override void cancelWaitForEvent(EventID event, EventCallback on_event) + { + import std.algorithm.searching : countUntil; + import std.algorithm.mutation : remove; + + auto slot = &m_fds[event]; + slot.waiters.removePending(on_event); + } + private void onEvent(FD event) @trusted { import core.atomic : cas; @@ -522,7 +534,7 @@ abstract class PosixEventDriver : EventDriver { final override void addRef(SocketFD fd) { auto pfd = &m_fds[fd]; - assert(pfd.refCount > 0); + assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD."); m_fds[fd].refCount++; } @@ -534,14 +546,14 @@ abstract class PosixEventDriver : EventDriver { final override void addRef(EventID descriptor) { auto pfd = &m_fds[descriptor]; - assert(pfd.refCount > 0); + assert(pfd.refCount > 0, "Adding reference to unreferenced event FD."); m_fds[descriptor].refCount++; } final override void releaseRef(SocketFD fd) { auto pfd = &m_fds[fd]; - assert(pfd.refCount > 0); + assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD."); if (--m_fds[fd].refCount == 0) { unregisterFD(fd); clearFD(fd); @@ -553,16 +565,11 @@ abstract class PosixEventDriver : EventDriver { { assert(false); } - - final override void releaseRef(TimerID descriptor) - { - assert(false); - } - + final override void releaseRef(EventID descriptor) { auto pfd = &m_fds[descriptor]; - assert(pfd.refCount > 0); + assert(pfd.refCount > 0, "Releasing reference to unreferenced event FD."); if (--m_fds[descriptor].refCount == 0) { unregisterFD(descriptor); clearFD(descriptor); @@ -627,7 +634,8 @@ import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(E private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) { - assert((callback !is null) != (m_fds[fd].callback[evt] !is null)); + assert((callback !is null) != (m_fds[fd].callback[evt] !is null), + "Overwriting notification callback."); m_fds[fd].callback[evt] = callback; } @@ -649,6 +657,10 @@ import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(E { if (m_fds[fd].userDataDestructor) () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); + () @trusted nothrow { + scope (failure) assert(false); + destroy(m_fds[fd].waiters); + } (); m_fds[fd] = FDSlot.init; } } @@ -658,8 +670,6 @@ alias FDEnumerateCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD); private struct FDSlot { - import eventcore.internal.consumablequeue; - FDSlotCallback[EventType.max+1] callback; uint refCount;