From 16e2d9587d5848c9559bbe90f86840c06caad90d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 16 Mar 2018 11:42:02 +0100 Subject: [PATCH 1/3] Implement EventDriverCore.runInOwnerThread. --- source/eventcore/driver.d | 27 +++++--- source/eventcore/drivers/posix/driver.d | 83 ++++++++++++++++++++---- source/eventcore/drivers/winapi/core.d | 54 +++++++++++++++ source/eventcore/drivers/winapi/driver.d | 19 +++--- 4 files changed, 153 insertions(+), 30 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 8d83f35..6323890 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -20,6 +20,7 @@ module eventcore.driver; import core.time : Duration; import std.socket : Address; +import std.stdint : intptr_t; /** Encapsulates a full event driver. @@ -31,23 +32,25 @@ import std.socket : Address; interface EventDriver { @safe: /*@nogc:*/ nothrow: /// Core event loop functionality - @property EventDriverCore core(); + @property inout(EventDriverCore) core() inout; + /// Core event loop functionality + @property shared(inout(EventDriverCore)) core() shared inout; /// Single shot and recurring timers - @property EventDriverTimers timers(); + @property inout(EventDriverTimers) timers() inout; /// Cross-thread events (thread local access) - @property EventDriverEvents events(); + @property inout(EventDriverEvents) events() inout; /// Cross-thread events (cross-thread access) - @property shared(EventDriverEvents) events() shared; + @property shared(inout(EventDriverEvents)) events() shared inout; /// UNIX/POSIX signal reception - @property EventDriverSignals signals(); + @property inout(EventDriverSignals) signals() inout; /// Stream and datagram sockets - @property EventDriverSockets sockets(); + @property inout(EventDriverSockets) sockets() inout; /// DNS queries - @property EventDriverDNS dns(); + @property inout(EventDriverDNS) dns() inout; /// Local file operations - @property EventDriverFiles files(); + @property inout(EventDriverFiles) files() inout; /// Directory change watching - @property EventDriverWatchers watchers(); + @property inout(EventDriverWatchers) watchers() inout; /// Releases all resources associated with the driver void dispose(); @@ -98,6 +101,10 @@ interface EventDriverCore { */ void clearExitFlag(); + /** Executes a callback in the thread owning the driver. + */ + void runInOwnerThread(ThreadCallback del, intptr_t param) shared; + /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; /// ditto @@ -751,6 +758,8 @@ struct Handle(string NAME, T, T invalid_value = T.init) { alias value this; } +alias ThreadCallback = void function(intptr_t param) @safe nothrow; + alias FD = Handle!("fd", size_t, size_t.max); alias SocketFD = Handle!("socket", FD); alias StreamSocketFD = Handle!("streamSocket", SocketFD); diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 5f84e8a..552fd1c 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -77,15 +77,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { } // force overriding these in the (final) sub classes to avoid virtual calls - final override @property CoreDriver core() { return m_core; } - final override @property EventsDriver events() { return m_events; } - final override @property shared(EventsDriver) events() shared { return m_events; } - final override @property SignalsDriver signals() { return m_signals; } - final override @property TimerDriver timers() { return m_timers; } - final override @property SocketsDriver sockets() { return m_sockets; } - final override @property DNSDriver dns() { return m_dns; } - final override @property FileDriver files() { return m_files; } - final override @property WatcherDriver watchers() { return m_watchers; } + final override @property inout(CoreDriver) core() inout { return m_core; } + final override @property shared(inout(CoreDriver)) core() shared inout { return m_core; } + final override @property inout(EventsDriver) events() inout { return m_events; } + final override @property shared(inout(EventsDriver)) events() shared inout { return m_events; } + final override @property inout(SignalsDriver) signals() inout { return m_signals; } + final override @property inout(TimerDriver) timers() inout { return m_timers; } + final override @property inout(SocketsDriver) sockets() inout { return m_sockets; } + final override @property inout(DNSDriver) dns() inout { return m_dns; } + final override @property inout(FileDriver) files() inout { return m_files; } + final override @property inout(WatcherDriver) watchers() inout { return m_watchers; } final override void dispose() { @@ -100,8 +101,12 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore { -@safe: nothrow: +@safe nothrow: + import core.atomic : atomicLoad, atomicStore; + import core.sync.mutex : Mutex; import core.time : Duration; + import std.stdint : intptr_t; + import std.typecons : Tuple, tuple; protected alias ExtraEventsCallback = bool delegate(long); @@ -111,6 +116,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime Events m_events; bool m_exit = false; EventID m_wakeupEvent; + + shared Mutex m_threadCallbackMutex; + ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; } protected this(Loop loop, Timers timers, Events events) @@ -119,12 +127,23 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_timers = timers; m_events = events; m_wakeupEvent = events.createInternal(); + + static if (__VERSION__ >= 2074) + m_threadCallbackMutex = new shared Mutex; + else { + () @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); + } + + m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); + m_threadCallbacks.reserve(1000); } protected final void dispose() { + executeThreadCallbacks(); m_events.releaseRef(m_wakeupEvent); - m_wakeupEvent = EventID.invalid; + atomicStore(m_threadCallbackMutex, null); + m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized! } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @@ -133,6 +152,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime { import core.time : hnsecs, seconds; + executeThreadCallbacks(); + if (m_exit) { m_exit = false; return ExitReason.exited; @@ -160,6 +181,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime } while (timeout > 0.seconds && !m_exit && !got_events); } + executeThreadCallbacks(); + if (m_exit) { m_exit = false; return ExitReason.exited; @@ -173,7 +196,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime final override void exit() { - m_exit = true; + m_exit = true; // FIXME: this needs to be synchronized! () @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } (); } @@ -182,6 +205,26 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_exit = false; } + final override void runInOwnerThread(ThreadCallback del, intptr_t param) + shared { + auto m = atomicLoad(m_threadCallbackMutex); + auto evt = atomicLoad(m_wakeupEvent); + // NOTE: This case must be handled gracefully to avoid hazardous + // race-conditions upon unexpected thread termination. The mutex + // and the map will stay valid even after the driver has been + // disposed, so no further synchronization is required. + if (!m) return; + + try { + synchronized (m) + () @trusted { return (cast()this).m_threadCallbacks; } () + .put(tuple(del, param)); + } catch (Exception e) assert(false, e.msg); + + m_events.trigger(m_wakeupEvent, false); + } + + final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { return rawUserDataImpl(descriptor, size, initialize, destroy); @@ -196,6 +239,22 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime @system { return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); } + + private void executeThreadCallbacks() + { + import std.stdint : intptr_t; + + while (true) { + Tuple!(ThreadCallback, intptr_t) del; + try { + synchronized (m_threadCallbackMutex) { + if (m_threadCallbacks.empty) break; + del = m_threadCallbacks.consumeOne; + } + } catch (Exception e) assert(false, e.msg); + del[0](del[1]); + } + } } diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index dea8a88..321130f 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -7,8 +7,11 @@ import eventcore.drivers.timer; import eventcore.internal.consumablequeue; import eventcore.internal.utils : nogc_assert; import eventcore.internal.win32; +import core.sync.mutex : Mutex; import core.time : Duration; import taggedalgebraic; +import std.stdint : intptr_t; +import std.typecons : Tuple, tuple; final class WinAPIEventDriverCore : EventDriverCore { @@ -22,6 +25,9 @@ final class WinAPIEventDriverCore : EventDriverCore { void delegate() @safe nothrow[HANDLE] m_eventCallbacks; HANDLE m_fileCompletionEvent; ConsumableQueue!IOEvent m_ioEvents; + + shared Mutex m_threadCallbackMutex; + ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; } package { @@ -35,6 +41,14 @@ final class WinAPIEventDriverCore : EventDriverCore { m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); registerEvent(m_fileCompletionEvent); m_ioEvents = new ConsumableQueue!IOEvent; + + static if (__VERSION__ >= 2074) + m_threadCallbackMutex = new shared Mutex; + else { + () @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } (); + } + m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)); + m_threadCallbacks.reserve(1000); } override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; } @@ -98,6 +112,26 @@ final class WinAPIEventDriverCore : EventDriverCore { m_exit = false; } + override void runInOwnerThread(ThreadCallback del, intptr_t param) + shared { + import core.atomic : atomicLoad; + + auto m = atomicLoad(m_threadCallbackMutex); + // NOTE: This case must be handled gracefully to avoid hazardous + // race-conditions upon unexpected thread termination. The mutex + // and the map will stay valid even after the driver has been + // disposed, so no further synchronization is required. + if (!m) return; + + try { + synchronized (m) + () @trusted { return (cast()this).m_threadCallbacks; } () + .put(tuple(del, param)); + } catch (Exception e) assert(false, e.msg); + + () @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } (); + } + package void* rawUserDataImpl(HANDLE handle, size_t size, DataInitializer initialize, DataInitializer destroy) @system { HandleSlot* fds = &m_handles[handle]; @@ -127,6 +161,8 @@ final class WinAPIEventDriverCore : EventDriverCore { import core.time : seconds; import std.algorithm.comparison : min; + executeThreadCallbacks(); + bool got_event; if (max_wait > 0.seconds) { @@ -173,6 +209,8 @@ final class WinAPIEventDriverCore : EventDriverCore { //if (++cnt % 10 == 0) processTimers(); } + executeThreadCallbacks(); + return got_event; } @@ -204,6 +242,22 @@ final class WinAPIEventDriverCore : EventDriverCore { import std.algorithm.searching : canFind; m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); } + + private void executeThreadCallbacks() + { + import std.stdint : intptr_t; + + while (true) { + Tuple!(ThreadCallback, intptr_t) del; + try { + synchronized (m_threadCallbackMutex) { + if (m_threadCallbacks.empty) break; + del = m_threadCallbacks.consumeOne; + } + } catch (Exception e) assert(false, e.msg); + del[0](del[1]); + } + } } private long currStdTime() diff --git a/source/eventcore/drivers/winapi/driver.d b/source/eventcore/drivers/winapi/driver.d index 06ba834..7ef3281 100644 --- a/source/eventcore/drivers/winapi/driver.d +++ b/source/eventcore/drivers/winapi/driver.d @@ -60,15 +60,16 @@ final class WinAPIEventDriver : EventDriver { @safe: /*@nogc:*/ nothrow: - override @property WinAPIEventDriverCore core() { return m_core; } - override @property WinAPIEventDriverFiles files() { return m_files; } - override @property WinAPIEventDriverSockets sockets() { return m_sockets; } - override @property WinAPIEventDriverDNS dns() { return m_dns; } - override @property LoopTimeoutTimerDriver timers() { return m_timers; } - override @property WinAPIEventDriverEvents events() { return m_events; } - override @property shared(WinAPIEventDriverEvents) events() shared { return m_events; } - override @property WinAPIEventDriverSignals signals() { return m_signals; } - override @property WinAPIEventDriverWatchers watchers() { return m_watchers; } + override @property inout(WinAPIEventDriverCore) core() inout { return m_core; } + override @property shared(inout(WinAPIEventDriverCore)) core() inout shared { return m_core; } + override @property inout(WinAPIEventDriverFiles) files() inout { return m_files; } + override @property inout(WinAPIEventDriverSockets) sockets() inout { return m_sockets; } + override @property inout(WinAPIEventDriverDNS) dns() inout { return m_dns; } + override @property inout(LoopTimeoutTimerDriver) timers() inout { return m_timers; } + override @property inout(WinAPIEventDriverEvents) events() inout { return m_events; } + override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; } + override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; } + override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; } override void dispose() { From 910451557b3c62f37f2790b9c815d64754e9aef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 16 Mar 2018 13:34:27 +0100 Subject: [PATCH 2/3] Add test for runInOwnerThread. --- tests/0-runinownerthread.d | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 tests/0-runinownerthread.d diff --git a/tests/0-runinownerthread.d b/tests/0-runinownerthread.d new file mode 100644 index 0000000..bb9208e --- /dev/null +++ b/tests/0-runinownerthread.d @@ -0,0 +1,37 @@ +/++ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ +module test; + +import eventcore.core; +import core.thread; +import std.stdint; + +intptr_t s_id; // thread-local + +void main() +{ + auto ed = cast(shared)eventDriver; + + auto thr = new Thread({ threadFunc(ed); }); + thr.start(); + + // keep the event loop running for one second + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, 1.seconds, 0.seconds); + + ExitReason er; + do er = eventDriver.core.processEvents(Duration.max); + while (er == ExitReason.idle); + assert(er == ExitReason.outOfWaiters); + + assert(s_id == 42); +} + +void threadFunc(shared(NativeEventDriver) drv) +{ + drv.core.runInOwnerThread((id) { + s_id = id; + }, 42); +} From 1dfed63ad972bd52723ccc2b366e20a74f5c3889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 16 Mar 2018 15:24:57 +0100 Subject: [PATCH 3/3] Drop left-over DMD 2.071 test run. --- appveyor.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 1963602..bd81b4c 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -41,10 +41,6 @@ environment: DVersion: 2.072.2 arch: x64 config: winapi - - DC: dmd - DVersion: 2.071.1 - arch: x86 - config: winapi-optlink - DC: ldc DVersion: 1.7.0 arch: x64