From ca81d256453f23c5c9e080a7962dc8ff57efa6a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 22 Jan 2017 10:43:18 +0100 Subject: [PATCH] Implement socket adoption and fix wait loops on Windows for the Posix driver. --- README.md | 3 +- source/eventcore/driver.d | 2 ++ source/eventcore/drivers/libasync.d | 11 ++++++ source/eventcore/drivers/posix.d | 53 ++++++++++++++++++++++------- source/eventcore/drivers/winapi.d | 10 ++++++ 5 files changed, 65 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index b33e288..d155f52 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,10 @@ Timers | yes | yes | yes | &m Events | yes | yes | yes | — Unix Signals | yes² | yes² | — | — Files | yes | yes | yes | — -UI Integration | — | — | yes | — +UI Integration | yes¹ | yes¹ | yes | — File watcher | yes² | yes² | yes | — +¹ Manually, by adopting the X11 display connection socket ² Currently only supported on Linux diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index a765f2b..42ad4b6 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -82,6 +82,7 @@ interface EventDriverCore { interface EventDriverSockets { @safe: /*@nogc:*/ nothrow: StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect); + StreamSocketFD adoptStream(int socket); StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); ConnectionState getConnectionState(StreamSocketFD sock); @@ -96,6 +97,7 @@ interface EventDriverSockets { void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write); DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address); + DatagramSocketFD adoptDatagramSocket(int socket); bool setBroadcast(DatagramSocketFD socket, bool enable); void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish); void cancelReceive(DatagramSocketFD socket); diff --git a/source/eventcore/drivers/libasync.d b/source/eventcore/drivers/libasync.d index b11b3de..09e8da9 100644 --- a/source/eventcore/drivers/libasync.d +++ b/source/eventcore/drivers/libasync.d @@ -89,6 +89,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } + override StreamSocketFD adoptStream(int socket) + { + assert(false, "TODO!"); + } + override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) { assert(false, "TODO!"); @@ -154,6 +159,12 @@ final class LibasyncEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } + override DatagramSocketFD adoptDatagramSocket(int socket) + { + assert(false); + } + + override bool setBroadcast(DatagramSocketFD socket, bool enable) { assert(false, "TODO!"); diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 0dc89ee..bfc0aca 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -12,6 +12,8 @@ import eventcore.drivers.threadedfile; import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.utils; +import std.algorithm.comparison : among; + import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress; version (Posix) { import std.socket : UnixAddress; @@ -255,7 +257,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets on_connect(sock, ConnectStatus.connected); } else { auto err = getSocketError(); - if (err == EINPROGRESS) { + if (err.among!(EAGAIN, EINPROGRESS)) { with (m_loop.m_fds[sock].streamSocket) { connectCallback = on_connect; state = ConnectionState.connecting; @@ -273,6 +275,18 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return sock; } + final override StreamSocketFD adoptStream(int socket) + { + auto fd = StreamSocketFD(socket); + if (m_loop.m_fds[fd].common.refCount) // FD already in use? + return StreamSocketFD.invalid; + setSocketNonBlocking(fd); + m_loop.initFD(fd); + m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.m_fds[fd].specific = StreamSocketSlot.init; + return fd; + } + private void onConnect(FD sock) { m_loop.setNotifyCallback!(EventType.write)(sock, null); @@ -348,6 +362,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto fd = cast(StreamSocketFD)sockfd; m_loop.initFD(fd); m_loop.m_fds[fd].specific = StreamSocketSlot.init; + m_loop.m_fds[fd].specific.state = ConnectionState.connected; m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); //print("accept %d", sockfd); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); @@ -394,7 +409,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { print("sock error %s!", err); on_read_finish(socket, IOStatus.error, 0); return; @@ -458,7 +473,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets () @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { finalize(IOStatus.error); return; } @@ -492,7 +507,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { on_write_finish(socket, IOStatus.error, 0); return; } @@ -550,7 +565,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(socket, IOStatus.error, slot.bytesRead); return; @@ -582,7 +597,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { on_data_available(socket, IOStatus.error, 0); return; } @@ -627,7 +642,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets () @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } (); if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) finalize(IOStatus.error); + if (!err.among!(EAGAIN, EINPROGRESS)) finalize(IOStatus.error); } else finalize(ret ? IOStatus.ok : IOStatus.disconnected); } @@ -662,6 +677,18 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return sock; } + final override DatagramSocketFD adoptDatagramSocket(int socket) + { + auto fd = DatagramSocketFD(socket); + if (m_loop.m_fds[fd].common.refCount) // FD already in use? + return DatagramSocketFD.init; + setSocketNonBlocking(fd); + m_loop.initFD(fd); + m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); + m_loop.m_fds[fd].specific = DgramSocketSlot.init; + return fd; + } + final override bool setBroadcast(DatagramSocketFD socket, bool enable) { int tmp_broad = enable; @@ -681,7 +708,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { print("sock error %s!", err); on_receive_finish(socket, IOStatus.error, 0, null); return; @@ -725,7 +752,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { m_loop.setNotifyCallback!(EventType.read)(socket, null); slot.readCallback(socket, IOStatus.error, 0, null); return; @@ -751,7 +778,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { print("sock error %s!", err); on_send_finish(socket, IOStatus.error, 0, null); return; @@ -796,7 +823,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (err != EAGAIN) { + if (!err.among!(EAGAIN, EINPROGRESS)) { m_loop.setNotifyCallback!(EventType.write)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); return; @@ -1344,7 +1371,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna signalfd_siginfo nfo; do { auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } (); - if (ret == -1 && errno == EAGAIN) + if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS)) break; auto cb = m_loop.m_fds[fd].signal.callback; if (ret != nfo.sizeof) { @@ -1456,7 +1483,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch while (true) { auto ret = () @trusted { return read(id, &buf[0], buf.length); } (); - if (ret == -1 && errno == EAGAIN) + if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS)) break; assert(ret <= buf.length); diff --git a/source/eventcore/drivers/winapi.d b/source/eventcore/drivers/winapi.d index 5bbed2e..6aecbbf 100644 --- a/source/eventcore/drivers/winapi.d +++ b/source/eventcore/drivers/winapi.d @@ -241,6 +241,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } + override StreamSocketFD adoptStream(int socket) + { + assert(false, "TODO!"); + } + override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) { assert(false, "TODO!"); @@ -306,6 +311,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { assert(false, "TODO!"); } + override DatagramSocketFD adoptDatagramSocket(int socket) + { + assert(false); + } + override bool setBroadcast(DatagramSocketFD socket, bool enable) { assert(false, "TODO!");