From 06c6e0a4cac8d6bbffd6f35d42db75715917943b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 3 Sep 2017 16:43:50 +0200 Subject: [PATCH] Add basic UDP multicast support. This also adds a new setOption method to handle boolean socket options in a generic way. --- source/eventcore/driver.d | 19 ++++++ source/eventcore/drivers/posix/sockets.d | 71 +++++++++++++++++++++++ source/eventcore/drivers/winapi/sockets.d | 51 ++++++++++++++++ 3 files changed, 141 insertions(+) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 1497fce..76bae24 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -278,6 +278,9 @@ interface EventDriverSockets { /// Sets the `SO_BROADCAST` socket option. bool setBroadcast(DatagramSocketFD socket, bool enable); + /// Joins the multicast group associated with the given IP address. + bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address); + /// Receives a single datagram. void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish); /// Cancels an ongoing wait for an incoming datagram. @@ -301,6 +304,12 @@ interface EventDriverSockets { */ bool releaseRef(SocketFD descriptor); + /** Enables or disables a socket option. + */ + bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable); + /// ditto + bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable); + /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; /// ditto @@ -561,6 +570,16 @@ enum StreamListenOptions { reusePort = 1<<0, } +enum StreamSocketOption { + noDelay, + keepAlive +} + +enum DatagramSocketOption { + broadcast, + multicastLoopback +} + /** Specifies how a file is manipulated on disk. */ diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 0331f93..cc0f62e 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -25,6 +25,26 @@ version (linux) { extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc; static if (!is(typeof(SOCK_NONBLOCK))) enum SOCK_NONBLOCK = 0x800; + + static if (__VERSION__ < 2077) + { + enum IP_ADD_MEMBERSHIP = 35; + enum IP_MULTICAST_LOOP = 34; + } + else + import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP; +} +version(OSX) { + static if (__VERSION__ < 2077) { + enum IP_ADD_MEMBERSHIP = 12; + enum IP_MULTICAST_LOOP = 11; + } else import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP; +} +version(FreeBSD) { + static if (__VERSION__ < 2077) { + enum IP_ADD_MEMBERSHIP = 12; + enum IP_MULTICAST_LOOP = 11; + } else import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP; } version (Windows) { import core.sys.windows.windows; @@ -557,6 +577,35 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; } + final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address) + { + switch (multicast_address.addressFamily) { + default: assert(false, "Multicast only supported for IPv4/IPv6 sockets."); + case AddressFamily.INET: + struct ip_mreq { + in_addr imr_multiaddr; /* IP multicast address of group */ + in_addr imr_interface; /* local IP address of interface */ + } + auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } (); + ip_mreq mreq; + mreq.imr_multiaddr = addr.sin_addr; + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0; + case AddressFamily.INET6: + version (Windows) { + struct ipv6_mreq { + in6_addr ipv6mr_multiaddr; + uint ipv6mr_interface; + } + } + auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } (); + ipv6_mreq mreq; + mreq.ipv6mr_multiaddr = addr.sin6_addr; + mreq.ipv6mr_interface = 0; + return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0; + } + } + void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish) @trusted { // DMD 2.072.0-b2: scope considered unsafe import std.typecons : scoped; @@ -717,6 +766,28 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return true; } + final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable) + { + int proto, opt; + final switch (option) { + case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break; + case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break; + } + int tmp = enable; + return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0; + } + + final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable) + { + int proto, opt; + final switch (option) { + case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break; + case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break; + } + int tmp = enable; + return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0; + } + final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { return rawUserDataImpl(descriptor, size, initialize, destroy); diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index d6c5edb..f8f2323 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -444,6 +444,35 @@ final class WinAPIEventDriverSockets : EventDriverSockets { return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; } + final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address) + { + import std.socket : AddressFamily; + + switch (multicast_address.addressFamily) { + default: assert(false, "Multicast only supported for IPv4/IPv6 sockets."); + case AddressFamily.INET: + struct ip_mreq { + in_addr imr_multiaddr; /* IP multicast address of group */ + in_addr imr_interface; /* local IP address of interface */ + } + auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } (); + ip_mreq mreq; + mreq.imr_multiaddr = addr.sin_addr; + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0; + case AddressFamily.INET6: + struct ipv6_mreq { + in6_addr ipv6mr_multiaddr; + uint ipv6mr_interface; + } + auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } (); + ipv6_mreq mreq; + mreq.ipv6mr_multiaddr = addr.sin6_addr; + mreq.ipv6mr_interface = 0; + return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0; + } + } + override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish) { auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); @@ -665,6 +694,28 @@ final class WinAPIEventDriverSockets : EventDriverSockets { return true; } + final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable) + { + int proto, opt; + final switch (option) { + case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break; + case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break; + } + int tmp = enable; + return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0; + } + + final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable) + { + int proto, opt; + final switch (option) { + case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break; + case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break; + } + int tmp = enable; + return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0; + } + final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { return rawUserDataImpl(descriptor, size, initialize, destroy);