diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 08c48a2..a8300d0 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -5,13 +5,16 @@ import eventcore.driver; import eventcore.drivers.posix.driver; import eventcore.internal.consumablequeue : ConsumableQueue; -import std.socket : InternetAddress; - version (linux) { nothrow @nogc extern (C) int eventfd(uint initval, int flags); - import core.sys.posix.unistd : close, read, write; enum EFD_NONBLOCK = 0x800; } +version (Posix) { + import core.sys.posix.unistd : close, read, write; +} else { + import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM, + bind, connect, getsockname, send, socket; +} final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents { @@ -19,10 +22,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS private { Loop m_loop; Sockets m_sockets; + ubyte[ulong.sizeof] m_buf; version (linux) {} else { - EventSlot[DatagramSocketFD] m_events; - ubyte[long.sizeof] m_buf; + // TODO: avoid the overhead of a mutex backed map here + import core.sync.mutex : Mutex; + Mutex m_eventsMutex; + EventID[DatagramSocketFD] m_events; } } @@ -30,6 +36,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS { m_loop = loop; m_sockets = sockets; + version (linux) {} + else m_eventsMutex = new Mutex; } package @property Loop loop() { return m_loop; } @@ -53,13 +61,54 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS assert(getRC(id) == 1); return id; } else { - auto addr = new InternetAddress(0x7F000001, 0); - auto s = m_sockets.createDatagramSocketInternal(addr, addr, true); - if (s == DatagramSocketFD.invalid) return EventID.invalid; + sock_t[2] fd; + version (Posix) { + // create a pair of sockets to communicate between threads + import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair; + if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0) + return EventID.invalid; + + assert(fd[0] != fd[1]); + + // use the first socket as the async receiver + auto s = m_sockets.adoptDatagramSocketInternal(fd[0]); + } else { + // fake missing socketpair support on Windows + import std.socket : InternetAddress; + auto addr = new InternetAddress(0x7F000001, 0); + auto s = m_sockets.createDatagramSocketInternal(addr, null, true); + if (s == DatagramSocketFD.invalid) return EventID.invalid; + fd[0] = cast(sock_t)s; + if (!() @trusted { + fd[1] = socket(AF_INET, SOCK_DGRAM, 0); + int nl = addr.nameLen; + import eventcore.internal.utils : print; + if (bind(fd[1], addr.name, addr.nameLen) != 0) + return false; + assert(nl == addr.nameLen); + if (getsockname(fd[0], addr.name, &nl) != 0) + return false; + if (connect(fd[1], addr.name, addr.nameLen) != 0) + return false; + return true; + } ()) + { + m_sockets.releaseRef(s); + return EventID.invalid; + } + } + m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation - m_sockets.releaseRef(s); // receive() increments the reference count, but we need a value of 1 upon return - auto id = cast(EventID)s; + + // use the second socket as the event ID and as the sending end for + // other threads + auto id = cast(EventID)fd[1]; + try { + synchronized (m_eventsMutex) + m_events[s] = id; + } catch (Exception e) assert(false, e.msg); + m_loop.initFD(id, FDFlags.internal); + m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s); // FIXME: avoid dynamic memory allocation assert(getRC(id) == 1); return id; } @@ -91,8 +140,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS long one = 1; //log("emitting for all threads"); if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (linux) () @trusted { .write(cast(int)event, &one, one.sizeof); } (); - else thisus.m_sockets.send(cast(DatagramSocketFD)event, thisus.m_buf, IOMode.once, null, &thisus.onSocketDataSent); + version (Posix) .write(cast(int)event, &one, one.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -124,13 +173,15 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS version (linux) {} else { - private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress) - { - } private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) { - onEvent(cast(EventID)s); m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + EventID evt; + try { + synchronized (m_eventsMutex) + evt = m_events[s]; + onEvent(evt); + } catch (Exception e) assert(false, e.msg); } } @@ -143,43 +194,36 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override bool releaseRef(EventID descriptor) { assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); - void destroy() { + if (--getRC(descriptor) == 0) { + if (!isInternal(descriptor)) + m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; () @trusted nothrow { try .destroy(getSlot(descriptor).waiters); catch (Exception e) assert(false, e.msg); } (); - } - version (linux) { - if (--getRC(descriptor) == 0) { - if (!isInternal(descriptor)) - m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; - destroy(); + version (linux) { m_loop.unregisterFD(descriptor, EventMask.read); - m_loop.clearFD(descriptor); - close(cast(int)descriptor); - return false; - } - } else { - if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) { - if (!isInternal(descriptor)) - m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; - destroy(); - m_events.remove(cast(DatagramSocketFD)descriptor); - return false; + } else { + auto rs = getSlot(descriptor).recvSocket; + m_sockets.cancelReceive(rs); + m_sockets.releaseRef(rs); + try { + synchronized (m_eventsMutex) + m_events.remove(rs); + } catch (Exception e) assert(false, e.msg); } + m_loop.clearFD(descriptor); + version (Posix) close(cast(int)descriptor); + else () @trusted { closesocket(cast(SOCKET)descriptor); } (); + return false; } return true; } private EventSlot* getSlot(EventID id) { - version (linux) { - assert(id < m_loop.m_fds.length, "Invalid event ID."); - return () @trusted { return &m_loop.m_fds[id].event(); } (); - } else { - assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID."); - return &m_events[cast(DatagramSocketFD)id]; - } + assert(id < m_loop.m_fds.length, "Invalid event ID."); + return () @trusted { return &m_loop.m_fds[id].event(); } (); } private ref uint getRC(EventID id) @@ -198,4 +242,8 @@ package struct EventSlot { ConsumableQueue!EventCallback waiters; shared bool triggerAll; bool isInternal; + version (linux) {} + else { + DatagramSocketFD recvSocket; + } }