Merge pull request #61 from vibe-d/remove_obsolete_events

Fix issues in the WinAPI driver
This commit is contained in:
Sönke Ludwig 2018-03-12 00:00:10 +01:00 committed by GitHub
commit 3d7cf5744c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 5 deletions

View file

@ -184,6 +184,12 @@ final class WinAPIEventDriverCore : EventDriverCore {
nogc_assert((h in m_handles) !is null, "Handle not in use - cannot free."); nogc_assert((h in m_handles) !is null, "Handle not in use - cannot free.");
m_handles.remove(h); m_handles.remove(h);
} }
package void discardEvents(scope OVERLAPPED_CORE*[] overlapped...)
{
import std.algorithm.searching : canFind;
m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped));
}
} }
private long currStdTime() private long currStdTime()

View file

@ -146,8 +146,10 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
override bool releaseRef(FileFD descriptor) override bool releaseRef(FileFD descriptor)
{ {
auto h = idToHandle(descriptor); auto h = idToHandle(descriptor);
return m_core.m_handles[h].releaseRef({ auto slot = &m_core.m_handles[h];
return slot.releaseRef({
close(descriptor); close(descriptor);
m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped);
m_core.freeSlot(h); m_core.freeSlot(h);
}); });
} }
@ -161,7 +163,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
InternalHigh = 0; InternalHigh = 0;
Offset = cast(uint)(slot.offset & 0xFFFFFFFF); Offset = cast(uint)(slot.offset & 0xFFFFFFFF);
OffsetHigh = cast(uint)(slot.offset >> 32); OffsetHigh = cast(uint)(slot.offset >> 32);
hEvent = () @trusted { return cast(HANDLE)slot; } (); hEvent = h;
} }
auto nbytes = min(slot.buffer.length, DWORD.max); auto nbytes = min(slot.buffer.length, DWORD.max);

View file

@ -697,13 +697,15 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
override bool releaseRef(SocketFD fd) override bool releaseRef(SocketFD fd)
{ {
import taggedalgebraic : hasType; import taggedalgebraic : hasType;
nogc_assert(m_sockets[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); auto slot = () @trusted { return &m_sockets[fd]; } ();
if (--m_sockets[fd].common.refCount == 0) { nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
final switch (m_sockets[fd].specific.kind) with (SocketVector.FieldType) { if (--slot.common.refCount == 0) {
final switch (slot.specific.kind) with (SocketVector.FieldType) {
case Kind.none: break; case Kind.none: break;
case Kind.streamSocket: case Kind.streamSocket:
cancelRead(cast(StreamSocketFD)fd); cancelRead(cast(StreamSocketFD)fd);
cancelWrite(cast(StreamSocketFD)fd); cancelWrite(cast(StreamSocketFD)fd);
m_core.discardEvents(&slot.streamSocket.read.overlapped, &slot.streamSocket.write.overlapped);
break; break;
case Kind.streamListen: case Kind.streamListen:
if (m_sockets[fd].streamListen.acceptCallback) if (m_sockets[fd].streamListen.acceptCallback)
@ -712,6 +714,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
case Kind.datagramSocket: case Kind.datagramSocket:
cancelReceive(cast(DatagramSocketFD)fd); cancelReceive(cast(DatagramSocketFD)fd);
cancelSend(cast(DatagramSocketFD)fd); cancelSend(cast(DatagramSocketFD)fd);
m_core.discardEvents(&slot.datagramSocket.read.overlapped, &slot.datagramSocket.write.overlapped);
break; break;
} }

View file

@ -82,6 +82,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
catch (Exception e) assert(false, "Freeing directory watcher buffer failed."); catch (Exception e) assert(false, "Freeing directory watcher buffer failed.");
} (); } ();
slot.watcher.buffer = null; slot.watcher.buffer = null;
core.discardEvents(&slot.watcher.overlapped, &slot.file.write.overlapped);
core.freeSlot(handle); core.freeSlot(handle);
})) }))
{ {

View file

@ -235,3 +235,35 @@ unittest {
q.put(i); q.put(i);
assert(q.consume().equal(iota(4))); assert(q.consume().equal(iota(4)));
} }
void filterPending(alias pred, T)(ConsumableQueue!T q)
{
size_t ir = 0;
size_t iw = 0;
while (ir < q.m_pendingCount) {
if (!pred(q.getPendingAt(ir))) {
} else {
if (ir != iw) q.getPendingAt(iw) = q.getPendingAt(ir);
iw++;
}
ir++;
}
q.m_pendingCount = iw;
}
unittest {
import std.algorithm.comparison : equal;
import std.range : only;
auto q = new ConsumableQueue!int;
foreach (i; 0 .. 14) q.put(i);
q.filterPending!(i => i % 2 != 0);
assert(q.consume().equal(only(1, 3, 5, 7, 9, 11, 13)));
foreach (i; 0 .. 14) q.put(i);
q.filterPending!(i => i % 3 == 1);
assert(q.consume().equal(only(1, 4, 7, 10, 13)));
}