Implement EventDriverCore.runInOwnerThread.

This commit is contained in:
Sönke Ludwig 2018-03-16 11:42:02 +01:00
parent ff4d65a131
commit 16e2d9587d
4 changed files with 153 additions and 30 deletions

View file

@ -20,6 +20,7 @@ module eventcore.driver;
import core.time : Duration; import core.time : Duration;
import std.socket : Address; import std.socket : Address;
import std.stdint : intptr_t;
/** Encapsulates a full event driver. /** Encapsulates a full event driver.
@ -31,23 +32,25 @@ import std.socket : Address;
interface EventDriver { interface EventDriver {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
/// Core event loop functionality /// Core event loop functionality
@property EventDriverCore core(); @property inout(EventDriverCore) core() inout;
/// Core event loop functionality
@property shared(inout(EventDriverCore)) core() shared inout;
/// Single shot and recurring timers /// Single shot and recurring timers
@property EventDriverTimers timers(); @property inout(EventDriverTimers) timers() inout;
/// Cross-thread events (thread local access) /// Cross-thread events (thread local access)
@property EventDriverEvents events(); @property inout(EventDriverEvents) events() inout;
/// Cross-thread events (cross-thread access) /// Cross-thread events (cross-thread access)
@property shared(EventDriverEvents) events() shared; @property shared(inout(EventDriverEvents)) events() shared inout;
/// UNIX/POSIX signal reception /// UNIX/POSIX signal reception
@property EventDriverSignals signals(); @property inout(EventDriverSignals) signals() inout;
/// Stream and datagram sockets /// Stream and datagram sockets
@property EventDriverSockets sockets(); @property inout(EventDriverSockets) sockets() inout;
/// DNS queries /// DNS queries
@property EventDriverDNS dns(); @property inout(EventDriverDNS) dns() inout;
/// Local file operations /// Local file operations
@property EventDriverFiles files(); @property inout(EventDriverFiles) files() inout;
/// Directory change watching /// Directory change watching
@property EventDriverWatchers watchers(); @property inout(EventDriverWatchers) watchers() inout;
/// Releases all resources associated with the driver /// Releases all resources associated with the driver
void dispose(); void dispose();
@ -98,6 +101,10 @@ interface EventDriverCore {
*/ */
void clearExitFlag(); void clearExitFlag();
/** Executes a callback in the thread owning the driver.
*/
void runInOwnerThread(ThreadCallback del, intptr_t param) shared;
/// Low-level user data access. Use `getUserData` instead. /// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
/// ditto /// ditto
@ -751,6 +758,8 @@ struct Handle(string NAME, T, T invalid_value = T.init) {
alias value this; alias value this;
} }
alias ThreadCallback = void function(intptr_t param) @safe nothrow;
alias FD = Handle!("fd", size_t, size_t.max); alias FD = Handle!("fd", size_t, size_t.max);
alias SocketFD = Handle!("socket", FD); alias SocketFD = Handle!("socket", FD);
alias StreamSocketFD = Handle!("streamSocket", SocketFD); alias StreamSocketFD = Handle!("streamSocket", SocketFD);

View file

@ -77,15 +77,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls
final override @property CoreDriver core() { return m_core; } final override @property inout(CoreDriver) core() inout { return m_core; }
final override @property EventsDriver events() { return m_events; } final override @property shared(inout(CoreDriver)) core() shared inout { return m_core; }
final override @property shared(EventsDriver) events() shared { return m_events; } final override @property inout(EventsDriver) events() inout { return m_events; }
final override @property SignalsDriver signals() { return m_signals; } final override @property shared(inout(EventsDriver)) events() shared inout { return m_events; }
final override @property TimerDriver timers() { return m_timers; } final override @property inout(SignalsDriver) signals() inout { return m_signals; }
final override @property SocketsDriver sockets() { return m_sockets; } final override @property inout(TimerDriver) timers() inout { return m_timers; }
final override @property DNSDriver dns() { return m_dns; } final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
final override @property FileDriver files() { return m_files; } final override @property inout(DNSDriver) dns() inout { return m_dns; }
final override @property WatcherDriver watchers() { return m_watchers; } final override @property inout(FileDriver) files() inout { return m_files; }
final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
final override void dispose() final override void dispose()
{ {
@ -100,8 +101,12 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore { final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore {
@safe: nothrow: @safe nothrow:
import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex;
import core.time : Duration; import core.time : Duration;
import std.stdint : intptr_t;
import std.typecons : Tuple, tuple;
protected alias ExtraEventsCallback = bool delegate(long); protected alias ExtraEventsCallback = bool delegate(long);
@ -111,6 +116,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Events m_events; Events m_events;
bool m_exit = false; bool m_exit = false;
EventID m_wakeupEvent; EventID m_wakeupEvent;
shared Mutex m_threadCallbackMutex;
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
} }
protected this(Loop loop, Timers timers, Events events) protected this(Loop loop, Timers timers, Events events)
@ -119,12 +127,23 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_timers = timers; m_timers = timers;
m_events = events; m_events = events;
m_wakeupEvent = events.createInternal(); m_wakeupEvent = events.createInternal();
static if (__VERSION__ >= 2074)
m_threadCallbackMutex = new shared Mutex;
else {
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } ();
}
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t));
m_threadCallbacks.reserve(1000);
} }
protected final void dispose() protected final void dispose()
{ {
executeThreadCallbacks();
m_events.releaseRef(m_wakeupEvent); m_events.releaseRef(m_wakeupEvent);
m_wakeupEvent = EventID.invalid; atomicStore(m_threadCallbackMutex, null);
m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
} }
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
@ -133,6 +152,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
{ {
import core.time : hnsecs, seconds; import core.time : hnsecs, seconds;
executeThreadCallbacks();
if (m_exit) { if (m_exit) {
m_exit = false; m_exit = false;
return ExitReason.exited; return ExitReason.exited;
@ -160,6 +181,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} while (timeout > 0.seconds && !m_exit && !got_events); } while (timeout > 0.seconds && !m_exit && !got_events);
} }
executeThreadCallbacks();
if (m_exit) { if (m_exit) {
m_exit = false; m_exit = false;
return ExitReason.exited; return ExitReason.exited;
@ -173,7 +196,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
final override void exit() final override void exit()
{ {
m_exit = true; m_exit = true; // FIXME: this needs to be synchronized!
() @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } (); () @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } ();
} }
@ -182,6 +205,26 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_exit = false; m_exit = false;
} }
final override void runInOwnerThread(ThreadCallback del, intptr_t param)
shared {
auto m = atomicLoad(m_threadCallbackMutex);
auto evt = atomicLoad(m_wakeupEvent);
// NOTE: This case must be handled gracefully to avoid hazardous
// race-conditions upon unexpected thread termination. The mutex
// and the map will stay valid even after the driver has been
// disposed, so no further synchronization is required.
if (!m) return;
try {
synchronized (m)
() @trusted { return (cast()this).m_threadCallbacks; } ()
.put(tuple(del, param));
} catch (Exception e) assert(false, e.msg);
m_events.trigger(m_wakeupEvent, false);
}
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
return rawUserDataImpl(descriptor, size, initialize, destroy); return rawUserDataImpl(descriptor, size, initialize, destroy);
@ -196,6 +239,22 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
@system { @system {
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy); return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
} }
private void executeThreadCallbacks()
{
import std.stdint : intptr_t;
while (true) {
Tuple!(ThreadCallback, intptr_t) del;
try {
synchronized (m_threadCallbackMutex) {
if (m_threadCallbacks.empty) break;
del = m_threadCallbacks.consumeOne;
}
} catch (Exception e) assert(false, e.msg);
del[0](del[1]);
}
}
} }

View file

@ -7,8 +7,11 @@ import eventcore.drivers.timer;
import eventcore.internal.consumablequeue; import eventcore.internal.consumablequeue;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : nogc_assert;
import eventcore.internal.win32; import eventcore.internal.win32;
import core.sync.mutex : Mutex;
import core.time : Duration; import core.time : Duration;
import taggedalgebraic; import taggedalgebraic;
import std.stdint : intptr_t;
import std.typecons : Tuple, tuple;
final class WinAPIEventDriverCore : EventDriverCore { final class WinAPIEventDriverCore : EventDriverCore {
@ -22,6 +25,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
void delegate() @safe nothrow[HANDLE] m_eventCallbacks; void delegate() @safe nothrow[HANDLE] m_eventCallbacks;
HANDLE m_fileCompletionEvent; HANDLE m_fileCompletionEvent;
ConsumableQueue!IOEvent m_ioEvents; ConsumableQueue!IOEvent m_ioEvents;
shared Mutex m_threadCallbackMutex;
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
} }
package { package {
@ -35,6 +41,14 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
registerEvent(m_fileCompletionEvent); registerEvent(m_fileCompletionEvent);
m_ioEvents = new ConsumableQueue!IOEvent; m_ioEvents = new ConsumableQueue!IOEvent;
static if (__VERSION__ >= 2074)
m_threadCallbackMutex = new shared Mutex;
else {
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } ();
}
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t));
m_threadCallbacks.reserve(1000);
} }
override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; } override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; }
@ -98,6 +112,26 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_exit = false; m_exit = false;
} }
override void runInOwnerThread(ThreadCallback del, intptr_t param)
shared {
import core.atomic : atomicLoad;
auto m = atomicLoad(m_threadCallbackMutex);
// NOTE: This case must be handled gracefully to avoid hazardous
// race-conditions upon unexpected thread termination. The mutex
// and the map will stay valid even after the driver has been
// disposed, so no further synchronization is required.
if (!m) return;
try {
synchronized (m)
() @trusted { return (cast()this).m_threadCallbacks; } ()
.put(tuple(del, param));
} catch (Exception e) assert(false, e.msg);
() @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } ();
}
package void* rawUserDataImpl(HANDLE handle, size_t size, DataInitializer initialize, DataInitializer destroy) package void* rawUserDataImpl(HANDLE handle, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
HandleSlot* fds = &m_handles[handle]; HandleSlot* fds = &m_handles[handle];
@ -127,6 +161,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
import core.time : seconds; import core.time : seconds;
import std.algorithm.comparison : min; import std.algorithm.comparison : min;
executeThreadCallbacks();
bool got_event; bool got_event;
if (max_wait > 0.seconds) { if (max_wait > 0.seconds) {
@ -173,6 +209,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
//if (++cnt % 10 == 0) processTimers(); //if (++cnt % 10 == 0) processTimers();
} }
executeThreadCallbacks();
return got_event; return got_event;
} }
@ -204,6 +242,22 @@ final class WinAPIEventDriverCore : EventDriverCore {
import std.algorithm.searching : canFind; import std.algorithm.searching : canFind;
m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped)); m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped));
} }
private void executeThreadCallbacks()
{
import std.stdint : intptr_t;
while (true) {
Tuple!(ThreadCallback, intptr_t) del;
try {
synchronized (m_threadCallbackMutex) {
if (m_threadCallbacks.empty) break;
del = m_threadCallbacks.consumeOne;
}
} catch (Exception e) assert(false, e.msg);
del[0](del[1]);
}
}
} }
private long currStdTime() private long currStdTime()

View file

@ -60,15 +60,16 @@ final class WinAPIEventDriver : EventDriver {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
override @property WinAPIEventDriverCore core() { return m_core; } override @property inout(WinAPIEventDriverCore) core() inout { return m_core; }
override @property WinAPIEventDriverFiles files() { return m_files; } override @property shared(inout(WinAPIEventDriverCore)) core() inout shared { return m_core; }
override @property WinAPIEventDriverSockets sockets() { return m_sockets; } override @property inout(WinAPIEventDriverFiles) files() inout { return m_files; }
override @property WinAPIEventDriverDNS dns() { return m_dns; } override @property inout(WinAPIEventDriverSockets) sockets() inout { return m_sockets; }
override @property LoopTimeoutTimerDriver timers() { return m_timers; } override @property inout(WinAPIEventDriverDNS) dns() inout { return m_dns; }
override @property WinAPIEventDriverEvents events() { return m_events; } override @property inout(LoopTimeoutTimerDriver) timers() inout { return m_timers; }
override @property shared(WinAPIEventDriverEvents) events() shared { return m_events; } override @property inout(WinAPIEventDriverEvents) events() inout { return m_events; }
override @property WinAPIEventDriverSignals signals() { return m_signals; } override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; }
override @property WinAPIEventDriverWatchers watchers() { return m_watchers; } override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; }
override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; }
override void dispose() override void dispose()
{ {