Fix the Windows events implementation in the Posix driver.
This commit is contained in:
parent
924f2087f2
commit
17c4fe65a8
|
@ -54,7 +54,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
|
||||||
|
|
||||||
private {
|
private {
|
||||||
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver);
|
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver);
|
||||||
alias EventsDriver = PosixEventDriverEvents!Loop;
|
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
|
||||||
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
|
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
|
||||||
else alias SignalsDriver = DummyEventDriverSignals!Loop;
|
else alias SignalsDriver = DummyEventDriverSignals!Loop;
|
||||||
alias TimerDriver = LoopTimeoutTimerDriver;
|
alias TimerDriver = LoopTimeoutTimerDriver;
|
||||||
|
@ -80,11 +80,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
|
||||||
this()
|
this()
|
||||||
{
|
{
|
||||||
m_loop = new Loop;
|
m_loop = new Loop;
|
||||||
m_events = new EventsDriver(m_loop);
|
m_sockets = new SocketsDriver(m_loop);
|
||||||
|
m_events = new EventsDriver(m_loop, m_sockets);
|
||||||
m_signals = new SignalsDriver(m_loop);
|
m_signals = new SignalsDriver(m_loop);
|
||||||
m_timers = new TimerDriver;
|
m_timers = new TimerDriver;
|
||||||
m_core = new CoreDriver(m_loop, m_timers, m_events);
|
m_core = new CoreDriver(m_loop, m_timers, m_events);
|
||||||
m_sockets = new SocketsDriver(m_loop);
|
|
||||||
m_dns = new DNSDriver(m_events, m_signals);
|
m_dns = new DNSDriver(m_events, m_signals);
|
||||||
m_files = new FileDriver(m_events);
|
m_files = new FileDriver(m_events);
|
||||||
m_watchers = new WatcherDriver(m_loop);
|
m_watchers = new WatcherDriver(m_loop);
|
||||||
|
@ -1134,20 +1134,22 @@ private void passToDNSCallback()(DNSLookupID id, scope DNSLookupCallback cb, DNS
|
||||||
} catch (Exception e) assert(false, e.msg);
|
} catch (Exception e) assert(false, e.msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
|
||||||
@safe: /*@nogc:*/ nothrow:
|
@safe: /*@nogc:*/ nothrow:
|
||||||
private {
|
private {
|
||||||
Loop m_loop;
|
Loop m_loop;
|
||||||
|
Sockets m_sockets;
|
||||||
version (Windows) {
|
version (Windows) {
|
||||||
static struct ES {
|
EventSlot[DatagramSocketFD] m_events;
|
||||||
uint refCount;
|
ubyte[long.sizeof] m_buf;
|
||||||
EventSlot slot;
|
|
||||||
}
|
|
||||||
ES[EventID] m_events;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this(Loop loop) { m_loop = loop; }
|
this(Loop loop, Sockets sockets)
|
||||||
|
{
|
||||||
|
m_loop = loop;
|
||||||
|
m_sockets = sockets;
|
||||||
|
}
|
||||||
|
|
||||||
final override EventID create()
|
final override EventID create()
|
||||||
{
|
{
|
||||||
|
@ -1159,25 +1161,27 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
||||||
return id;
|
return id;
|
||||||
} else version (Windows) {
|
} else version (Windows) {
|
||||||
auto h = () @trusted { return CreateEvent(null, false, false, null); } ();
|
auto addr = new InternetAddress(0x7F000001, 0);
|
||||||
auto id = EventID(cast(int)h);
|
auto s = m_sockets.createDatagramSocket(addr, addr);
|
||||||
m_events[id] = ES(1, EventSlot(new ConsumableQueue!EventCallback)); // FIXME: avoid dynamic memory allocation
|
if (s == DatagramSocketFD.invalid) print("oops");
|
||||||
return id;
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
|
m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
|
||||||
|
return cast(EventID)s;
|
||||||
} else assert(false, "OS not supported!");
|
} else assert(false, "OS not supported!");
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void trigger(EventID event, bool notify_all = true)
|
final override void trigger(EventID event, bool notify_all = true)
|
||||||
{
|
{
|
||||||
assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent.");
|
auto slot = &getSlot(event);
|
||||||
if (notify_all) {
|
if (notify_all) {
|
||||||
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
|
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
|
||||||
foreach (w; getSlot(event).waiters.consume) {
|
foreach (w; slot.waiters.consume) {
|
||||||
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
|
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
|
||||||
w(event);
|
w(event);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!getSlot(event).waiters.empty)
|
if (!slot.waiters.empty)
|
||||||
getSlot(event).waiters.consumeOne()(event);
|
slot.waiters.consumeOne()(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1189,7 +1193,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
long one = 1;
|
long one = 1;
|
||||||
//log("emitting for all threads");
|
//log("emitting for all threads");
|
||||||
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
||||||
() @trusted { .write(event, &one, one.sizeof); } ();
|
version (Windows)
|
||||||
|
(cast(Sockets)m_sockets).send(cast(DatagramSocketFD)event, cast(ubyte[])m_buf, IOMode.once, null, &onSocketDataSent);
|
||||||
|
else
|
||||||
|
() @trusted { .write(event, &one, one.sizeof); } ();
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void wait(EventID event, EventCallback on_event)
|
final override void wait(EventID event, EventCallback on_event)
|
||||||
|
@ -1217,6 +1224,17 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
trigger(event, all);
|
trigger(event, all);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
version (Windows) {
|
||||||
|
private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
|
||||||
|
{
|
||||||
|
onEvent(cast(EventID)s);
|
||||||
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final override void addRef(EventID descriptor)
|
final override void addRef(EventID descriptor)
|
||||||
{
|
{
|
||||||
assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
|
assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
|
||||||
|
@ -1226,16 +1244,22 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
final override bool releaseRef(EventID descriptor)
|
final override bool releaseRef(EventID descriptor)
|
||||||
{
|
{
|
||||||
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
||||||
if (--getRC(descriptor) == 0) {
|
void destroy() {
|
||||||
() @trusted nothrow {
|
() @trusted nothrow {
|
||||||
scope (failure) assert(false);
|
scope (failure) assert(false);
|
||||||
destroy(getSlot(descriptor).waiters);
|
.destroy(getSlot(descriptor).waiters);
|
||||||
assert(getSlot(descriptor).waiters is null);
|
assert(getSlot(descriptor).waiters is null);
|
||||||
} ();
|
} ();
|
||||||
version (Windows) {
|
}
|
||||||
() @trusted { CloseHandle(cast(HANDLE)cast(int)descriptor); } ();
|
version (Windows) {
|
||||||
m_events.remove(descriptor);
|
if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) {
|
||||||
} else {
|
destroy();
|
||||||
|
m_events.remove(cast(DatagramSocketFD)descriptor);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (--getRC(descriptor) == 0) {
|
||||||
|
destroy();
|
||||||
m_loop.unregisterFD(descriptor);
|
m_loop.unregisterFD(descriptor);
|
||||||
m_loop.clearFD(descriptor);
|
m_loop.clearFD(descriptor);
|
||||||
close(descriptor);
|
close(descriptor);
|
||||||
|
@ -1248,8 +1272,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
private ref EventSlot getSlot(EventID id)
|
private ref EventSlot getSlot(EventID id)
|
||||||
{
|
{
|
||||||
version (Windows) {
|
version (Windows) {
|
||||||
assert(id in m_events, "Invalid event ID.");
|
assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID.");
|
||||||
return m_events[id].slot;
|
return m_events[cast(DatagramSocketFD)id];
|
||||||
} else {
|
} else {
|
||||||
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
||||||
return m_loop.m_fds[id].event();
|
return m_loop.m_fds[id].event();
|
||||||
|
@ -1258,11 +1282,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
|
||||||
|
|
||||||
private ref uint getRC(EventID id)
|
private ref uint getRC(EventID id)
|
||||||
{
|
{
|
||||||
version (Windows) {
|
return m_loop.m_fds[id].common.refCount;
|
||||||
return m_events[id].refCount;
|
|
||||||
} else {
|
|
||||||
return m_loop.m_fds[id].common.refCount;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,8 @@ import core.thread : Thread;
|
||||||
bool s_done;
|
bool s_done;
|
||||||
shared EventDriverEvents ss_evts;
|
shared EventDriverEvents ss_evts;
|
||||||
|
|
||||||
|
// TODO: adjust to detect premature loop exit if only an event wait is active (no timer)
|
||||||
|
|
||||||
void test(bool notify_all)
|
void test(bool notify_all)
|
||||||
{
|
{
|
||||||
auto id = eventDriver.events.create();
|
auto id = eventDriver.events.create();
|
||||||
|
|
Loading…
Reference in a new issue