Fix threading issues in the generic Posix event implementation.
Uses socketpair non-Linux systems and two separate UDP sockets on Windows for the cross-thread communication.
This commit is contained in:
parent
a5d4cf875c
commit
07d2bafcac
|
@ -5,13 +5,16 @@ import eventcore.driver;
|
||||||
import eventcore.drivers.posix.driver;
|
import eventcore.drivers.posix.driver;
|
||||||
import eventcore.internal.consumablequeue : ConsumableQueue;
|
import eventcore.internal.consumablequeue : ConsumableQueue;
|
||||||
|
|
||||||
import std.socket : InternetAddress;
|
|
||||||
|
|
||||||
version (linux) {
|
version (linux) {
|
||||||
nothrow @nogc extern (C) int eventfd(uint initval, int flags);
|
nothrow @nogc extern (C) int eventfd(uint initval, int flags);
|
||||||
import core.sys.posix.unistd : close, read, write;
|
|
||||||
enum EFD_NONBLOCK = 0x800;
|
enum EFD_NONBLOCK = 0x800;
|
||||||
}
|
}
|
||||||
|
version (Posix) {
|
||||||
|
import core.sys.posix.unistd : close, read, write;
|
||||||
|
} else {
|
||||||
|
import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM,
|
||||||
|
bind, connect, getsockname, send, socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
|
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
|
||||||
|
@ -19,10 +22,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
private {
|
private {
|
||||||
Loop m_loop;
|
Loop m_loop;
|
||||||
Sockets m_sockets;
|
Sockets m_sockets;
|
||||||
|
ubyte[ulong.sizeof] m_buf;
|
||||||
version (linux) {}
|
version (linux) {}
|
||||||
else {
|
else {
|
||||||
EventSlot[DatagramSocketFD] m_events;
|
// TODO: avoid the overhead of a mutex backed map here
|
||||||
ubyte[long.sizeof] m_buf;
|
import core.sync.mutex : Mutex;
|
||||||
|
Mutex m_eventsMutex;
|
||||||
|
EventID[DatagramSocketFD] m_events;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +36,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
{
|
{
|
||||||
m_loop = loop;
|
m_loop = loop;
|
||||||
m_sockets = sockets;
|
m_sockets = sockets;
|
||||||
|
version (linux) {}
|
||||||
|
else m_eventsMutex = new Mutex;
|
||||||
}
|
}
|
||||||
|
|
||||||
package @property Loop loop() { return m_loop; }
|
package @property Loop loop() { return m_loop; }
|
||||||
|
@ -53,13 +61,54 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
assert(getRC(id) == 1);
|
assert(getRC(id) == 1);
|
||||||
return id;
|
return id;
|
||||||
} else {
|
} else {
|
||||||
auto addr = new InternetAddress(0x7F000001, 0);
|
sock_t[2] fd;
|
||||||
auto s = m_sockets.createDatagramSocketInternal(addr, addr, true);
|
version (Posix) {
|
||||||
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
// create a pair of sockets to communicate between threads
|
||||||
|
import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair;
|
||||||
|
if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0)
|
||||||
|
return EventID.invalid;
|
||||||
|
|
||||||
|
assert(fd[0] != fd[1]);
|
||||||
|
|
||||||
|
// use the first socket as the async receiver
|
||||||
|
auto s = m_sockets.adoptDatagramSocketInternal(fd[0]);
|
||||||
|
} else {
|
||||||
|
// fake missing socketpair support on Windows
|
||||||
|
import std.socket : InternetAddress;
|
||||||
|
auto addr = new InternetAddress(0x7F000001, 0);
|
||||||
|
auto s = m_sockets.createDatagramSocketInternal(addr, null, true);
|
||||||
|
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
||||||
|
fd[0] = cast(sock_t)s;
|
||||||
|
if (!() @trusted {
|
||||||
|
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
|
||||||
|
int nl = addr.nameLen;
|
||||||
|
import eventcore.internal.utils : print;
|
||||||
|
if (bind(fd[1], addr.name, addr.nameLen) != 0)
|
||||||
|
return false;
|
||||||
|
assert(nl == addr.nameLen);
|
||||||
|
if (getsockname(fd[0], addr.name, &nl) != 0)
|
||||||
|
return false;
|
||||||
|
if (connect(fd[1], addr.name, addr.nameLen) != 0)
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
} ())
|
||||||
|
{
|
||||||
|
m_sockets.releaseRef(s);
|
||||||
|
return EventID.invalid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
|
||||||
m_sockets.releaseRef(s); // receive() increments the reference count, but we need a value of 1 upon return
|
// use the second socket as the event ID and as the sending end for
|
||||||
auto id = cast(EventID)s;
|
// other threads
|
||||||
|
auto id = cast(EventID)fd[1];
|
||||||
|
try {
|
||||||
|
synchronized (m_eventsMutex)
|
||||||
|
m_events[s] = id;
|
||||||
|
} catch (Exception e) assert(false, e.msg);
|
||||||
|
m_loop.initFD(id, FDFlags.internal);
|
||||||
|
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s); // FIXME: avoid dynamic memory allocation
|
||||||
assert(getRC(id) == 1);
|
assert(getRC(id) == 1);
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
@ -91,8 +140,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
long one = 1;
|
long one = 1;
|
||||||
//log("emitting for all threads");
|
//log("emitting for all threads");
|
||||||
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
||||||
version (linux) () @trusted { .write(cast(int)event, &one, one.sizeof); } ();
|
version (Posix) .write(cast(int)event, &one, one.sizeof);
|
||||||
else thisus.m_sockets.send(cast(DatagramSocketFD)event, thisus.m_buf, IOMode.once, null, &thisus.onSocketDataSent);
|
else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof);
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void wait(EventID event, EventCallback on_event)
|
final override void wait(EventID event, EventCallback on_event)
|
||||||
|
@ -124,13 +173,15 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
|
|
||||||
version (linux) {}
|
version (linux) {}
|
||||||
else {
|
else {
|
||||||
private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
|
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
|
||||||
{
|
{
|
||||||
onEvent(cast(EventID)s);
|
|
||||||
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
|
EventID evt;
|
||||||
|
try {
|
||||||
|
synchronized (m_eventsMutex)
|
||||||
|
evt = m_events[s];
|
||||||
|
onEvent(evt);
|
||||||
|
} catch (Exception e) assert(false, e.msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,43 +194,36 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
final override bool releaseRef(EventID descriptor)
|
final override bool releaseRef(EventID descriptor)
|
||||||
{
|
{
|
||||||
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
||||||
void destroy() {
|
if (--getRC(descriptor) == 0) {
|
||||||
|
if (!isInternal(descriptor))
|
||||||
|
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
|
||||||
() @trusted nothrow {
|
() @trusted nothrow {
|
||||||
try .destroy(getSlot(descriptor).waiters);
|
try .destroy(getSlot(descriptor).waiters);
|
||||||
catch (Exception e) assert(false, e.msg);
|
catch (Exception e) assert(false, e.msg);
|
||||||
} ();
|
} ();
|
||||||
}
|
version (linux) {
|
||||||
version (linux) {
|
|
||||||
if (--getRC(descriptor) == 0) {
|
|
||||||
if (!isInternal(descriptor))
|
|
||||||
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
|
|
||||||
destroy();
|
|
||||||
m_loop.unregisterFD(descriptor, EventMask.read);
|
m_loop.unregisterFD(descriptor, EventMask.read);
|
||||||
m_loop.clearFD(descriptor);
|
} else {
|
||||||
close(cast(int)descriptor);
|
auto rs = getSlot(descriptor).recvSocket;
|
||||||
return false;
|
m_sockets.cancelReceive(rs);
|
||||||
}
|
m_sockets.releaseRef(rs);
|
||||||
} else {
|
try {
|
||||||
if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) {
|
synchronized (m_eventsMutex)
|
||||||
if (!isInternal(descriptor))
|
m_events.remove(rs);
|
||||||
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
|
} catch (Exception e) assert(false, e.msg);
|
||||||
destroy();
|
|
||||||
m_events.remove(cast(DatagramSocketFD)descriptor);
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
m_loop.clearFD(descriptor);
|
||||||
|
version (Posix) close(cast(int)descriptor);
|
||||||
|
else () @trusted { closesocket(cast(SOCKET)descriptor); } ();
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private EventSlot* getSlot(EventID id)
|
private EventSlot* getSlot(EventID id)
|
||||||
{
|
{
|
||||||
version (linux) {
|
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
||||||
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
return () @trusted { return &m_loop.m_fds[id].event(); } ();
|
||||||
return () @trusted { return &m_loop.m_fds[id].event(); } ();
|
|
||||||
} else {
|
|
||||||
assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID.");
|
|
||||||
return &m_events[cast(DatagramSocketFD)id];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ref uint getRC(EventID id)
|
private ref uint getRC(EventID id)
|
||||||
|
@ -198,4 +242,8 @@ package struct EventSlot {
|
||||||
ConsumableQueue!EventCallback waiters;
|
ConsumableQueue!EventCallback waiters;
|
||||||
shared bool triggerAll;
|
shared bool triggerAll;
|
||||||
bool isInternal;
|
bool isInternal;
|
||||||
|
version (linux) {}
|
||||||
|
else {
|
||||||
|
DatagramSocketFD recvSocket;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue