diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index 6dac979..d426fce 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -184,6 +184,12 @@ final class WinAPIEventDriverCore : EventDriverCore { nogc_assert((h in m_handles) !is null, "Handle not in use - cannot free."); m_handles.remove(h); } + + package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...) + { + import std.algorithm.searching : canFind; + m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); + } } private long currStdTime() diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index 871e7ec..3297989 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -146,8 +146,10 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override bool releaseRef(FileFD descriptor) { auto h = idToHandle(descriptor); - return m_core.m_handles[h].releaseRef({ + auto slot = &m_core.m_handles[h]; + return slot.releaseRef({ close(descriptor); + m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); m_core.freeSlot(h); }); } @@ -161,7 +163,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { InternalHigh = 0; Offset = cast(uint)(slot.offset & 0xFFFFFFFF); OffsetHigh = cast(uint)(slot.offset >> 32); - hEvent = () @trusted { return cast(HANDLE)slot; } (); + hEvent = h; } auto nbytes = min(slot.buffer.length, DWORD.max); diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 8e5eba3..4d9b9b4 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -697,13 +697,15 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override bool releaseRef(SocketFD fd) { import taggedalgebraic : hasType; - nogc_assert(m_sockets[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_sockets[fd].common.refCount == 0) { - final switch (m_sockets[fd].specific.kind) with (SocketVector.FieldType) { + auto slot = () @trusted { return &m_sockets[fd]; } (); + nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); + if (--slot.common.refCount == 0) { + final switch (slot.specific.kind) with (SocketVector.FieldType) { case Kind.none: break; case Kind.streamSocket: cancelRead(cast(StreamSocketFD)fd); cancelWrite(cast(StreamSocketFD)fd); + m_core.discardEvents(&slot.streamSocket.read.overlapped, &slot.streamSocket.write.overlapped); break; case Kind.streamListen: if (m_sockets[fd].streamListen.acceptCallback) @@ -712,6 +714,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { case Kind.datagramSocket: cancelReceive(cast(DatagramSocketFD)fd); cancelSend(cast(DatagramSocketFD)fd); + m_core.discardEvents(&slot.datagramSocket.read.overlapped, &slot.datagramSocket.write.overlapped); break; } diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index 524af9d..9606148 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -82,6 +82,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { catch (Exception e) assert(false, "Freeing directory watcher buffer failed."); } (); slot.watcher.buffer = null; + core.discardEvents(&slot.watcher.overlapped, &slot.file.write.overlapped); core.freeSlot(handle); })) { diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 6187fdf..63aec28 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -235,3 +235,35 @@ unittest { q.put(i); assert(q.consume().equal(iota(4))); } + + +void filterPending(alias pred, T)(ConsumableQueue!T q) +{ + size_t ir = 0; + size_t iw = 0; + + while (ir < q.m_pendingCount) { + if (!pred(q.getPendingAt(ir))) { + } else { + if (ir != iw) q.getPendingAt(iw) = q.getPendingAt(ir); + iw++; + } + ir++; + } + q.m_pendingCount = iw; +} + + +unittest { + import std.algorithm.comparison : equal; + import std.range : only; + + auto q = new ConsumableQueue!int; + foreach (i; 0 .. 14) q.put(i); + q.filterPending!(i => i % 2 != 0); + assert(q.consume().equal(only(1, 3, 5, 7, 9, 11, 13))); + + foreach (i; 0 .. 14) q.put(i); + q.filterPending!(i => i % 3 == 1); + assert(q.consume().equal(only(1, 4, 7, 10, 13))); +}