Merge pull request #66 from vibe-d/run_in_owner_thread
Implement runInOwnerThread
This commit is contained in:
commit
d2cac0cca1
|
@ -41,10 +41,6 @@ environment:
|
|||
DVersion: 2.072.2
|
||||
arch: x64
|
||||
config: winapi
|
||||
- DC: dmd
|
||||
DVersion: 2.071.1
|
||||
arch: x86
|
||||
config: winapi-optlink
|
||||
- DC: ldc
|
||||
DVersion: 1.7.0
|
||||
arch: x64
|
||||
|
|
|
@ -20,6 +20,7 @@ module eventcore.driver;
|
|||
|
||||
import core.time : Duration;
|
||||
import std.socket : Address;
|
||||
import std.stdint : intptr_t;
|
||||
|
||||
|
||||
/** Encapsulates a full event driver.
|
||||
|
@ -31,23 +32,25 @@ import std.socket : Address;
|
|||
interface EventDriver {
|
||||
@safe: /*@nogc:*/ nothrow:
|
||||
/// Core event loop functionality
|
||||
@property EventDriverCore core();
|
||||
@property inout(EventDriverCore) core() inout;
|
||||
/// Core event loop functionality
|
||||
@property shared(inout(EventDriverCore)) core() shared inout;
|
||||
/// Single shot and recurring timers
|
||||
@property EventDriverTimers timers();
|
||||
@property inout(EventDriverTimers) timers() inout;
|
||||
/// Cross-thread events (thread local access)
|
||||
@property EventDriverEvents events();
|
||||
@property inout(EventDriverEvents) events() inout;
|
||||
/// Cross-thread events (cross-thread access)
|
||||
@property shared(EventDriverEvents) events() shared;
|
||||
@property shared(inout(EventDriverEvents)) events() shared inout;
|
||||
/// UNIX/POSIX signal reception
|
||||
@property EventDriverSignals signals();
|
||||
@property inout(EventDriverSignals) signals() inout;
|
||||
/// Stream and datagram sockets
|
||||
@property EventDriverSockets sockets();
|
||||
@property inout(EventDriverSockets) sockets() inout;
|
||||
/// DNS queries
|
||||
@property EventDriverDNS dns();
|
||||
@property inout(EventDriverDNS) dns() inout;
|
||||
/// Local file operations
|
||||
@property EventDriverFiles files();
|
||||
@property inout(EventDriverFiles) files() inout;
|
||||
/// Directory change watching
|
||||
@property EventDriverWatchers watchers();
|
||||
@property inout(EventDriverWatchers) watchers() inout;
|
||||
|
||||
/// Releases all resources associated with the driver
|
||||
void dispose();
|
||||
|
@ -98,6 +101,10 @@ interface EventDriverCore {
|
|||
*/
|
||||
void clearExitFlag();
|
||||
|
||||
/** Executes a callback in the thread owning the driver.
|
||||
*/
|
||||
void runInOwnerThread(ThreadCallback del, intptr_t param) shared;
|
||||
|
||||
/// Low-level user data access. Use `getUserData` instead.
|
||||
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
|
||||
/// ditto
|
||||
|
@ -751,6 +758,8 @@ struct Handle(string NAME, T, T invalid_value = T.init) {
|
|||
alias value this;
|
||||
}
|
||||
|
||||
alias ThreadCallback = void function(intptr_t param) @safe nothrow;
|
||||
|
||||
alias FD = Handle!("fd", size_t, size_t.max);
|
||||
alias SocketFD = Handle!("socket", FD);
|
||||
alias StreamSocketFD = Handle!("streamSocket", SocketFD);
|
||||
|
|
|
@ -77,15 +77,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
|
|||
}
|
||||
|
||||
// force overriding these in the (final) sub classes to avoid virtual calls
|
||||
final override @property CoreDriver core() { return m_core; }
|
||||
final override @property EventsDriver events() { return m_events; }
|
||||
final override @property shared(EventsDriver) events() shared { return m_events; }
|
||||
final override @property SignalsDriver signals() { return m_signals; }
|
||||
final override @property TimerDriver timers() { return m_timers; }
|
||||
final override @property SocketsDriver sockets() { return m_sockets; }
|
||||
final override @property DNSDriver dns() { return m_dns; }
|
||||
final override @property FileDriver files() { return m_files; }
|
||||
final override @property WatcherDriver watchers() { return m_watchers; }
|
||||
final override @property inout(CoreDriver) core() inout { return m_core; }
|
||||
final override @property shared(inout(CoreDriver)) core() shared inout { return m_core; }
|
||||
final override @property inout(EventsDriver) events() inout { return m_events; }
|
||||
final override @property shared(inout(EventsDriver)) events() shared inout { return m_events; }
|
||||
final override @property inout(SignalsDriver) signals() inout { return m_signals; }
|
||||
final override @property inout(TimerDriver) timers() inout { return m_timers; }
|
||||
final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
|
||||
final override @property inout(DNSDriver) dns() inout { return m_dns; }
|
||||
final override @property inout(FileDriver) files() inout { return m_files; }
|
||||
final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
|
||||
|
||||
final override void dispose()
|
||||
{
|
||||
|
@ -100,8 +101,12 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
|
|||
|
||||
|
||||
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore {
|
||||
@safe: nothrow:
|
||||
@safe nothrow:
|
||||
import core.atomic : atomicLoad, atomicStore;
|
||||
import core.sync.mutex : Mutex;
|
||||
import core.time : Duration;
|
||||
import std.stdint : intptr_t;
|
||||
import std.typecons : Tuple, tuple;
|
||||
|
||||
protected alias ExtraEventsCallback = bool delegate(long);
|
||||
|
||||
|
@ -111,6 +116,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
Events m_events;
|
||||
bool m_exit = false;
|
||||
EventID m_wakeupEvent;
|
||||
|
||||
shared Mutex m_threadCallbackMutex;
|
||||
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
|
||||
}
|
||||
|
||||
protected this(Loop loop, Timers timers, Events events)
|
||||
|
@ -119,12 +127,23 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
m_timers = timers;
|
||||
m_events = events;
|
||||
m_wakeupEvent = events.createInternal();
|
||||
|
||||
static if (__VERSION__ >= 2074)
|
||||
m_threadCallbackMutex = new shared Mutex;
|
||||
else {
|
||||
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } ();
|
||||
}
|
||||
|
||||
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t));
|
||||
m_threadCallbacks.reserve(1000);
|
||||
}
|
||||
|
||||
protected final void dispose()
|
||||
{
|
||||
executeThreadCallbacks();
|
||||
m_events.releaseRef(m_wakeupEvent);
|
||||
m_wakeupEvent = EventID.invalid;
|
||||
atomicStore(m_threadCallbackMutex, null);
|
||||
m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
|
||||
}
|
||||
|
||||
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
|
||||
|
@ -133,6 +152,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
{
|
||||
import core.time : hnsecs, seconds;
|
||||
|
||||
executeThreadCallbacks();
|
||||
|
||||
if (m_exit) {
|
||||
m_exit = false;
|
||||
return ExitReason.exited;
|
||||
|
@ -160,6 +181,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
} while (timeout > 0.seconds && !m_exit && !got_events);
|
||||
}
|
||||
|
||||
executeThreadCallbacks();
|
||||
|
||||
if (m_exit) {
|
||||
m_exit = false;
|
||||
return ExitReason.exited;
|
||||
|
@ -173,7 +196,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
|
||||
final override void exit()
|
||||
{
|
||||
m_exit = true;
|
||||
m_exit = true; // FIXME: this needs to be synchronized!
|
||||
() @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } ();
|
||||
}
|
||||
|
||||
|
@ -182,6 +205,26 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
m_exit = false;
|
||||
}
|
||||
|
||||
final override void runInOwnerThread(ThreadCallback del, intptr_t param)
|
||||
shared {
|
||||
auto m = atomicLoad(m_threadCallbackMutex);
|
||||
auto evt = atomicLoad(m_wakeupEvent);
|
||||
// NOTE: This case must be handled gracefully to avoid hazardous
|
||||
// race-conditions upon unexpected thread termination. The mutex
|
||||
// and the map will stay valid even after the driver has been
|
||||
// disposed, so no further synchronization is required.
|
||||
if (!m) return;
|
||||
|
||||
try {
|
||||
synchronized (m)
|
||||
() @trusted { return (cast()this).m_threadCallbacks; } ()
|
||||
.put(tuple(del, param));
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
|
||||
m_events.trigger(m_wakeupEvent, false);
|
||||
}
|
||||
|
||||
|
||||
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
return rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
|
@ -196,6 +239,22 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
@system {
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
private void executeThreadCallbacks()
|
||||
{
|
||||
import std.stdint : intptr_t;
|
||||
|
||||
while (true) {
|
||||
Tuple!(ThreadCallback, intptr_t) del;
|
||||
try {
|
||||
synchronized (m_threadCallbackMutex) {
|
||||
if (m_threadCallbacks.empty) break;
|
||||
del = m_threadCallbacks.consumeOne;
|
||||
}
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
del[0](del[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -7,8 +7,11 @@ import eventcore.drivers.timer;
|
|||
import eventcore.internal.consumablequeue;
|
||||
import eventcore.internal.utils : nogc_assert;
|
||||
import eventcore.internal.win32;
|
||||
import core.sync.mutex : Mutex;
|
||||
import core.time : Duration;
|
||||
import taggedalgebraic;
|
||||
import std.stdint : intptr_t;
|
||||
import std.typecons : Tuple, tuple;
|
||||
|
||||
|
||||
final class WinAPIEventDriverCore : EventDriverCore {
|
||||
|
@ -22,6 +25,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
void delegate() @safe nothrow[HANDLE] m_eventCallbacks;
|
||||
HANDLE m_fileCompletionEvent;
|
||||
ConsumableQueue!IOEvent m_ioEvents;
|
||||
|
||||
shared Mutex m_threadCallbackMutex;
|
||||
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
|
||||
}
|
||||
|
||||
package {
|
||||
|
@ -35,6 +41,14 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
|
||||
registerEvent(m_fileCompletionEvent);
|
||||
m_ioEvents = new ConsumableQueue!IOEvent;
|
||||
|
||||
static if (__VERSION__ >= 2074)
|
||||
m_threadCallbackMutex = new shared Mutex;
|
||||
else {
|
||||
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } ();
|
||||
}
|
||||
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t));
|
||||
m_threadCallbacks.reserve(1000);
|
||||
}
|
||||
|
||||
override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; }
|
||||
|
@ -98,6 +112,26 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
m_exit = false;
|
||||
}
|
||||
|
||||
override void runInOwnerThread(ThreadCallback del, intptr_t param)
|
||||
shared {
|
||||
import core.atomic : atomicLoad;
|
||||
|
||||
auto m = atomicLoad(m_threadCallbackMutex);
|
||||
// NOTE: This case must be handled gracefully to avoid hazardous
|
||||
// race-conditions upon unexpected thread termination. The mutex
|
||||
// and the map will stay valid even after the driver has been
|
||||
// disposed, so no further synchronization is required.
|
||||
if (!m) return;
|
||||
|
||||
try {
|
||||
synchronized (m)
|
||||
() @trusted { return (cast()this).m_threadCallbacks; } ()
|
||||
.put(tuple(del, param));
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
|
||||
() @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } ();
|
||||
}
|
||||
|
||||
package void* rawUserDataImpl(HANDLE handle, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
HandleSlot* fds = &m_handles[handle];
|
||||
|
@ -127,6 +161,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
import core.time : seconds;
|
||||
import std.algorithm.comparison : min;
|
||||
|
||||
executeThreadCallbacks();
|
||||
|
||||
bool got_event;
|
||||
|
||||
if (max_wait > 0.seconds) {
|
||||
|
@ -173,6 +209,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
//if (++cnt % 10 == 0) processTimers();
|
||||
}
|
||||
|
||||
executeThreadCallbacks();
|
||||
|
||||
return got_event;
|
||||
}
|
||||
|
||||
|
@ -204,6 +242,22 @@ final class WinAPIEventDriverCore : EventDriverCore {
|
|||
import std.algorithm.searching : canFind;
|
||||
m_ioEvents.filterPending!(evt => !overlapped.canFind(evt.overlapped));
|
||||
}
|
||||
|
||||
private void executeThreadCallbacks()
|
||||
{
|
||||
import std.stdint : intptr_t;
|
||||
|
||||
while (true) {
|
||||
Tuple!(ThreadCallback, intptr_t) del;
|
||||
try {
|
||||
synchronized (m_threadCallbackMutex) {
|
||||
if (m_threadCallbacks.empty) break;
|
||||
del = m_threadCallbacks.consumeOne;
|
||||
}
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
del[0](del[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long currStdTime()
|
||||
|
|
|
@ -60,15 +60,16 @@ final class WinAPIEventDriver : EventDriver {
|
|||
|
||||
@safe: /*@nogc:*/ nothrow:
|
||||
|
||||
override @property WinAPIEventDriverCore core() { return m_core; }
|
||||
override @property WinAPIEventDriverFiles files() { return m_files; }
|
||||
override @property WinAPIEventDriverSockets sockets() { return m_sockets; }
|
||||
override @property WinAPIEventDriverDNS dns() { return m_dns; }
|
||||
override @property LoopTimeoutTimerDriver timers() { return m_timers; }
|
||||
override @property WinAPIEventDriverEvents events() { return m_events; }
|
||||
override @property shared(WinAPIEventDriverEvents) events() shared { return m_events; }
|
||||
override @property WinAPIEventDriverSignals signals() { return m_signals; }
|
||||
override @property WinAPIEventDriverWatchers watchers() { return m_watchers; }
|
||||
override @property inout(WinAPIEventDriverCore) core() inout { return m_core; }
|
||||
override @property shared(inout(WinAPIEventDriverCore)) core() inout shared { return m_core; }
|
||||
override @property inout(WinAPIEventDriverFiles) files() inout { return m_files; }
|
||||
override @property inout(WinAPIEventDriverSockets) sockets() inout { return m_sockets; }
|
||||
override @property inout(WinAPIEventDriverDNS) dns() inout { return m_dns; }
|
||||
override @property inout(LoopTimeoutTimerDriver) timers() inout { return m_timers; }
|
||||
override @property inout(WinAPIEventDriverEvents) events() inout { return m_events; }
|
||||
override @property shared(inout(WinAPIEventDriverEvents)) events() inout shared { return m_events; }
|
||||
override @property inout(WinAPIEventDriverSignals) signals() inout { return m_signals; }
|
||||
override @property inout(WinAPIEventDriverWatchers) watchers() inout { return m_watchers; }
|
||||
|
||||
override void dispose()
|
||||
{
|
||||
|
|
37
tests/0-runinownerthread.d
Normal file
37
tests/0-runinownerthread.d
Normal file
|
@ -0,0 +1,37 @@
|
|||
/++ dub.sdl:
|
||||
name "test"
|
||||
dependency "eventcore" path=".."
|
||||
+/
|
||||
module test;
|
||||
|
||||
import eventcore.core;
|
||||
import core.thread;
|
||||
import std.stdint;
|
||||
|
||||
intptr_t s_id; // thread-local
|
||||
|
||||
void main()
|
||||
{
|
||||
auto ed = cast(shared)eventDriver;
|
||||
|
||||
auto thr = new Thread({ threadFunc(ed); });
|
||||
thr.start();
|
||||
|
||||
// keep the event loop running for one second
|
||||
auto tm = eventDriver.timers.create();
|
||||
eventDriver.timers.set(tm, 1.seconds, 0.seconds);
|
||||
|
||||
ExitReason er;
|
||||
do er = eventDriver.core.processEvents(Duration.max);
|
||||
while (er == ExitReason.idle);
|
||||
assert(er == ExitReason.outOfWaiters);
|
||||
|
||||
assert(s_id == 42);
|
||||
}
|
||||
|
||||
void threadFunc(shared(NativeEventDriver) drv)
|
||||
{
|
||||
drv.core.runInOwnerThread((id) {
|
||||
s_id = id;
|
||||
}, 42);
|
||||
}
|
Loading…
Reference in a new issue