From 496e99c3b4bc22a526071acf8cfd5806e9710ab2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 9 May 2020 14:41:18 +0200 Subject: [PATCH] Make the API robust against using invalid handles. Fixes #105. Introduces a "validationCounter" field for all handle types that gets incremented (at least) whenever an OS file descriptor/handle gets invalidated or re-allocated. This way, an old eventcore handle to a reused OS handle can always be distinguished from the current one to avoid interference. --- source/eventcore/driver.d | 175 +++++- source/eventcore/drivers/posix/dns.d | 54 +- source/eventcore/drivers/posix/driver.d | 41 +- source/eventcore/drivers/posix/epoll.d | 2 +- source/eventcore/drivers/posix/events.d | 27 +- source/eventcore/drivers/posix/kqueue.d | 2 +- source/eventcore/drivers/posix/pipes.d | 643 +++++++++++--------- source/eventcore/drivers/posix/processes.d | 76 ++- source/eventcore/drivers/posix/signals.d | 23 +- source/eventcore/drivers/posix/sockets.d | 170 ++++-- source/eventcore/drivers/posix/watchers.d | 42 +- source/eventcore/drivers/threadedfile.d | 74 ++- source/eventcore/drivers/timer.d | 32 +- source/eventcore/drivers/winapi/core.d | 25 +- source/eventcore/drivers/winapi/dns.d | 9 +- source/eventcore/drivers/winapi/driver.d | 2 +- source/eventcore/drivers/winapi/events.d | 26 +- source/eventcore/drivers/winapi/files.d | 56 +- source/eventcore/drivers/winapi/pipes.d | 107 ++-- source/eventcore/drivers/winapi/processes.d | 89 +-- source/eventcore/drivers/winapi/signals.d | 9 + source/eventcore/drivers/winapi/sockets.d | 110 +++- source/eventcore/drivers/winapi/watchers.d | 23 +- source/eventcore/internal/utils.d | 12 + 24 files changed, 1243 insertions(+), 586 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index e5e13a0..6bd00e2 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -114,7 +114,23 @@ interface EventDriverCore { /** Executes a callback in the thread owning the driver. */ - void runInOwnerThread(ThreadCallback del, intptr_t param) shared; + void runInOwnerThread(ThreadCallback2 fun, intptr_t param1, intptr_t param2) shared; + /// ditto + final void runInOwnerThread(ThreadCallback1 fun, intptr_t param1) + shared { + runInOwnerThread((p1, p2) { + auto f = () @trusted { return cast(ThreadCallback1)p2; } (); + f(p1); + }, param1, cast(intptr_t)fun); + } + /// ditto + final void runInOwnerThread(ThreadCallback0 fun) + shared { + runInOwnerThread((p1, p2) { + auto f = () @trusted { return cast(ThreadCallback0)p2; } (); + f(); + }, 0, cast(intptr_t)fun); + } /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; @@ -341,6 +357,18 @@ interface EventDriverSockets { /// Cancels an ongoing wait for an outgoing datagram. void cancelSend(DatagramSocketFD socket); + /** Determines whether the given socket handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called and I/O operations will result in + `IOStatus.invalidHandle`. + + A valid handle gets invalid when either the reference count drops to + zero, or after the socket was explicitly closed. + */ + bool isValid(SocketFD handle) const @nogc; + /** Increments the reference count of the given socket. */ void addRef(SocketFD descriptor); @@ -352,6 +380,8 @@ interface EventDriverSockets { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(SocketFD descriptor); @@ -411,6 +441,13 @@ interface EventDriverDNS { /// Cancels an ongoing DNS lookup. void cancelLookup(DNSLookupID handle); + + /** Determines whether the given DNS lookup handle is valid. + + A valid handle gets invalid when the lookup has completed or got + cancelled. + */ + bool isValid(DNSLookupID handle) const @nogc; } @@ -447,6 +484,18 @@ interface EventDriverFiles { void cancelWrite(FileFD file); void cancelRead(FileFD file); + /** Determines whether the given file handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called and I/O operations will result in + `IOStatus.invalidHandle`. + + A valid handle gets invalid when either the reference count drops to + zero, or after the file was explicitly closed. + */ + bool isValid(FileFD handle) const @nogc; + /** Increments the reference count of the given file. */ void addRef(FileFD descriptor); @@ -458,6 +507,8 @@ interface EventDriverFiles { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(FileFD descriptor); @@ -505,6 +556,16 @@ interface EventDriverEvents { /// Cancels an ongoing wait operation. void cancelWait(EventID event, EventCallback on_event); + /** Determines whether the given event handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called. + + A valid handle gets invalid when the reference count drops to zero. + */ + bool isValid(EventID handle) const @nogc; + /** Increments the reference count of the given event. */ void addRef(EventID descriptor); @@ -516,6 +577,8 @@ interface EventDriverEvents { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(EventID descriptor); @@ -558,6 +621,16 @@ interface EventDriverSignals { */ SignalListenID listen(int sig, SignalCallback on_signal); + /** Determines whether the given signal handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called. + + A valid handle gets invalid when the reference count drops to zero. + */ + bool isValid(SignalListenID handle) const @nogc; + /** Increments the reference count of the given resource. */ void addRef(SignalListenID descriptor); @@ -569,6 +642,8 @@ interface EventDriverSignals { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(SignalListenID descriptor); } @@ -588,6 +663,16 @@ interface EventDriverTimers { void wait(TimerID timer, TimerCallback2 callback); void cancelWait(TimerID timer); + /** Determines whether the given timer handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called. + + A valid handle gets invalid when the reference count drops to zero. + */ + bool isValid(TimerID handle) const @nogc; + /** Increments the reference count of the given resource. */ void addRef(TimerID descriptor); @@ -599,6 +684,8 @@ interface EventDriverTimers { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(TimerID descriptor); @@ -624,6 +711,16 @@ interface EventDriverWatchers { /// Watches a directory or a directory sub tree for changes. WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback); + /** Determines whether the given watcher handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called. + + A valid handle gets invalid when the reference count drops to zero. + */ + bool isValid(WatcherID handle) const @nogc; + /** Increments the reference count of the given resource. */ void addRef(WatcherID descriptor); @@ -635,6 +732,8 @@ interface EventDriverWatchers { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(WatcherID descriptor); @@ -688,7 +787,9 @@ interface EventDriverProcesses { */ void kill(ProcessID pid, int signal); - /** Wait for the process to exit. Returns an identifier that can be used to cancel the wait. + /** Wait for the process to exit. + + Returns an identifier that can be used to cancel the wait. */ size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit); @@ -696,6 +797,16 @@ interface EventDriverProcesses { */ void cancelWait(ProcessID pid, size_t waitId); + /** Determines whether the given process handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called. + + A valid handle gets invalid when the reference count drops to zero. + */ + bool isValid(ProcessID handle) const @nogc; + /** Increments the reference count of the given resource. */ void addRef(ProcessID pid); @@ -708,6 +819,8 @@ interface EventDriverProcesses { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(ProcessID pid); @@ -772,6 +885,18 @@ interface EventDriverPipes { */ void close(PipeFD pipe); + /** Determines whether the given pipe handle is valid. + + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called and I/O operations will result in + `IOStatus.invalidHandle`. + + A valid handle gets invalid when either the reference count drops to + zero, or the pipe is explicitly closed. + */ + bool isValid(PipeFD handle) const @nogc; + /** Increments the reference count of the given resource. */ void addRef(PipeFD pid); @@ -783,6 +908,8 @@ interface EventDriverPipes { Returns: Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. */ bool releaseRef(PipeFD pid); @@ -919,7 +1046,8 @@ enum IOStatus { ok, /// The data has been transferred normally disconnected, /// The connection was closed before all data could be transterred error, /// An error occured while transferring the data - wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable + wouldBlock, /// Returned for `IOMode.immediate` when no data is readily readable/writable + invalidHandle, /// The passed handle is not valid } enum DNSStatus { @@ -986,9 +1114,6 @@ struct Process { } mixin template Handle(string NAME, T, T invalid_value = T.init) { - static if (is(T.BaseType)) alias BaseType = T.BaseType; - else alias BaseType = T; - alias name = NAME; enum invalid = typeof(this).init; @@ -997,23 +1122,45 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) { T value = invalid_value; - this(BaseType value) { this.value = T(value); } + static if (is(T.BaseType)) { + alias BaseType = T.BaseType; - U opCast(U : Handle!(V, M), V, int M)() - const { - // TODO: verify that U derives from typeof(this)! - return U(value); + this(BaseType value, uint validation_counter) + { + this.value = T(value, validation_counter); + } + } else { + alias BaseType = T; + + uint validationCounter; + + this(BaseType value, uint validation_counter) + { + this.value = value; + this.validationCounter = validation_counter; + } } - U opCast(U : BaseType)() - const { + U opCast(U)() const + if (is(U.BaseType) && is(typeof(U.value))) + { + // TODO: verify that U derives from typeof(this)! + return U(cast(U.BaseType)value, validationCounter); + } + + U opCast(U)() const + if (is(typeof(U(BaseType.init)))) + { return cast(U)value; } alias value this; } -alias ThreadCallback = void function(intptr_t param) @safe nothrow; +alias ThreadCallback0 = void function() @safe nothrow; +alias ThreadCallback1 = void function(intptr_t param1) @safe nothrow; +alias ThreadCallback2 = void function(intptr_t param1, intptr_t param2) @safe nothrow; +alias ThreadCallback = ThreadCallback1; struct FD { mixin Handle!("fd", size_t, size_t.max); } struct SocketFD { mixin Handle!("socket", FD); } diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index e9e131b..c549648 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -35,6 +35,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver static struct Lookup { shared(bool) done; DNSLookupCallback callback; + uint validationCounter; addrinfo* result; int retcode; string name; @@ -44,6 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver Events m_events; EventID m_event = EventID.invalid; size_t m_maxHandle; + uint m_validationCounter; } this(Events events, Signals signals) @@ -64,7 +66,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver override DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished) { debug (EventCoreLogDNS) print("lookup %s", name); - auto handle = getFreeHandle(); + auto handle = allocateHandle(); if (handle > m_maxHandle) m_maxHandle = handle; assert(on_lookup_finished !is null, "Null callback passed to lookupHost"); @@ -123,15 +125,22 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver debug (EventCoreLogDNS) print("lookup handle: %s", handle); m_events.loop.m_waiterCount++; - return handle; + return DNSLookupID(handle, l.validationCounter); } override void cancelLookup(DNSLookupID handle) { + if (!isValid(handle)) return; m_lookups[handle].callback = null; m_events.loop.m_waiterCount--; } + override bool isValid(DNSLookupID handle) + const { + if (handle.value >= m_lookups.length) return false; + return m_lookups[handle.value].validationCounter == handle.validationCounter; + } + private void onDNSSignal(EventID event) @trusted nothrow { @@ -171,20 +180,26 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver l.done = false; if (i == m_maxHandle) m_maxHandle = lastmax; m_events.loop.m_waiterCount--; - passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai); + passToDNSCallback(DNSLookupID(i, l.validationCounter), cb, status, ai); } else lastmax = i; } } debug (EventCoreLogDNS) print("Max active DNS handle: %s", m_maxHandle); } - private DNSLookupID getFreeHandle() + private DNSLookupID allocateHandle() @safe nothrow { assert(m_lookups.length <= int.max); + int id = cast(int)m_lookups.length; foreach (i, ref l; m_lookups) - if (!l.callback) - return cast(DNSLookupID)cast(int)i; - return cast(DNSLookupID)cast(int)m_lookups.length; + if (!l.callback) { + id = cast(int)i; + break; + } + + auto vc = ++m_validationCounter; + m_lookups[id].validationCounter = vc; + return DNSLookupID(cast(int)id, vc); } private void setupEvent() @@ -204,12 +219,14 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive private { static struct Lookup { gaicb ctx; + uint validationCounter; DNSLookupCallback callback; } ChoppedVector!Lookup m_lookups; Events m_events; Signals m_signals; int m_dnsSignal; + uint m_validationCounter; SignalListenID m_sighandle; } @@ -232,7 +249,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive { import std.string : toStringz; - auto handle = getFreeHandle(); + auto handle = allocateHandle(); sigevent evt; evt.sigev_notify = SIGEV_SIGNAL; @@ -260,6 +277,13 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive m_events.loop.m_waiterCount--; } + override bool isValid(DNSLookupID handle) + { + if (handle.value >= m_lookups.length) + return false; + return m_lookups[handle.value].validationCounter == handle.validationCounter; + } + private void onDNSSignal(SignalListenID, SignalStatus status, int signal) @safe nothrow { @@ -284,11 +308,14 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive } } - private DNSLookupID getFreeHandle() + private DNSLookupID allocateHandle() { foreach (i, ref l; m_lookups) - if (!l.callback) + if (!l.callback) { + m_lookups[i].validationCounter = ++m_validationCounter; return cast(DNSLookupID)cast(int)i; + } + m_lookups[m_lookups.length].validationCounter = ++m_validationCounter; return cast(DNSLookupID)cast(int)m_lookups.length; } } @@ -344,7 +371,7 @@ final class EventDriverDNS_GHBN(Events : EventDriverEvents, Signals : EventDrive { import std.string : toStringz; - auto handle = DNSLookupID(m_maxHandle++); + auto handle = DNSLookupID(m_maxHandle++, 0); auto he = () @trusted { return gethostbyname(name.toStringz); } (); if (he is null) { @@ -377,6 +404,11 @@ final class EventDriverDNS_GHBN(Events : EventDriverEvents, Signals : EventDrive } override void cancelLookup(DNSLookupID) {} + + override bool isValid(DNSLookupID) + const { + return true; + } } package struct DNSSlot { diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 45e0184..c5e542e 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -160,6 +160,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime protected alias ExtraEventsCallback = bool delegate(long); + private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private { Loop m_loop; Timers m_timers; @@ -169,7 +171,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime EventID m_wakeupEvent; shared Mutex m_threadCallbackMutex; - ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; + ConsumableQueue!ThreadCallbackEntry m_threadCallbacks; } this(Loop loop, Timers timers, Events events, Processes processes) @@ -186,7 +188,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } (); } - m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t))); + m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry); m_threadCallbacks.reserve(1000); } @@ -260,7 +262,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_exit = false; } - final override void runInOwnerThread(ThreadCallback del, intptr_t param) + final override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) shared { auto m = atomicLoad(m_threadCallbackMutex); auto evt = atomicLoad(m_wakeupEvent); @@ -273,12 +275,14 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(tuple(del, param)); + .put(ThreadCallbackEntry(del, param1, param2)); } catch (Exception e) assert(false, e.msg); m_events.trigger(m_wakeupEvent, false); } + alias runInOwnerThread = EventDriverCore.runInOwnerThread; + final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { @@ -300,14 +304,14 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime import std.stdint : intptr_t; while (true) { - Tuple!(ThreadCallback, intptr_t) del; + ThreadCallbackEntry del; try { synchronized (m_threadCallbackMutex) { if (m_threadCallbacks.empty) break; del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1]); + del[0](del[1], del[2]); } } } @@ -336,11 +340,13 @@ package class PosixEventLoop { /// Updates the event mask to use for listening for notifications. protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc; - final protected void notify(EventType evt)(FD fd) + final protected void notify(EventType evt)(size_t fd) { //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); - if (m_fds[fd.value].common.callback[evt]) - m_fds[fd.value].common.callback[evt](fd); + if (m_fds[fd].common.callback[evt]) { + auto vc = m_fds[fd].common.validationCounter; + m_fds[fd].common.callback[evt](FD(fd, vc)); + } } final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) @@ -348,7 +354,7 @@ package class PosixEventLoop { // TODO: optimize! foreach (i; 0 .. cast(int)m_fds.length) if (m_fds[i].common.callback[evt]) - del(cast(FD)i); + del(FD(i, m_fds[i].common.validationCounter)); } package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) @@ -370,17 +376,23 @@ package class PosixEventLoop { } } - package void initFD(T)(FD fd, FDFlags flags, auto ref T slot_init) + package FDType initFD(FDType, T)(size_t fd, FDFlags flags, auto ref T slot_init) { - with (m_fds[fd.value]) { + uint vc; + + with (m_fds[fd]) { assert(common.refCount == 0, "Initializing referenced file descriptor slot."); assert(specific.kind == typeof(specific).Kind.none, "Initializing slot that has not been cleared."); common.refCount = 1; common.flags = flags; specific = slot_init; + vc = common.validationCounter; } + if (!(flags & FDFlags.internal)) m_handleCount++; + + return FDType(fd, vc); } package void clearFD(T)(FD fd) @@ -388,6 +400,7 @@ package class PosixEventLoop { import taggedalgebraic : hasType; auto slot = () @trusted { return &m_fds[fd.value]; } (); + assert(slot.common.validationCounter == fd.validationCounter, "Clearing FD slot for invalid FD"); assert(slot.common.refCount == 0, "Clearing referenced file descriptor slot."); assert(slot.specific.hasType!T, "Clearing file descriptor slot with unmatched type."); @@ -400,7 +413,10 @@ package class PosixEventLoop { foreach (cb; slot.common.callback) if (cb !is null) m_waiterCount--; + + auto vc = slot.common.validationCounter; *slot = m_fds.FullField.init; + slot.common.validationCounter = vc + 1; } package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @@ -426,6 +442,7 @@ alias FDSlotCallback = void delegate(FD); private struct FDSlot { FDSlotCallback[EventType.max+1] callback; uint refCount; + uint validationCounter; FDFlags flags; DataInitializer userDataDestructor; diff --git a/source/eventcore/drivers/posix/epoll.d b/source/eventcore/drivers/posix/epoll.d index 447f0c7..b6a5767 100644 --- a/source/eventcore/drivers/posix/epoll.d +++ b/source/eventcore/drivers/posix/epoll.d @@ -51,7 +51,7 @@ final class EpollEventLoop : PosixEventLoop { if (ret > 0) { foreach (ref evt; m_events[0 .. ret]) { debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events); - auto fd = cast(FD)evt.data.fd; + auto fd = cast(size_t)evt.data.fd; if (evt.events & (EPOLLERR|EPOLLHUP|EPOLLRDHUP)) notify!(EventType.status)(fd); if (evt.events & EPOLLIN) notify!(EventType.read)(fd); if (evt.events & EPOLLOUT) notify!(EventType.write)(fd); diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 41d3517..791907b 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -45,9 +45,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS version (linux) { auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (eid == -1) return EventID.invalid; - auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue - m_loop.initFD(id, FDFlags.internal, + auto id = m_loop.initFD!EventID(eid, FDFlags.internal, EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); @@ -101,19 +100,20 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS // use the second socket as the event ID and as the sending end for // other threads - auto id = cast(EventID)fd[1]; - try m_sockets.userData!EventID(s) = id; - catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue - m_loop.initFD(id, FDFlags.internal, + auto id = m_loop.initFD!EventID(fd[1], FDFlags.internal, EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s)); assert(getRC(id) == 1); + try m_sockets.userData!EventID(s) = id; + catch (Exception e) assert(false, e.msg); return id; } } final override void trigger(EventID event, bool notify_all) { + if (!isValid(event)) return; + auto slot = getSlot(event); if (notify_all) { //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); @@ -141,6 +141,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void wait(EventID event, EventCallback on_event) @nogc { + if (!isValid(event)) return; + if (!isInternal(event)) m_loop.m_waiterCount++; getSlot(event).waiters.put(on_event); } @@ -150,6 +152,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; + if (!isValid(event)) return; + if (!isInternal(event)) m_loop.m_waiterCount--; getSlot(event).waiters.removePending(on_event); } @@ -176,14 +180,24 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } } + override bool isValid(EventID handle) + const { + if (handle.value >= m_loop.m_fds.length) return false; + return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter; + } + final override void addRef(EventID descriptor) { + if (!isValid(descriptor)) return; + assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD."); getRC(descriptor)++; } final override bool releaseRef(EventID descriptor) @nogc { + if (!isValid(descriptor)) return true; + nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); if (--getRC(descriptor) == 0) { if (!isInternal(descriptor)) @@ -209,6 +223,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final protected override void* rawUserData(EventID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } diff --git a/source/eventcore/drivers/posix/kqueue.d b/source/eventcore/drivers/posix/kqueue.d index 959f75d..64b15a6 100644 --- a/source/eventcore/drivers/posix/kqueue.d +++ b/source/eventcore/drivers/posix/kqueue.d @@ -66,7 +66,7 @@ final class KqueueEventLoop : PosixEventLoop { foreach (ref evt; m_events[0 .. ret]) { //print("event %s %s", evt.ident, evt.filter, evt.flags); assert(evt.ident <= uint.max); - auto fd = cast(FD)cast(int)evt.ident; + auto fd = cast(size_t)evt.ident; if (evt.flags & (EV_EOF|EV_ERROR)) notify!(EventType.status)(fd); switch (evt.filter) { diff --git a/source/eventcore/drivers/posix/pipes.d b/source/eventcore/drivers/posix/pipes.d index f1c833d..36a9e4c 100644 --- a/source/eventcore/drivers/posix/pipes.d +++ b/source/eventcore/drivers/posix/pipes.d @@ -10,383 +10,420 @@ import std.algorithm : min, max; final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN; - import core.sys.posix.unistd : close, read, write; - import core.sys.posix.fcntl; - import core.sys.posix.poll; + import core.stdc.errno : errno, EAGAIN; + import core.sys.posix.unistd : close, read, write; + import core.sys.posix.fcntl; + import core.sys.posix.poll; - private Loop m_loop; + private Loop m_loop; - this(Loop loop) - @nogc { - m_loop = loop; - } + this(Loop loop) + @nogc { + m_loop = loop; + } - final override PipeFD adopt(int system_fd) - { - auto fd = PipeFD(system_fd); - if (m_loop.m_fds[fd].common.refCount) // FD already in use? - return PipeFD.invalid; + final override PipeFD adopt(int system_fd) + { + if (m_loop.m_fds[system_fd].common.refCount) // FD already in use? + return PipeFD.invalid; // Suprisingly cannot use O_CLOEXEC here, so use FD_CLOEXEC instead. () @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK | FD_CLOEXEC); } (); - m_loop.initFD(fd, FDFlags.none, PipeSlot.init); - m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); - return fd; - } + auto fd = m_loop.initFD!PipeFD(system_fd, FDFlags.none, PipeSlot.init); + m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + return fd; + } - final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) - { - auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); + final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) + { + if (!isValid(pipe)) { + on_read_finish(pipe, IOStatus.invalidHandle, 0); + return; + } - // Read failed - if (ret < 0) { - auto err = errno; - if (err != EAGAIN) { - print("Pipe error %s!", err); - on_read_finish(pipe, IOStatus.error, 0); - return; - } - } + auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); - // EOF - if (ret == 0 && buffer.length > 0) { - on_read_finish(pipe, IOStatus.disconnected, 0); - return; - } + // Read failed + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + print("Pipe error %s!", err); + on_read_finish(pipe, IOStatus.error, 0); + return; + } + } - // Handle immediate mode - if (ret < 0 && mode == IOMode.immediate) { - on_read_finish(pipe, IOStatus.wouldBlock, 0); - return; - } + // EOF + if (ret == 0 && buffer.length > 0) { + on_read_finish(pipe, IOStatus.disconnected, 0); + return; + } - // Handle successful read - if (ret >= 0) { - buffer = buffer[ret .. $]; + // Handle immediate mode + if (ret < 0 && mode == IOMode.immediate) { + on_read_finish(pipe, IOStatus.wouldBlock, 0); + return; + } - // Handle completed read - if (mode != IOMode.all || buffer.length == 0) { - on_read_finish(pipe, IOStatus.ok, ret); - return; - } - } + // Handle successful read + if (ret >= 0) { + buffer = buffer[ret .. $]; - auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); - assert(slot.readCallback is null, "Concurrent reads are not allowed."); - slot.readCallback = on_read_finish; - slot.readMode = mode; - slot.bytesRead = max(ret, 0); - slot.readBuffer = buffer; + // Handle completed read + if (mode != IOMode.all || buffer.length == 0) { + on_read_finish(pipe, IOStatus.ok, ret); + return; + } + } - // Need to use EventType.status as well, as pipes don't otherwise notify - // of closes - m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead); - m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead); - } + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.readCallback is null, "Concurrent reads are not allowed."); + slot.readCallback = on_read_finish; + slot.readMode = mode; + slot.bytesRead = max(ret, 0); + slot.readBuffer = buffer; - private void onPipeRead(FD fd) - { - auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); - auto pipe = cast(PipeFD)fd; + // Need to use EventType.status as well, as pipes don't otherwise notify + // of closes + m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead); + m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead); + } - void finalize(IOStatus status) - { - addRef(pipe); - scope(exit) releaseRef(pipe); + private void onPipeRead(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; - m_loop.setNotifyCallback!(EventType.read)(pipe, null); - m_loop.setNotifyCallback!(EventType.status)(pipe, null); - auto cb = slot.readCallback; - slot.readCallback = null; - slot.readBuffer = null; - cb(pipe, status, slot.bytesRead); - } + void finalize(IOStatus status) + { + addRef(pipe); + scope(exit) releaseRef(pipe); - ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } (); + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + m_loop.setNotifyCallback!(EventType.status)(pipe, null); + auto cb = slot.readCallback; + slot.readCallback = null; + slot.readBuffer = null; + cb(pipe, status, slot.bytesRead); + } - // Read failed - if (ret < 0) { - auto err = errno; - if (err != EAGAIN) { - print("Pipe error %s!", err); - finalize(IOStatus.error); - return; - } - } + ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } (); - // EOF - if (ret == 0 && slot.readBuffer.length > 0) { - finalize(IOStatus.disconnected); - return; - } + // Read failed + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + print("Pipe error %s!", err); + finalize(IOStatus.error); + return; + } + } - // Successful read - if (ret > 0 || !slot.readBuffer.length) { - slot.readBuffer = slot.readBuffer[ret .. $]; - slot.bytesRead += ret; + // EOF + if (ret == 0 && slot.readBuffer.length > 0) { + finalize(IOStatus.disconnected); + return; + } - // Handle completed read - if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) { - finalize(IOStatus.ok); - return; - } - } - } + // Successful read + if (ret > 0 || !slot.readBuffer.length) { + slot.readBuffer = slot.readBuffer[ret .. $]; + slot.bytesRead += ret; - final override void cancelRead(PipeFD pipe) - { - auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); - assert(slot.readCallback !is null, "Cancelling read when there is no read in progress."); - m_loop.setNotifyCallback!(EventType.read)(pipe, null); - slot.readBuffer = null; - } + // Handle completed read + if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) { + finalize(IOStatus.ok); + return; + } + } + } - final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) - { - if (buffer.length == 0) { - on_write_finish(pipe, IOStatus.ok, 0); - return; - } + final override void cancelRead(PipeFD pipe) + { + if (!isValid(pipe)) return; - ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.readCallback !is null, "Cancelling read when there is no read in progress."); + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + slot.readBuffer = null; + } - if (ret < 0) { - auto err = errno; - if (err != EAGAIN) { - on_write_finish(pipe, IOStatus.error, 0); - return; - } + final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) + { + if (!isValid(pipe)) { + on_write_finish(pipe, IOStatus.invalidHandle, 0); + return; + } - if (mode == IOMode.immediate) { - on_write_finish(pipe, IOStatus.wouldBlock, 0); - return; - } - } else { - buffer = buffer[ret .. $]; + if (buffer.length == 0) { + on_write_finish(pipe, IOStatus.ok, 0); + return; + } - if (mode != IOMode.all || buffer.length == 0) { - on_write_finish(pipe, IOStatus.ok, ret); - return; - } - } + ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } (); - auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); - assert(slot.writeCallback is null, "Concurrent writes not allowed."); - slot.writeCallback = on_write_finish; - slot.writeMode = mode; - slot.bytesWritten = max(ret, 0); - slot.writeBuffer = buffer; + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + on_write_finish(pipe, IOStatus.error, 0); + return; + } - m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite); - } + if (mode == IOMode.immediate) { + on_write_finish(pipe, IOStatus.wouldBlock, 0); + return; + } + } else { + buffer = buffer[ret .. $]; - private void onPipeWrite(FD fd) - { - auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); - auto pipe = cast(PipeFD)fd; + if (mode != IOMode.all || buffer.length == 0) { + on_write_finish(pipe, IOStatus.ok, ret); + return; + } + } - void finalize(IOStatus status) - { - addRef(pipe); - scope(exit) releaseRef(pipe); + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.writeCallback is null, "Concurrent writes not allowed."); + slot.writeCallback = on_write_finish; + slot.writeMode = mode; + slot.bytesWritten = max(ret, 0); + slot.writeBuffer = buffer; - m_loop.setNotifyCallback!(EventType.write)(pipe, null); - auto cb = slot.writeCallback; - slot.writeCallback = null; - slot.writeBuffer = null; - cb(pipe, status, slot.bytesWritten); - } + m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite); + } - ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } (); + private void onPipeWrite(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; - if (ret < 0) { - auto err = errno; - if (err != EAGAIN) { - finalize(IOStatus.error); - } - } else { - slot.bytesWritten += ret; - slot.writeBuffer = slot.writeBuffer[ret .. $]; + void finalize(IOStatus status) + { + addRef(pipe); + scope(exit) releaseRef(pipe); - if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { - finalize(IOStatus.ok); - } - } + m_loop.setNotifyCallback!(EventType.write)(pipe, null); + auto cb = slot.writeCallback; + slot.writeCallback = null; + slot.writeBuffer = null; + cb(pipe, status, slot.bytesWritten); + } - } + ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } (); - final override void cancelWrite(PipeFD pipe) - { - auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); - assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress."); - m_loop.setNotifyCallback!(EventType.write)(pipe, null); - slot.writeCallback = null; - slot.writeBuffer = null; - } + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + finalize(IOStatus.error); + } + } else { + slot.bytesWritten += ret; + slot.writeBuffer = slot.writeBuffer[ret .. $]; - final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) - { - if (pollPipe(pipe, on_data_available)) - { - return; - } + if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { + finalize(IOStatus.ok); + } + } - auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + } - assert(slot.readCallback is null, "Concurrent reads are not allowed."); - slot.readCallback = on_data_available; - slot.readMode = IOMode.once; // currently meaningless - slot.bytesRead = 0; // currently meaningless - slot.readBuffer = null; - m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable); - } + final override void cancelWrite(PipeFD pipe) + { + if (!isValid(pipe)) return; - private void onPipeDataAvailable(FD fd) - { - auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); - auto pipe = cast(PipeFD)fd; + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); + assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress."); + m_loop.setNotifyCallback!(EventType.write)(pipe, null); + slot.writeCallback = null; + slot.writeBuffer = null; + } - auto callback = (PipeFD f, IOStatus s, size_t m) { - addRef(f); - scope(exit) releaseRef(f); + final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) + { + if (!isValid(pipe)) { + on_data_available(pipe, IOStatus.invalidHandle, 0); + return; + } - auto cb = slot.readCallback; - slot.readCallback = null; - slot.readBuffer = null; - cb(f, s, m); - }; + if (pollPipe(pipe, on_data_available)) + { + return; + } - if (pollPipe(pipe, callback)) - { - m_loop.setNotifyCallback!(EventType.read)(pipe, null); - } - } + auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } (); - private bool pollPipe(PipeFD pipe, PipeIOCallback callback) - @trusted { - // Use poll to check if any data is available - pollfd pfd; - pfd.fd = cast(int)pipe; - pfd.events = POLLIN; - int ret = poll(&pfd, 1, 0); + assert(slot.readCallback is null, "Concurrent reads are not allowed."); + slot.readCallback = on_data_available; + slot.readMode = IOMode.once; // currently meaningless + slot.bytesRead = 0; // currently meaningless + slot.readBuffer = null; + m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable); + } - if (ret == -1) { - print("Error polling pipe: %s!", errno); - callback(pipe, IOStatus.error, 0); - return true; - } + private void onPipeDataAvailable(FD fd) + { + auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } (); + auto pipe = cast(PipeFD)fd; - if (ret == 1) { - callback(pipe, IOStatus.error, 0); - return true; - } + auto callback = (PipeFD f, IOStatus s, size_t m) { + addRef(f); + scope(exit) releaseRef(f); - return false; - } + auto cb = slot.readCallback; + slot.readCallback = null; + slot.readBuffer = null; + cb(f, s, m); + }; - final override void close(PipeFD pipe) - { - // TODO: Maybe actually close here instead of waiting for releaseRef? - close(cast(int)pipe); - } + if (pollPipe(pipe, callback)) + { + m_loop.setNotifyCallback!(EventType.read)(pipe, null); + } + } - final override void addRef(PipeFD pipe) - { - auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); - assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD."); - slot.common.refCount++; - } + private bool pollPipe(PipeFD pipe, PipeIOCallback callback) + @trusted { + // Use poll to check if any data is available + pollfd pfd; + pfd.fd = cast(int)pipe; + pfd.events = POLLIN; + int ret = poll(&pfd, 1, 0); - final override bool releaseRef(PipeFD pipe) - { - import taggedalgebraic : hasType; - auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); - nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); + if (ret == -1) { + print("Error polling pipe: %s!", errno); + callback(pipe, IOStatus.error, 0); + return true; + } - if (--slot.common.refCount == 0) { - m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!PipeSlot(pipe); + if (ret == 1) { + callback(pipe, IOStatus.error, 0); + return true; + } - close(cast(int)pipe); - return false; - } - return true; - } + return false; + } - final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - return m_loop.rawUserDataImpl(fd, size, initialize, destroy); - } + final override void close(PipeFD pipe) + { + if (!isValid(pipe)) return; + + // TODO: Maybe actually close here instead of waiting for releaseRef? + close(cast(int)pipe); + } + + override bool isValid(PipeFD handle) + const { + if (handle.value >= m_loop.m_fds.length) return false; + return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter; + } + + final override void addRef(PipeFD pipe) + { + if (!isValid(pipe)) return; + + auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); + assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD."); + slot.common.refCount++; + } + + final override bool releaseRef(PipeFD pipe) + { + import taggedalgebraic : hasType; + + if (!isValid(pipe)) return true; + + auto slot = () @trusted { return &m_loop.m_fds[pipe]; } (); + nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); + + if (--slot.common.refCount == 0) { + m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!PipeSlot(pipe); + + close(cast(int)pipe); + return false; + } + return true; + } + + final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + return m_loop.rawUserDataImpl(fd, size, initialize, destroy); + } +>>>>>>> 568465d... Make the API robust against using invalid handles. Fixes #105. } final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @safe: /*@nogc:*/ nothrow: - this(Loop loop) {} + this(Loop loop) {} - override PipeFD adopt(int system_pipe_handle) - { - assert(false, "TODO!"); - } + override PipeFD adopt(int system_pipe_handle) + { + assert(false, "TODO!"); + } - override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) - { - assert(false, "TODO!"); - } + override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) + { + assert(false, "TODO!"); + } - override void cancelRead(PipeFD pipe) - { - assert(false, "TODO!"); - } + override void cancelRead(PipeFD pipe) + { + assert(false, "TODO!"); + } - override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) - { - assert(false, "TODO!"); - } + override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) + { + assert(false, "TODO!"); + } - override void cancelWrite(PipeFD pipe) - { - assert(false, "TODO!"); - } + override void cancelWrite(PipeFD pipe) + { + assert(false, "TODO!"); + } - override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) - { - assert(false, "TODO!"); - } + override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) + { + assert(false, "TODO!"); + } - override void close(PipeFD pipe) - { - assert(false, "TODO!"); - } + override void close(PipeFD pipe) + { + assert(false, "TODO!"); + } - override void addRef(PipeFD pid) - { - assert(false, "TODO!"); - } + override bool isValid(PipeFD handle) + const { + return false; + } - override bool releaseRef(PipeFD pid) - { - assert(false, "TODO!"); - } + override void addRef(PipeFD pid) + { + assert(false, "TODO!"); + } - protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - assert(false, "TODO!"); - } + override bool releaseRef(PipeFD pid) + { + assert(false, "TODO!"); + } + + protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } } package struct PipeSlot { - alias Handle = PipeFD; + alias Handle = PipeFD; - size_t bytesRead; - ubyte[] readBuffer; - IOMode readMode; - PipeIOCallback readCallback; + size_t bytesRead; + ubyte[] readBuffer; + IOMode readMode; + PipeIOCallback readCallback; - size_t bytesWritten; - const(ubyte)[] writeBuffer; - IOMode writeMode; - PipeIOCallback writeCallback; + size_t bytesWritten; + const(ubyte)[] writeBuffer; + IOMode writeMode; + PipeIOCallback writeCallback; } diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 23cf4ec..93f3593 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -22,12 +22,13 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces private { static shared Mutex s_mutex; - static __gshared ProcessInfo[ProcessID] s_processes; + static __gshared ProcessInfo[int] s_processes; static __gshared Thread s_waitThread; Loop m_loop; // FIXME: avoid virtual funciton calls and use the final type instead EventDriver m_driver; + uint m_validationCounter; } this(Loop loop, EventDriver driver) @@ -42,13 +43,14 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces final override ProcessID adopt(int system_pid) { - auto pid = cast(ProcessID)system_pid; - ProcessInfo info; info.exited = false; info.refCount = 1; + info.validationCounter = ++m_validationCounter; info.driver = this; - add(pid, info); + + auto pid = ProcessID(system_pid, info.validationCounter); + add(system_pid, info); return pid; } @@ -154,7 +156,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces @trusted { import core.sys.posix.signal : pkill = kill; - assert(cast(int)pid > 0, "Invalid PID passed to kill."); + if (!isValid(pid)) return; if (cast(int)pid > 0) pkill(cast(int)pid, signal); @@ -167,7 +169,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces size_t id = size_t.max; lockedProcessInfo(pid, (info) { - assert(info !is null, "Unknown process ID"); + if (!info) return; if (info.exited) { exited = true; @@ -190,7 +192,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces if (wait_id == size_t.max) return; lockedProcessInfo(pid, (info) { - assert(info !is null, "Unknown process ID"); + if (!info) return; + assert(!info.exited, "Cannot cancel wait when none are pending"); assert(info.callbacks.length > wait_id, "Invalid process wait ID"); @@ -205,18 +208,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces private static void onLocalProcessExit(intptr_t system_pid) { - auto pid = cast(ProcessID)system_pid; - int exitCode; ProcessWaitCallback[] callbacks; + ProcessID pid; + PosixEventDriverProcesses driver; - lockedProcessInfo(pid, (info) { + lockedProcessInfoPlain(cast(int)system_pid, (info) { assert(info !is null); exitCode = info.exitCode; - callbacks = info.callbacks; + pid = ProcessID(cast(int)system_pid, info.validationCounter); info.callbacks = null; driver = info.driver; @@ -234,15 +237,25 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { bool ret; lockedProcessInfo(pid, (info) { - assert(info !is null, "Unknown process ID"); - ret = info.exited; + if (info) ret = info.exited; + else ret = true; }); return ret; } + override bool isValid(ProcessID handle) + const { + s_mutex.lock_nothrow(); + scope (exit) s_mutex.unlock_nothrow(); + auto info = () @trusted { return cast(int)handle.value in s_processes; } (); + return info && info.validationCounter == handle.validationCounter; + } + final override void addRef(ProcessID pid) { lockedProcessInfo(pid, (info) { + if (!info) return; + nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); info.refCount++; }); @@ -252,13 +265,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces { bool ret; lockedProcessInfo(pid, (info) { + if (!info) { + ret = true; + return; + } + nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); if (--info.refCount == 0) { // Remove/deallocate process if (info.userDataDestructor) () @trusted { info.userDataDestructor(info.userData.ptr); } (); - () @trusted { s_processes.remove(pid); } (); + () @trusted { s_processes.remove(cast(int)pid.value); } (); ret = false; } else ret = true; }); @@ -290,7 +308,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces s_mutex = new shared Mutex; } - private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) + private static void lockedProcessInfoPlain(int pid, scope void delegate(ProcessInfo*) nothrow @safe fn) { s_mutex.lock_nothrow(); scope (exit) s_mutex.unlock_nothrow(); @@ -298,7 +316,14 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces fn(info); } - private static void add(ProcessID pid, ProcessInfo info) @trusted { + private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) + { + lockedProcessInfoPlain(cast(int)pid.value, (pi) { + fn(pi.validationCounter == pid.validationCounter ? pi : null); + }); + } + + private static void add(int pid, ProcessInfo info) @trusted { s_mutex.lock_nothrow(); scope (exit) s_mutex.unlock_nothrow(); @@ -328,7 +353,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces break; } - ProcessID[] allprocs; + int[] allprocs; { s_mutex.lock_nothrow(); @@ -345,8 +370,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces foreach (pid; allprocs) { int status; - ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } (); - if (ret == cast(int)pid) { + ret = () @trusted { return waitpid(pid, &status, WNOHANG); } (); + if (ret == pid) { int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); onProcessExitStatic(ret, exitstatus); } @@ -356,24 +381,21 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces private static void onProcessExitStatic(int system_pid, int exit_status) { - auto pid = cast(ProcessID)system_pid; - PosixEventDriverProcesses driver; - lockedProcessInfo(pid, (ProcessInfo* info) @safe { + lockedProcessInfoPlain(system_pid, (ProcessInfo* info) @safe { // We get notified of any child exiting, so ignore the ones we're // not aware of if (info is null) return; // Increment the ref count to make sure it doesn't get removed info.refCount++; - info.exited = true; info.exitCode = exit_status; driver = info.driver; }); if (driver) - () @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid); + () @trusted { return cast(shared)driver; } ().onProcessExit(system_pid); } private static struct ProcessInfo { @@ -381,6 +403,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces int exitCode; ProcessWaitCallback[] callbacks; size_t refCount = 0; + uint validationCounter; PosixEventDriverProcesses driver; DataInitializer userDataDestructor; @@ -425,6 +448,11 @@ final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces assert(false, "TODO!"); } + override bool isValid(ProcessID handle) + const { + return false; + } + override void addRef(ProcessID pid) { assert(false, "TODO!"); diff --git a/source/eventcore/drivers/posix/signals.d b/source/eventcore/drivers/posix/signals.d index fdd45ae..2445dfa 100644 --- a/source/eventcore/drivers/posix/signals.d +++ b/source/eventcore/drivers/posix/signals.d @@ -26,19 +26,19 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true) { - auto fd = () @trusted { + auto sigfd = () @trusted { sigset_t sset; sigemptyset(&sset); sigaddset(&sset, sig); if (sigprocmask(SIG_BLOCK, &sset, null) != 0) - return SignalListenID.invalid; + return -1; - return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); + return signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC); } (); - m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none, SignalSlot(on_signal)); + auto fd = m_loop.initFD!SignalListenID(sigfd, is_internal ? FDFlags.internal : FDFlags.none, SignalSlot(on_signal)); m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); @@ -47,14 +47,24 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna return fd; } + override bool isValid(SignalListenID handle) + const { + if (handle.value >= m_loop.m_fds.length) return false; + return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter; + } + override void addRef(SignalListenID descriptor) { + if (!isValid(descriptor)) return; + assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD."); m_loop.m_fds[descriptor].common.refCount++; } override bool releaseRef(SignalListenID descriptor) { + if (!isValid(descriptor)) return true; + FD fd = cast(FD)descriptor; nogc_assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference @@ -104,6 +114,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals assert(false); } + override bool isValid(SignalListenID handle) + const { + return false; + } + override void addRef(SignalListenID descriptor) { assert(false); diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index ac3e572..fe6aaea 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -135,21 +135,17 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return StreamSocketFD.invalid; } - auto sock = cast(StreamSocketFD)sockfd; - - void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; } - int bret; if (bind_address !is null) - () @trusted { bret = bind(cast(sock_t)sock, bind_address.name, bind_address.nameLen); } (); + () @trusted { bret = bind(sockfd, bind_address.name, bind_address.nameLen); } (); if (bret != 0) { - invalidateSocket(); - on_connect(sock, ConnectStatus.bindFailure); - return sock; + closeSocket(sockfd); + on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure); + return StreamSocketFD.invalid; } - m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init); + auto sock = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init); m_loop.registerFD(sock, EventMask.read|EventMask.write); auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } (); @@ -167,7 +163,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } else { m_loop.unregisterFD(sock, EventMask.read|EventMask.write); m_loop.clearFD!StreamSocketSlot(sock); - invalidateSocket(); + closeSocket(sockfd); on_connect(StreamSocketFD.invalid, determineConnectStatus(err)); return StreamSocketFD.invalid; } @@ -178,7 +174,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void cancelConnectStream(StreamSocketFD sock) { - assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor"); + if (!isValid(sock)) return; + with (m_loop.m_fds[sock].streamSocket) { assert(state == ConnectionState.connecting, @@ -191,11 +188,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override StreamSocketFD adoptStream(int socket) { - auto fd = StreamSocketFD(socket); - if (m_loop.m_fds[fd].common.refCount) // FD already in use? + if (m_loop.m_fds[socket].common.refCount) // FD already in use? return StreamSocketFD.invalid; - setSocketNonBlocking(fd); - m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); + setSocketNonBlocking(socket); + auto fd = m_loop.initFD!StreamSocketFD(socket, FDFlags.none, StreamSocketSlot.init); m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } @@ -240,39 +236,35 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto sockfd = createSocket(address.addressFamily, SOCK_STREAM); if (sockfd == -1) return StreamListenSocketFD.invalid; - auto sock = cast(StreamListenSocketFD)sockfd; - - void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; } - - () @trusted { + auto succ = () @trusted { int tmp_reuse = 1; // FIXME: error handling! if (options & StreamListenOptions.reuseAddress) { - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { - invalidateSocket(); - return; - } + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) + return false; } - version (Windows) {} else { - if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0) { - invalidateSocket(); - return; - } - } - if (bind(sockfd, address.name, address.nameLen) != 0) { - invalidateSocket(); - return; - } - if (listen(sockfd, getBacklogSize()) != 0) { - invalidateSocket(); - return; + + version (Windows) {} + else { + if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0) + return false; } + + if (bind(sockfd, address.name, address.nameLen) != 0) + return false; + + if (listen(sockfd, getBacklogSize()) != 0) + return false; + + return true; } (); - if (sock == StreamListenSocketFD.invalid) - return sock; + if (!succ) { + closeSocket(sockfd); + return StreamListenSocketFD.invalid; + } - m_loop.initFD(sock, FDFlags.none, StreamListenSocketSlot.init); + auto sock = m_loop.initFD!StreamListenSocketFD(sockfd, FDFlags.none, StreamListenSocketSlot.init); if (on_accept) waitForConnections(sock, on_accept); @@ -281,6 +273,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { + if (!isValid(sock)) return; + m_loop.registerFD(sock, EventMask.read, false); m_loop.m_fds[sock].streamListen.acceptCallback = on_accept; m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept); @@ -298,10 +292,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } else { () @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } (); if (sockfd == -1) return; - setSocketNonBlocking(cast(SocketFD)sockfd, true); + setSocketNonBlocking(sockfd, true); } - auto fd = cast(StreamSocketFD)sockfd; - m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init); + auto fd = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init); m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.registerFD(fd, EventMask.read|EventMask.write); //print("accept %d", sockfd); @@ -311,11 +304,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets ConnectionState getConnectionState(StreamSocketFD sock) { + if (!isValid(sock)) return ConnectionState.closed; return m_loop.m_fds[sock].streamSocket.state; } final override bool getLocalAddress(SocketFD sock, scope RefAddress dst) { + if (!isValid(sock)) return false; + socklen_t addr_len = dst.nameLen; if (() @trusted { return getsockname(cast(sock_t)sock, dst.name, &addr_len); } () != 0) return false; @@ -325,6 +321,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool getRemoteAddress(SocketFD sock, scope RefAddress dst) { + if (!isValid(sock)) return false; + socklen_t addr_len = dst.nameLen; if (() @trusted { return getpeername(cast(sock_t)sock, dst.name, &addr_len); } () != 0) return false; @@ -334,12 +332,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void setTCPNoDelay(StreamSocketFD socket, bool enable) { + if (!isValid(socket)) return; + int opt = enable; () @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } (); } override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted { + if (!isValid(socket)) return; + int opt = enable; int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof); if (err != 0) @@ -348,6 +350,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted { + if (!isValid(socket)) return; + // dunnno about BSD\OSX, maybe someone should fix it for them later version (linux) { setKeepAlive(socket, true); @@ -371,6 +375,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted { + if (!isValid(socket)) return; + version (linux) { uint tmsecs = cast(uint) timeout.total!"msecs"; int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof); @@ -381,6 +387,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { + if (!isValid(socket)) { + on_read_finish(socket, IOStatus.invalidHandle, 0); + return; + } + /*if (buffer.length == 0) { on_read_finish(socket, IOStatus.ok, 0); return; @@ -435,6 +446,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void cancelRead(StreamSocketFD socket) { + if (!isValid(socket)) return; + assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); with (m_loop.m_fds[socket].streamSocket) { @@ -511,6 +524,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { + if (!isValid(socket)) { + on_write_finish(socket, IOStatus.invalidHandle, 0); + return; + } + if (buffer.length == 0) { on_write_finish(socket, IOStatus.ok, 0); return; @@ -560,6 +578,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void cancelWrite(StreamSocketFD socket) { + if (!isValid(socket)) return; + assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress."); m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.m_fds[socket].streamSocket.writeBuffer = null; @@ -616,6 +636,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void waitForData(StreamSocketFD socket, IOCallback on_data_available) { + if (!isValid(socket)) { + on_data_available(socket, IOStatus.invalidHandle, 0); + return; + } + sizediff_t ret; ubyte dummy; () @trusted { ret = recv(cast(sock_t)socket, &dummy, 1, MSG_PEEK); } (); @@ -674,6 +699,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write) { + if (!isValid(socket)) return; + auto st = m_loop.m_fds[socket].streamSocket.state; () @trusted { .shutdown(cast(sock_t)socket, shut_read ? shut_write ? SHUT_RDWR : SHUT_RD : shut_write ? SHUT_WR : 0); } (); if (st == ConnectionState.passiveClose) shut_read = true; @@ -690,7 +717,6 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets { auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM); if (sockfd == -1) return DatagramSocketFD.invalid; - auto sock = cast(DatagramSocketFD)sockfd; if (bind_address && () @trusted { return bind(sockfd, bind_address.name, bind_address.nameLen); } () != 0) { closeSocket(sockfd); @@ -718,9 +744,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); + auto flags = is_internal ? FDFlags.internal : FDFlags.none; + auto sock = m_loop.initFD!DatagramSocketFD(sockfd, flags, DgramSocketSlot.init); m_loop.registerFD(sock, EventMask.read|EventMask.write); - return sock; } @@ -731,28 +757,34 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false) @nogc { - auto fd = DatagramSocketFD(socket); - if (m_loop.m_fds[fd].common.refCount) // FD already in use? + if (m_loop.m_fds[socket].common.refCount) // FD already in use? return DatagramSocketFD.init; - setSocketNonBlocking(fd, close_on_exec); - m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init); + setSocketNonBlocking(socket, close_on_exec); + auto flags = is_internal ? FDFlags.internal : FDFlags.none; + auto fd = m_loop.initFD!DatagramSocketFD(socket, flags, DgramSocketSlot.init); m_loop.registerFD(fd, EventMask.read|EventMask.write); return fd; } final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address) { + if (!isValid(socket)) return; + () @trusted { connect(cast(sock_t)socket, target_address.name, target_address.nameLen); } (); } final override bool setBroadcast(DatagramSocketFD socket, bool enable) { + if (!isValid(socket)) return false; + int tmp_broad = enable; return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; } final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0) { + if (!isValid(socket)) return false; + switch (multicast_address.addressFamily) { default: assert(false, "Multicast only supported for IPv4/IPv6 sockets."); case AddressFamily.INET: @@ -784,6 +816,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets @safe { assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets."); + if (!isValid(socket)) { + RefAddress addr; + on_receive_finish(socket, IOStatus.invalidHandle, 0, addr); + return; + } + sizediff_t ret; sockaddr_storage src_addr; socklen_t src_addr_len = src_addr.sizeof; @@ -826,6 +864,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void cancelReceive(DatagramSocketFD socket) @nogc { + if (!isValid(socket)) return; + assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.m_fds[socket].datagramSocket.readBuffer = null; @@ -861,6 +901,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets { assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets."); + if (!isValid(socket)) { + RefAddress addr; + on_send_finish(socket, IOStatus.invalidHandle, 0, addr); + return; + } + sizediff_t ret; if (target_address) { () @trusted { ret = .sendto(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS, target_address.name, target_address.nameLen); } (); @@ -897,6 +943,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void cancelSend(DatagramSocketFD socket) { + if (!isValid(socket)) return; + assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress."); m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.m_fds[socket].datagramSocket.writeBuffer = null; @@ -929,8 +977,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null); } + final override bool isValid(SocketFD handle) + const { + if (handle.value > m_loop.m_fds.length) return false; + return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter; + } + final override void addRef(SocketFD fd) { + if (!isValid(fd)) return; + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); assert(slot.common.refCount > 0, "Adding reference to unreferenced socket FD."); slot.common.refCount++; @@ -939,6 +995,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool releaseRef(SocketFD fd) @nogc { import taggedalgebraic : hasType; + + if (!isValid(fd)) return true; + auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); // listening sockets have an incremented the reference count because of setNotifyCallback @@ -966,6 +1025,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable) { + if (!isValid(socket)) return false; + int proto, opt; final switch (option) { case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break; @@ -977,6 +1038,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable) { + if (!isValid(socket)) return false; + int proto, opt; final switch (option) { case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break; @@ -988,16 +1051,19 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } @@ -1010,7 +1076,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } else { () @trusted { sock = socket(family, type, 0); } (); if (sock == -1) return -1; - setSocketNonBlocking(cast(SocketFD)sock, true); + setSocketNonBlocking(sock, true); // Prevent SIGPIPE on failed send version (OSX) { @@ -1078,7 +1144,7 @@ private void closeSocket(sock_t sockfd) else close(sockfd); } -private void setSocketNonBlocking(SocketFD sockfd, bool close_on_exec = false) +private void setSocketNonBlocking(SocketFD.BaseType sockfd, bool close_on_exec = false) @nogc nothrow { version (Windows) { uint enable = 1; diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index ee973f2..ac2b753 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -38,7 +38,9 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver auto handle = () @trusted { return inotify_init1(IN_NONBLOCK | IN_CLOEXEC); } (); if (handle == -1) return WatcherID.invalid; - auto ret = WatcherID(handle); + auto ret = m_loop.initFD!WatcherID(handle, FDFlags.none, WatcherSlot(callback)); + m_loop.registerFD(cast(FD)ret, EventMask.read); + m_loop.setNotifyCallback!(EventType.read)(cast(FD)ret, &onChanges); m_watches[ret] = WatchState(null, path, recursive); @@ -46,23 +48,28 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver if (recursive) addSubWatches(ret, path, ""); - m_loop.initFD(FD(handle), FDFlags.none, WatcherSlot(callback)); - m_loop.registerFD(FD(handle), EventMask.read); - m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); - - processEvents(WatcherID(handle)); + processEvents(ret); return ret; } + final override bool isValid(WatcherID handle) + const { + if (handle.value >= m_loop.m_fds.length) return false; + return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter; + } + final override void addRef(WatcherID descriptor) { + if (!isValid(descriptor)) return; assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD."); m_loop.m_fds[descriptor].common.refCount++; } final override bool releaseRef(WatcherID descriptor) { + if (!isValid(descriptor)) return true; + FD fd = cast(FD)descriptor; auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced event FD."); @@ -80,6 +87,7 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver final protected override void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } @@ -204,13 +212,22 @@ final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDrive assert(false, "TODO!"); } + final override bool isValid(WatcherID handle) + const { + return false; + } + final override void addRef(WatcherID descriptor) { + if (!isValid(descriptor)) return; + assert(false, "TODO!"); } final override bool releaseRef(WatcherID descriptor) { + if (!isValid(descriptor)) return true; + /*FSEventStreamStop FSEventStreamUnscheduleFromRunLoop FSEventStreamInvalidate @@ -220,6 +237,8 @@ final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDrive final protected override void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; + return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } @@ -283,9 +302,15 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat return cast(WatcherID)evt; } + final override bool isValid(WatcherID handle) + const { + return m_events.isValid(cast(EventID)handle); + } + final override void addRef(WatcherID descriptor) { - assert(descriptor != WatcherID.invalid); + if (!isValid(descriptor)) return; + auto evt = cast(EventID)descriptor; auto pt = evt in m_pollers; assert(pt !is null); @@ -294,7 +319,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat final override bool releaseRef(WatcherID descriptor) { - nogc_assert(descriptor != WatcherID.invalid, "Invalid directory watcher ID released"); + if (!isValid(descriptor)) return true; + auto evt = cast(EventID)descriptor; auto pt = evt in m_pollers; nogc_assert(pt !is null, "Directory watcher polling thread does not exist"); diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 45d64f0..185c693 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -103,6 +103,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil IOInfo write; bool open = true; + uint validationCounter; + int refCount; DataInitializer userDataDestructor; ubyte[16*size_t.sizeof] userData; @@ -110,8 +112,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil TaskPool m_fileThreadPool; ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop - SmallIntegerSet!FileFD m_activeReads; - SmallIntegerSet!FileFD m_activeWrites; + SmallIntegerSet!size_t m_activeReads; + SmallIntegerSet!size_t m_activeWrites; EventID m_readyEvent = EventID.invalid; bool m_waiting; Events m_events; @@ -170,13 +172,17 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil } if (m_files[system_file_handle].refCount > 0) return FileFD.invalid; + auto vc = m_files[system_file_handle].validationCounter; m_files[system_file_handle] = FileInfo.init; m_files[system_file_handle].refCount = 1; - return FileFD(system_file_handle); + m_files[system_file_handle].validationCounter = vc + 1; + return FileFD(system_file_handle, vc + 1); } void close(FileFD file) { + if (!isValid(file)) return; + // NOTE: The file descriptor itself must stay open until the reference // count drops to zero, or this would result in dangling handles. // In case of an exclusive file lock, the lock should be lifted @@ -186,6 +192,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil ulong getSize(FileFD file) { + if (!isValid(file)) return ulong.max; + version (linux) { // stat_t seems to be defined wrong on linux/64 return .lseek(cast(int)file, 0, SEEK_END); @@ -198,6 +206,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil override void truncate(FileFD file, ulong size, FileIOCallback on_finish) { + if (!isValid(file)) return; + version (Posix) { // FIXME: do this in the thread pool @@ -214,6 +224,11 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil final override void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode, FileIOCallback on_write_finish) { + if (!isValid(file)) { + on_write_finish(file, IOStatus.invalidHandle, 0); + return; + } + //assert(this.writable); auto f = () @trusted { return &m_files[file]; } (); @@ -226,7 +241,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil assert(false, "Concurrent file writes are not allowed."); assert(f.write.callback is null, "Concurrent file writes are not allowed."); f.write.callback = on_write_finish; - m_activeWrites.insert(file); + m_activeWrites.insert(file.value); threadSetup(); log("start write task"); try { @@ -235,7 +250,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { - m_activeWrites.remove(file); + m_activeWrites.remove(file.value); on_write_finish(file, IOStatus.error, 0); return; } @@ -243,17 +258,24 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil final override void cancelWrite(FileFD file) { - assert(m_activeWrites.contains(file), "Cancelling write when no write is in progress."); + if (!isValid(file)) return; + + assert(m_activeWrites.contains(file.value), "Cancelling write when no write is in progress."); auto f = &m_files[file].write; f.callback = null; - m_activeWrites.remove(file); + m_activeWrites.remove(file.value); m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish) { + if (!isValid(file)) { + on_read_finish(file, IOStatus.invalidHandle, 0); + return; + } + auto f = () @trusted { return &m_files[file]; } (); if (!f.open) { @@ -265,7 +287,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil assert(false, "Concurrent file reads are not allowed."); assert(f.read.callback is null, "Concurrent file reads are not allowed."); f.read.callback = on_read_finish; - m_activeReads.insert(file); + m_activeReads.insert(file.value); threadSetup(); log("start read task"); try { @@ -274,7 +296,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { - m_activeReads.remove(file); + m_activeReads.remove(file.value); on_read_finish(file, IOStatus.error, 0); return; } @@ -282,28 +304,40 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil final override void cancelRead(FileFD file) { - assert(m_activeReads.contains(file), "Cancelling read when no read is in progress."); + if (!isValid(file)) return; + + assert(m_activeReads.contains(file.value), "Cancelling read when no read is in progress."); auto f = &m_files[file].read; f.callback = null; - m_activeReads.remove(file); + m_activeReads.remove(file.value); m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } + final override bool isValid(FileFD handle) + const { + if (handle.value >= m_files.length) return false; + return m_files[handle.value].validationCounter == handle.validationCounter; + } + final override void addRef(FileFD descriptor) { + if (!isValid(descriptor)) return; + m_files[descriptor].refCount++; } final override bool releaseRef(FileFD descriptor) { + if (!isValid(descriptor)) return true; + auto f = () @trusted { return &m_files[descriptor]; } (); if (!--f.refCount) { .close(cast(int)descriptor); *f = FileInfo.init; - assert(!m_activeReads.contains(descriptor)); - assert(!m_activeWrites.contains(descriptor)); + assert(!m_activeReads.contains(descriptor.value)); + assert(!m_activeWrites.contains(descriptor.value)); return false; } return true; @@ -311,6 +345,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil protected final override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; + FileInfo* fds = &m_files[descriptor]; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); @@ -370,11 +406,15 @@ log("wait for status set"); private void onReady(EventID) { log("ready event"); - foreach (f; m_activeReads) - m_files[f].read.finalize(f, { m_activeReads.remove(f); }); + foreach (f; m_activeReads) { + auto fd = FileFD(f, m_files[f].validationCounter); + m_files[f].read.finalize(fd, { m_activeReads.remove(f); }); + } - foreach (f; m_activeWrites) - m_files[f].write.finalize(f, { m_activeWrites.remove(f); }); + foreach (f; m_activeWrites) { + auto fd = FileFD(f, m_files[f].validationCounter); + m_files[f].write.finalize(fd, { m_activeWrites.remove(f); }); + } m_waiting = false; startWaiting(); diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index 14080cc..788900e 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -87,7 +87,7 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { final override TimerID create() @trusted { - auto id = cast(TimerID)(++m_lastTimerID); + auto id = TimerID(++m_lastTimerID, 0); TimerSlot* tm; try tm = ms_allocator.make!TimerSlot; catch (Exception e) return TimerID.invalid; @@ -102,6 +102,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { final override void set(TimerID timer, Duration timeout, Duration repeat) @trusted { + if (!isValid(timer)) return; + scope (failure) assert(false); auto tm = m_timers[timer]; if (tm.pending) stop(timer); @@ -115,6 +117,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { @trusted { import std.algorithm.mutation : swap; + if (!isValid(timer)) return; + auto tm = m_timers[timer]; if (!tm.pending) return; TimerCallback2 cb; @@ -129,16 +133,22 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { final override bool isPending(TimerID descriptor) { + if (!isValid(descriptor)) return false; + return m_timers[descriptor].pending; } final override bool isPeriodic(TimerID descriptor) { + if (!isValid(descriptor)) return false; + return m_timers[descriptor].repeatDuration > Duration.zero; } final override void wait(TimerID timer, TimerCallback2 callback) { + if (!isValid(timer)) return; + assert(!m_timers[timer].callback, "Calling wait() on a timer that is already waiting."); m_timers[timer].callback = callback; addRef(timer); @@ -147,26 +157,29 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { final override void cancelWait(TimerID timer) { + if (!isValid(timer)) return; + auto pt = m_timers[timer]; assert(pt.callback); pt.callback = null; releaseRef(timer); } + override bool isValid(TimerID handle) + const { + return (handle in m_timers) !is null; + } + final override void addRef(TimerID descriptor) { - assert(descriptor != TimerID.init, "Invalid timer ID."); - assert(descriptor in m_timers, "Unknown timer ID."); - if (descriptor !in m_timers) return; + if (!isValid(descriptor)) return; m_timers[descriptor].refCount++; } final override bool releaseRef(TimerID descriptor) { - nogc_assert(descriptor != TimerID.init, "Invalid timer ID."); - nogc_assert((descriptor in m_timers) !is null, "Unknown timer ID."); - if (descriptor !in m_timers) return true; + if (!isValid(descriptor)) return true; auto tm = m_timers[descriptor]; tm.refCount--; @@ -199,12 +212,15 @@ final class LoopTimeoutTimerDriver : EventDriverTimers { final bool isUnique(TimerID descriptor) const { - if (descriptor == TimerID.init) return false; + if (!isValid(descriptor)) return false; + return m_timers[descriptor].refCount == 1; } protected final override void* rawUserData(TimerID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; + TimerSlot* fds = m_timers[descriptor]; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index fb41e93..4227082 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -16,6 +16,8 @@ import std.typecons : Tuple, tuple; final class WinAPIEventDriverCore : EventDriverCore { @safe: /*@nogc:*/ nothrow: + private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private { bool m_exit; size_t m_waiterCount; @@ -25,10 +27,11 @@ final class WinAPIEventDriverCore : EventDriverCore { void delegate() @safe nothrow[MAXIMUM_WAIT_OBJECTS] m_registeredEventCallbacks; DWORD m_registeredEventCount = 0; HANDLE m_fileCompletionEvent; + uint m_validationCounter; ConsumableQueue!IOEvent m_ioEvents; shared Mutex m_threadCallbackMutex; - ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; + ConsumableQueue!ThreadCallbackEntry m_threadCallbacks; } package { @@ -48,7 +51,7 @@ final class WinAPIEventDriverCore : EventDriverCore { else { () @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } (); } - m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t))); + m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry); m_threadCallbacks.reserve(1000); } @@ -138,7 +141,7 @@ final class WinAPIEventDriverCore : EventDriverCore { m_exit = false; } - override void runInOwnerThread(ThreadCallback del, intptr_t param) + override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) shared { import core.atomic : atomicLoad; @@ -152,12 +155,14 @@ final class WinAPIEventDriverCore : EventDriverCore { try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(tuple(del, param)); + .put(ThreadCallbackEntry(del, param1, param2)); } catch (Exception e) assert(false, e.msg); () @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } (); } + alias runInOwnerThread = EventDriverCore.runInOwnerThread; + package void* rawUserDataImpl(HANDLE handle, size_t size, DataInitializer initialize, DataInitializer destroy) @system { HandleSlot* fds = &m_handles[handle]; @@ -254,6 +259,7 @@ final class WinAPIEventDriverCore : EventDriverCore { assert(h !in m_handles, "Handle already in use."); HandleSlot s; s.refCount = 1; + s.validationCounter = ++m_validationCounter; s.specific = SlotType.init; m_handles[h] = s; return () @trusted { return &m_handles[h].specific.get!SlotType(); } (); @@ -276,14 +282,14 @@ final class WinAPIEventDriverCore : EventDriverCore { import std.stdint : intptr_t; while (true) { - Tuple!(ThreadCallback, intptr_t) del; + ThreadCallbackEntry del; try { synchronized (m_threadCallbackMutex) { if (m_threadCallbacks.empty) break; del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1]); + del[0](del[1], del[2]); } } } @@ -302,6 +308,7 @@ private struct HandleSlot { WatcherSlot watcher; } int refCount; + uint validationCounter; TaggedAlgebraic!SpecificTypes specific; DataInitializer userDataDestructor; @@ -344,7 +351,11 @@ package struct FileSlot { auto cb = this.callback; this.callback = null; assert(cb !is null); - cb(cast(FileFD)cast(size_t)overlapped.hEvent, status, bytes_transferred); + if (auto ps = overlapped.hEvent in overlapped.driver.m_handles) { + auto vc = ps.validationCounter; + auto fd = FileFD(cast(size_t)overlapped.hEvent, vc); + cb(fd, status, bytes_transferred); + } } } Direction!false read; diff --git a/source/eventcore/drivers/winapi/dns.d b/source/eventcore/drivers/winapi/dns.d index 045c2e2..5b45c1e 100644 --- a/source/eventcore/drivers/winapi/dns.d +++ b/source/eventcore/drivers/winapi/dns.d @@ -14,7 +14,7 @@ final class WinAPIEventDriverDNS : EventDriverDNS { import std.typecons : scoped; import std.utf : toUTF16z; - auto id = DNSLookupID(0); + auto id = DNSLookupID(0, 0); static immutable ushort[] addrfamilies = [AF_INET, AF_INET6]; @@ -100,6 +100,13 @@ final class WinAPIEventDriverDNS : EventDriverDNS { void cancelLookup(DNSLookupID handle) { + if (!isValid(handle)) return; + assert(false, "TODO!"); } + + override bool isValid(DNSLookupID handle) + const { + return handle == DNSLookupID(0, 0); + } } diff --git a/source/eventcore/drivers/winapi/driver.d b/source/eventcore/drivers/winapi/driver.d index 5e30235..434aab9 100644 --- a/source/eventcore/drivers/winapi/driver.d +++ b/source/eventcore/drivers/winapi/driver.d @@ -24,7 +24,7 @@ import eventcore.internal.utils : mallocT, freeT; import core.sys.windows.windows; static assert(HANDLE.sizeof <= FD.BaseType.sizeof); -static assert(FD(cast(size_t)INVALID_HANDLE_VALUE) == FD.init); +static assert(FD(cast(size_t)INVALID_HANDLE_VALUE, 0) == FD.init); final class WinAPIEventDriver : EventDriver { diff --git a/source/eventcore/drivers/winapi/events.d b/source/eventcore/drivers/winapi/events.d index 731fff1..a31a57e 100644 --- a/source/eventcore/drivers/winapi/events.d +++ b/source/eventcore/drivers/winapi/events.d @@ -56,14 +56,16 @@ final class WinAPIEventDriverEvents : EventDriverEvents { override EventID create() { - auto id = EventID(m_idCounter++); - if (id == EventID.invalid) id = EventID(m_idCounter++); + auto id = EventID(m_idCounter++, 0); + if (id == EventID.invalid) id = EventID(m_idCounter++, 0); m_events[id] = EventSlot(1, new ConsumableQueue!EventCallback); // FIXME: avoid GC allocation return id; } override void trigger(EventID event, bool notify_all = true) { + if (!isValid(event)) return; + auto pe = event in m_events; assert(pe !is null, "Invalid event ID passed to triggerEvent."); if (notify_all) { @@ -82,13 +84,12 @@ final class WinAPIEventDriverEvents : EventDriverEvents { override void trigger(EventID event, bool notify_all = true) shared { import core.atomic : atomicStore; - auto pe = event in m_events; - assert(pe !is null, "Invalid event ID passed to shared triggerEvent."); () @trusted { auto thisus = cast(WinAPIEventDriverEvents)this; EnterCriticalSection(&thisus.m_mutex); - thisus.m_pending.put(Trigger(event, notify_all)); + if (thisus.isValid(event)) + thisus.m_pending.put(Trigger(event, notify_all)); LeaveCriticalSection(&thisus.m_mutex); SetEvent(thisus.m_event); } (); @@ -96,6 +97,8 @@ final class WinAPIEventDriverEvents : EventDriverEvents { override void wait(EventID event, EventCallback on_event) { + if (!isValid(event)) return; + m_core.addWaiter(); return m_events[event].waiters.put(on_event); } @@ -105,18 +108,29 @@ final class WinAPIEventDriverEvents : EventDriverEvents { import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; + if (!isValid(event)) return; + m_events[event].waiters.removePending(on_event); m_core.removeWaiter(); } + override bool isValid(EventID handle) + const { + return (handle in m_events) !is null; + } + override void addRef(EventID descriptor) { + if (!isValid(descriptor)) return; + assert(m_events[descriptor].refCount > 0); m_events[descriptor].refCount++; } override bool releaseRef(EventID descriptor) { + if (!isValid(descriptor)) return true; + auto pe = descriptor in m_events; nogc_assert(pe.refCount > 0, "Releasing unreference event."); if (--pe.refCount == 0) { @@ -156,7 +170,7 @@ final class WinAPIEventDriverEvents : EventDriverEvents { } private static HANDLE idToHandle(EventID event) - @trusted { + @trusted @nogc { return cast(HANDLE)cast(size_t)event; } } diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index faa440f..ab8c21c 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -68,19 +68,24 @@ final class WinAPIEventDriverFiles : EventDriverFiles { s.write.overlapped.driver = m_core; s.write.overlapped.hEvent = handle; - return FileFD(cast(size_t)handle); + return FileFD(cast(size_t)handle, m_core.m_handles[handle].validationCounter); } override void close(FileFD file) { + if (!isValid(file)) return; + auto h = idToHandle(file); - auto slot = () @trusted { return &m_core.m_handles[h].file(); } (); - if (slot.read.overlapped.hEvent != INVALID_HANDLE_VALUE) - slot.read.overlapped.hEvent = slot.write.overlapped.hEvent = INVALID_HANDLE_VALUE; + auto slot = () @trusted { return &m_core.m_handles[h]; } (); + if (slot.validationCounter != file.validationCounter) return; + if (slot.file.read.overlapped.hEvent != INVALID_HANDLE_VALUE) + slot.file.read.overlapped.hEvent = slot.file.write.overlapped.hEvent = INVALID_HANDLE_VALUE; } override ulong getSize(FileFD file) { + if (!isValid(file)) return ulong.max; + LARGE_INTEGER size; auto succeeded = () @trusted { return GetFileSizeEx(idToHandle(file), &size); } (); if (!succeeded || size.QuadPart < 0) @@ -90,6 +95,11 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override void truncate(FileFD file, ulong size, FileIOCallback on_finish) @trusted { + if (!isValid(file)) { + on_finish(file, IOStatus.invalidHandle, 0); + return; + } + auto h = idToHandle(file); // FIXME: do this in a separate thread @@ -110,6 +120,11 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode mode, FileIOCallback on_write_finish) { + if (!isValid(file)) { + on_write_finish(file, IOStatus.invalidHandle, 0); + return; + } + auto h = idToHandle(file); auto slot = &m_core.m_handles[h].file.write; @@ -134,6 +149,11 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode mode, FileIOCallback on_read_finish) { + if (!isValid(file)) { + on_read_finish(file, IOStatus.invalidHandle, 0); + return; + } + auto h = idToHandle(file); auto slot = &m_core.m_handles[h].file.read; @@ -158,23 +178,39 @@ final class WinAPIEventDriverFiles : EventDriverFiles { override void cancelWrite(FileFD file) { + if (!isValid(file)) return; + auto h = idToHandle(file); cancelIO!true(h, m_core.m_handles[h].file.write); } override void cancelRead(FileFD file) { + if (!isValid(file)) return; + auto h = idToHandle(file); cancelIO!false(h, m_core.m_handles[h].file.read); } + override bool isValid(FileFD handle) + const { + auto h = idToHandle(handle); + if (auto ps = h in m_core.m_handles) + return ps.validationCounter == handle.validationCounter; + return false; + } + override void addRef(FileFD descriptor) { + if (!isValid(descriptor)) return; + m_core.m_handles[idToHandle(descriptor)].addRef(); } override bool releaseRef(FileFD descriptor) { + if (!isValid(descriptor)) return true; + auto h = idToHandle(descriptor); auto slot = &m_core.m_handles[h]; return slot.releaseRef({ @@ -222,13 +258,15 @@ final class WinAPIEventDriverFiles : EventDriverFiles { private static nothrow void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED_CORE* overlapped) { - FileFD id = cast(FileFD)cast(size_t)overlapped.hEvent; - auto handle = idToHandle(id); + HANDLE handle = overlapped.hEvent; if (handle == INVALID_HANDLE_VALUE) return; + auto cslot = () @trusted { return &overlapped.driver.m_handles[handle]; } (); + FileFD id = FileFD(cast(size_t)overlapped.hEvent, cslot.validationCounter); + static if (RO) - auto slot = () @trusted { return &overlapped.driver.m_handles[handle].file.write; } (); + auto slot = () @trusted { return &cslot.file.write; } (); else - auto slot = () @trusted { return &overlapped.driver.m_handles[handle].file.read; } (); + auto slot = () @trusted { return &cslot.file.read; } (); assert(slot !is null); if (!slot.callback) { @@ -254,7 +292,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } private static HANDLE idToHandle(FileFD id) - @trusted { + @trusted @nogc { return cast(HANDLE)cast(size_t)id; } } diff --git a/source/eventcore/drivers/winapi/pipes.d b/source/eventcore/drivers/winapi/pipes.d index bf33841..07fc43b 100644 --- a/source/eventcore/drivers/winapi/pipes.d +++ b/source/eventcore/drivers/winapi/pipes.d @@ -7,53 +7,80 @@ import eventcore.internal.win32; final class WinAPIEventDriverPipes : EventDriverPipes { @safe: /*@nogc:*/ nothrow: - override PipeFD adopt(int system_pipe_handle) - { - assert(false, "TODO!"); - } + override PipeFD adopt(int system_pipe_handle) + { + assert(false, "TODO!"); + } - override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) - { - assert(false, "TODO!"); - } + override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish) + { + if (!isValid(pipe)) { + on_read_finish(pipe, IOStatus.invalidHandle, 0); + return; + } - override void cancelRead(PipeFD pipe) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) - { - assert(false, "TODO!"); - } + override void cancelRead(PipeFD pipe) + { + if (!isValid(pipe)) return; - override void cancelWrite(PipeFD pipe) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) - { - assert(false, "TODO!"); - } + override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish) + { + if (!isValid(pipe)) { + on_write_finish(pipe, IOStatus.invalidHandle, 0); + return; + } - override void close(PipeFD pipe) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - override void addRef(PipeFD pid) - { - assert(false, "TODO!"); - } + override void cancelWrite(PipeFD pipe) + { + if (!isValid(pipe)) return; - override bool releaseRef(PipeFD pid) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - assert(false, "TODO!"); - } + override void waitForData(PipeFD pipe, PipeIOCallback on_data_available) + { + if (!isValid(pipe)) return; + + assert(false, "TODO!"); + } + + override void close(PipeFD pipe) + { + if (!isValid(pipe)) return; + + assert(false, "TODO!"); + } + + override bool isValid(PipeFD handle) + const { + return false; + } + + override void addRef(PipeFD pipe) + { + if (!isValid(pipe)) return; + + assert(false, "TODO!"); + } + + override bool releaseRef(PipeFD pipe) + { + if (!isValid(pipe)) return true; + + assert(false, "TODO!"); + } + + protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } } diff --git a/source/eventcore/drivers/winapi/processes.d b/source/eventcore/drivers/winapi/processes.d index 19816b0..115779d 100644 --- a/source/eventcore/drivers/winapi/processes.d +++ b/source/eventcore/drivers/winapi/processes.d @@ -7,48 +7,65 @@ import eventcore.internal.win32; final class WinAPIEventDriverProcesses : EventDriverProcesses { @safe: /*@nogc:*/ nothrow: - override ProcessID adopt(int system_pid) - { - assert(false, "TODO!"); - } + override ProcessID adopt(int system_pid) + { + assert(false, "TODO!"); + } - override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) - { - assert(false, "TODO!"); - } + override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) + { + assert(false, "TODO!"); + } - override bool hasExited(ProcessID pid) - { - assert(false, "TODO!"); - } + override bool hasExited(ProcessID pid) + { + if (!isValid(pid)) return true; - override void kill(ProcessID pid, int signal) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) - { - assert(false, "TODO!"); - } + override void kill(ProcessID pid, int signal) + { + if (!isValid(pid)) return; - override void cancelWait(ProcessID pid, size_t waitId) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - override void addRef(ProcessID pid) - { - assert(false, "TODO!"); - } + override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) + { + if (!isValid(pid)) return size_t.max; - override bool releaseRef(ProcessID pid) - { - assert(false, "TODO!"); - } + assert(false, "TODO!"); + } - protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) - @system { - assert(false, "TODO!"); - } + override void cancelWait(ProcessID pid, size_t waitId) + { + if (!isValid(pid)) return; + + assert(false, "TODO!"); + } + + override bool isValid(ProcessID handle) + const { + return false; + } + + override void addRef(ProcessID pid) + { + if (!isValid(pid)) return; + + assert(false, "TODO!"); + } + + override bool releaseRef(ProcessID pid) + { + if (!isValid(pid)) return true; + + assert(false, "TODO!"); + } + + protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + assert(false, "TODO!"); + } } diff --git a/source/eventcore/drivers/winapi/signals.d b/source/eventcore/drivers/winapi/signals.d index aa8cc43..0303500 100644 --- a/source/eventcore/drivers/winapi/signals.d +++ b/source/eventcore/drivers/winapi/signals.d @@ -13,13 +13,22 @@ final class WinAPIEventDriverSignals : EventDriverSignals { assert(false, "TODO!"); } + override bool isValid(SignalListenID handle) + const { + return false; + } + override void addRef(SignalListenID descriptor) { + if (!isValid(descriptor)) return; + assert(false, "TODO!"); } override bool releaseRef(SignalListenID descriptor) { + if (!isValid(descriptor)) return true; + assert(false, "TODO!"); } } diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index eff3e83..ec7e272 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -96,7 +96,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { final override void cancelConnectStream(StreamSocketFD sock) { - assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor"); + if (!isValid(sock)) return; with (m_sockets[sock].streamSocket) { assert(state == ConnectionState.connecting, @@ -115,8 +115,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { private StreamSocketFD adoptStreamInternal(SOCKET socket, ConnectionState state) { - auto fd = StreamSocketFD(socket); - if (m_sockets[fd].common.refCount) // FD already in use? + if (m_sockets[socket].common.refCount) // FD already in use? return StreamSocketFD.invalid; // done by wsaasyncselect @@ -132,7 +131,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { overlapped.driver = m_core; } - initSocketSlot(fd); + auto fd = initSocketSlot!StreamSocketFD(socket); with (m_sockets[socket]) { specific = StreamSocketSlot.init; streamSocket.state = state; @@ -178,8 +177,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { if (fd == INVALID_SOCKET) return StreamListenSocketFD.invalid; - auto sock = cast(StreamListenSocketFD)fd; - initSocketSlot(sock); + auto sock = initSocketSlot!StreamListenSocketFD(fd); m_sockets[sock].specific = StreamListenSocketSlot.init; if (on_accept) waitForConnections(sock, on_accept); @@ -189,6 +187,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { + if (!isValid(sock)) return; + assert(!m_sockets[sock].streamListen.acceptCallback); m_sockets[sock].streamListen.acceptCallback = on_accept; () @trusted { WSAAsyncSelect(sock, m_hwnd, WM_USER_SOCKET, FD_ACCEPT); } (); @@ -197,13 +197,15 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override ConnectionState getConnectionState(StreamSocketFD sock) { - assert(sock != StreamSocketFD.invalid, "Invalid socket handle"); + if (!isValid(sock)) return ConnectionState.closed; + return m_sockets[sock].streamSocket.state; } override bool getLocalAddress(SocketFD sock, scope RefAddress dst) { - assert(sock != StreamSocketFD.invalid, "Invalid socket handle"); + if (!isValid(sock)) return false; + socklen_t addr_len = dst.nameLen; if (() @trusted { return getsockname(sock, dst.name, &addr_len); } () != 0) return false; @@ -213,7 +215,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override bool getRemoteAddress(SocketFD sock, scope RefAddress dst) { - assert(sock != StreamSocketFD.invalid, "Invalid socket handle"); + if (!isValid(sock)) return false; + socklen_t addr_len = dst.nameLen; if (() @trusted { return getpeername(sock, dst.name, &addr_len); } () != 0) return false; @@ -223,18 +226,24 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void setTCPNoDelay(StreamSocketFD socket, bool enable) @trusted { + if (!isValid(socket)) return; + BOOL eni = enable; setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &eni, eni.sizeof); } override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted { + if (!isValid(socket)) return; + BOOL eni = enable; setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof); } override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted { + if (!isValid(socket)) return; + tcp_keepalive opts = tcp_keepalive(1, cast(c_ulong) idle.total!"msecs"(), cast(c_ulong) interval.total!"msecs"); int result = WSAIoctl(socket, SIO_KEEPALIVE_VALS, &opts, cast(DWORD) tcp_keepalive.sizeof, @@ -248,6 +257,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { + if (!isValid(socket)) { + on_read_finish(socket, IOStatus.invalidHandle, 0); + return; + } + auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); slot.read.buffer = buffer; slot.read.bytesTransferred = 0; @@ -358,6 +372,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { + if (!isValid(socket)) { + on_write_finish(socket, IOStatus.invalidHandle, 0); + return; + } + auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); slot.write.buffer = buffer; slot.write.bytesTransferred = 0; @@ -454,11 +473,18 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void waitForData(StreamSocketFD socket, IOCallback on_data_available) { + if (!isValid(socket)) { + on_data_available(socket, IOStatus.invalidHandle, 0); + return; + } + assert(false, "TODO!"); } override void shutdown(StreamSocketFD socket, bool shut_read = true, bool shut_write = true) { + if (!isValid(socket)) return; + () @trusted { WSASendDisconnect(socket, null); } (); with (m_sockets[socket].streamSocket) { if (state == ConnectionState.passiveClose) @@ -469,6 +495,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void cancelRead(StreamSocketFD socket) @trusted @nogc { + if (!isValid(socket)) return; if (!m_sockets[socket].streamSocket.read.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped); m_sockets[socket].streamSocket.read.callback = null; @@ -477,6 +504,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void cancelWrite(StreamSocketFD socket) @trusted @nogc { + if (!isValid(socket)) return; if (!m_sockets[socket].streamSocket.write.callback) return; CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped); m_sockets[socket].streamSocket.write.callback = null; @@ -516,8 +544,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { private DatagramSocketFD adoptDatagramSocketInternal(SOCKET socket) { - auto fd = DatagramSocketFD(socket); - if (m_sockets[fd].common.refCount) // FD already in use? + if (m_sockets[socket].common.refCount) // FD already in use? return DatagramSocketFD.invalid; void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow { @@ -529,7 +556,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { overlapped.driver = m_core; } - initSocketSlot(fd); + auto fd = initSocketSlot!DatagramSocketFD(socket); with (m_sockets[socket]) { specific = DatagramSocketSlot.init; setupOverlapped(datagramSocket.write.overlapped); @@ -543,11 +570,15 @@ final class WinAPIEventDriverSockets : EventDriverSockets { final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address) { + if (!isValid(socket)) return; + () @trusted { connect(cast(SOCKET)socket, target_address.name, target_address.nameLen); } (); } final override bool setBroadcast(DatagramSocketFD socket, bool enable) { + if (!isValid(socket)) return false; + int tmp_broad = enable; return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; } @@ -556,6 +587,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { { import std.socket : AddressFamily; + if (!isValid(socket)) return false; + switch (multicast_address.addressFamily) { default: assert(false, "Multicast only supported for IPv4/IPv6 sockets."); case AddressFamily.INET: @@ -583,6 +616,12 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish) { + if (!isValid(socket)) { + RefAddress addr; + on_read_finish(socket, IOStatus.invalidHandle, 0, addr); + return; + } + auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); slot.read.buffer = buffer; slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } (); @@ -678,6 +717,12 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_write_finish) { + if (!isValid(socket)) { + RefAddress addr; + on_write_finish(socket, IOStatus.invalidHandle, 0, addr); + return; + } + auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); slot.write.buffer = buffer; slot.write.wsabuf[0].len = buffer.length; @@ -780,8 +825,16 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } } + override bool isValid(SocketFD handle) + const { + if (handle.value >= m_sockets.length) return false; + return handle.validationCounter == m_sockets[handle].common.validationCounter; + } + override void addRef(SocketFD fd) { + if (!isValid(fd)) return; + assert(m_sockets[fd].common.refCount > 0, "Adding reference to unreferenced socket FD."); m_sockets[fd].common.refCount++; } @@ -789,6 +842,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override bool releaseRef(SocketFD fd) @nogc { import taggedalgebraic : hasType; + + if (!isValid(fd)) return true; + auto slot = () @trusted { return &m_sockets[fd]; } (); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD."); if (--slot.common.refCount == 0) { @@ -819,6 +875,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable) { + if (!isValid(socket)) return false; + int proto, opt; final switch (option) { case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break; @@ -830,6 +888,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable) { + if (!isValid(socket)) return false; + int proto, opt; final switch (option) { case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break; @@ -854,8 +914,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets { return rawUserDataImpl(descriptor, size, initialize, destroy); } - private void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + private void* rawUserDataImpl(SocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc { + if (!isValid(descriptor)) return null; + SocketSlot* fds = &m_sockets[descriptor].common; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); @@ -868,12 +930,16 @@ final class WinAPIEventDriverSockets : EventDriverSockets { return fds.userData.ptr; } - private void initSocketSlot(SocketFD fd) + private FDType initSocketSlot(FDType)(SOCKET socket) { m_socketCount++; - m_sockets[fd.value].common.refCount = 1; - m_sockets[fd.value].common.fd = fd; - m_sockets[fd.value].common.driver = this; + m_sockets[socket].common.refCount = 1; + auto vc = ++m_sockets[socket].common.validationCounter; + auto fd = FDType(socket, vc); + m_sockets[socket].common.fd = fd; + m_sockets[socket].common.driver = this; + + return fd; } package void clearSocketSlot(FD fd) @@ -905,14 +971,16 @@ final class WinAPIEventDriverSockets : EventDriverSockets { auto cb = slot.streamSocket.connectCallback; if (!cb) break; // cancelled connect? + auto fd = StreamSocketFD(sock, slot.common.validationCounter); + slot.streamSocket.connectCallback = null; slot.common.driver.m_core.removeWaiter(); if (err) { slot.streamSocket.state = ConnectionState.closed; - cb(cast(StreamSocketFD)sock, ConnectStatus.refused); + cb(fd, ConnectStatus.refused); } else { slot.streamSocket.state = ConnectionState.connected; - cb(cast(StreamSocketFD)sock, ConnectStatus.connected); + cb(fd, ConnectStatus.connected); } break; case FD_READ: @@ -948,7 +1016,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { if (clientsockfd == INVALID_SOCKET) return 0; auto clientsock = driver.adoptStreamInternal(clientsockfd, ConnectionState.connected); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); - slot.streamListen.acceptCallback(cast(StreamListenSocketFD)sock, clientsock, addrc); + auto fd = StreamListenSocketFD(sock, slot.common.validationCounter); + slot.streamListen.acceptCallback(fd, clientsock, addrc); } break; case Kind.datagramSocket: @@ -978,6 +1047,7 @@ static struct SocketSlot { WinAPIEventDriverSockets driver; // redundant, but needed by the current IO Completion Routines based approach @property inout(WinAPIEventDriverCore) core() @safe nothrow inout { return driver.m_core; } int refCount; + uint validationCounter; DataInitializer userDataDestructor; ubyte[16*size_t.sizeof] userData; } diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index 537d6f5..679d15f 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -37,8 +37,6 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { if (handle == INVALID_HANDLE_VALUE) return WatcherID.invalid; - auto id = WatcherID(cast(size_t)handle); - auto slot = m_core.setupSlot!WatcherSlot(handle); slot.directory = path; slot.recursive = recursive; @@ -48,6 +46,9 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { try return Mallocator.instance.makeArray!ubyte(16384); catch (Exception e) assert(false, "Failed to allocate directory watcher buffer."); } (); + + auto id = WatcherID(cast(size_t)handle, m_core.m_handles[handle].validationCounter); + if (!triggerRead(handle, *slot)) { releaseRef(id); return WatcherID.invalid; @@ -61,18 +62,31 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { return id; } + override bool isValid(WatcherID handle) + const { + if (auto ph = idToHandle(handle) in m_core.m_handles) + return ph.validationCounter == handle.validationCounter; + return false; + } + override void addRef(WatcherID descriptor) { + if (!isValid(descriptor)) return; + m_core.m_handles[idToHandle(descriptor)].addRef(); } override bool releaseRef(WatcherID descriptor) { + if (!isValid(descriptor)) return true; + return doReleaseRef(idToHandle(descriptor)); } protected override void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { + if (!isValid(descriptor)) return null; + return m_core.rawUserDataImpl(idToHandle(descriptor), size, initialize, destroy); } @@ -117,10 +131,9 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { import std.path : dirName, baseName, buildPath; auto handle = overlapped.hEvent; // *file* handle - auto id = WatcherID(cast(size_t)handle); - auto gslot = () @trusted { return &WinAPIEventDriver.threadInstance.core.m_handles[handle]; } (); auto slot = () @trusted { return &gslot.watcher(); } (); + auto id = WatcherID(cast(size_t)handle, gslot.validationCounter); if (dwError != 0 || gslot.refCount == 1) { // FIXME: error must be propagated to the caller (except for ABORTED @@ -220,5 +233,5 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { return true; } - static private HANDLE idToHandle(WatcherID id) @trusted { return cast(HANDLE)cast(size_t)id; } + static private HANDLE idToHandle(WatcherID id) @trusted @nogc { return cast(HANDLE)cast(size_t)id.value; } } diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d index 1756751..0973339 100644 --- a/source/eventcore/internal/utils.d +++ b/source/eventcore/internal/utils.d @@ -172,6 +172,18 @@ struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { return (*m_chunks[chunk])[subidx]; } + ref const(T) opIndex(size_t index) + const @nogc { + static immutable T emptySlot; + + auto chunk = index / chunkSize; + auto subidx = index % chunkSize; + if (index >= m_length) return emptySlot; + auto c = m_chunks[chunk]; + if (!c) return emptySlot; + return (*c)[subidx]; + } + int opApply(scope int delegate(size_t idx, ref T) @safe nothrow del) { size_t idx = 0;