From 3090bc2c7cadf1308706a64634804b8c444f859a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 11 Mar 2018 19:01:37 +0100 Subject: [PATCH 1/3] Implement filterPending for ConsumableQueue. --- source/eventcore/internal/consumablequeue.d | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 6187fdf..63aec28 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -235,3 +235,35 @@ unittest { q.put(i); assert(q.consume().equal(iota(4))); } + + +void filterPending(alias pred, T)(ConsumableQueue!T q) +{ + size_t ir = 0; + size_t iw = 0; + + while (ir < q.m_pendingCount) { + if (!pred(q.getPendingAt(ir))) { + } else { + if (ir != iw) q.getPendingAt(iw) = q.getPendingAt(ir); + iw++; + } + ir++; + } + q.m_pendingCount = iw; +} + + +unittest { + import std.algorithm.comparison : equal; + import std.range : only; + + auto q = new ConsumableQueue!int; + foreach (i; 0 .. 14) q.put(i); + q.filterPending!(i => i % 2 != 0); + assert(q.consume().equal(only(1, 3, 5, 7, 9, 11, 13))); + + foreach (i; 0 .. 14) q.put(i); + q.filterPending!(i => i % 3 == 1); + assert(q.consume().equal(only(1, 4, 7, 10, 13))); +} From 26907c74897624a4550d150f3b9b7f9abadc13ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 11 Mar 2018 20:12:32 +0100 Subject: [PATCH 2/3] Fix set up of OVERLAPPED struct for files. --- source/eventcore/drivers/winapi/files.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index 871e7ec..af63e27 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -161,7 +161,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { InternalHigh = 0; Offset = cast(uint)(slot.offset & 0xFFFFFFFF); OffsetHigh = cast(uint)(slot.offset >> 32); - hEvent = () @trusted { return cast(HANDLE)slot; } (); + hEvent = h; } auto nbytes = min(slot.buffer.length, DWORD.max); From f094b3934bef7fb4306b805a7be44c67ea0cbd05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 11 Mar 2018 20:15:58 +0100 Subject: [PATCH 3/3] Make sure that no obsolete I/O events get processed. Removes overlapped I/O events when a handle gets closed prematurely (before all events have been processed) to avoid potential range violation errors as a consequence. --- source/eventcore/drivers/winapi/core.d | 6 ++++++ source/eventcore/drivers/winapi/files.d | 4 +++- source/eventcore/drivers/winapi/sockets.d | 9 ++++++--- source/eventcore/drivers/winapi/watchers.d | 1 + 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index 6dac979..d426fce 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -184,6 +184,12 @@ final class WinAPIEventDriverCore : EventDriverCore { nogc_assert((h in m_handles) !is null, "Handle not in use - cannot free."); m_handles.remove(h); } + + package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...) + { + import std.algorithm.searching : canFind; + m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); + } } private long currStdTime() diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index af63e27..3297989 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -146,8 +146,10 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override bool releaseRef(FileFD descriptor) { auto h = idToHandle(descriptor); - return m_core.m_handles[h].releaseRef({ + auto slot = &m_core.m_handles[h]; + return slot.releaseRef({ close(descriptor); + m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); m_core.freeSlot(h); }); } diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 8e5eba3..4d9b9b4 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -697,13 +697,15 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override bool releaseRef(SocketFD fd) { import taggedalgebraic : hasType; - nogc_assert(m_sockets[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_sockets[fd].common.refCount == 0) { - final switch (m_sockets[fd].specific.kind) with (SocketVector.FieldType) { + auto slot = () @trusted { return &m_sockets[fd]; } (); + nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); + if (--slot.common.refCount == 0) { + final switch (slot.specific.kind) with (SocketVector.FieldType) { case Kind.none: break; case Kind.streamSocket: cancelRead(cast(StreamSocketFD)fd); cancelWrite(cast(StreamSocketFD)fd); + m_core.discardEvents(&slot.streamSocket.read.overlapped, &slot.streamSocket.write.overlapped); break; case Kind.streamListen: if (m_sockets[fd].streamListen.acceptCallback) @@ -712,6 +714,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { case Kind.datagramSocket: cancelReceive(cast(DatagramSocketFD)fd); cancelSend(cast(DatagramSocketFD)fd); + m_core.discardEvents(&slot.datagramSocket.read.overlapped, &slot.datagramSocket.write.overlapped); break; } diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index 524af9d..9606148 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -82,6 +82,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { catch (Exception e) assert(false, "Freeing directory watcher buffer failed."); } (); slot.watcher.buffer = null; + core.discardEvents(&slot.watcher.overlapped, &slot.file.write.overlapped); core.freeSlot(handle); })) {