Add basic UDP multicast support.

This also adds a new setOption method to handle boolean socket options in a generic way.
This commit is contained in:
Sönke Ludwig 2017-09-03 16:43:50 +02:00
parent 166ccfe861
commit 06c6e0a4ca
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
3 changed files with 141 additions and 0 deletions

View file

@ -278,6 +278,9 @@ interface EventDriverSockets {
/// Sets the `SO_BROADCAST` socket option. /// Sets the `SO_BROADCAST` socket option.
bool setBroadcast(DatagramSocketFD socket, bool enable); bool setBroadcast(DatagramSocketFD socket, bool enable);
/// Joins the multicast group associated with the given IP address.
bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address);
/// Receives a single datagram. /// Receives a single datagram.
void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish); void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish);
/// Cancels an ongoing wait for an incoming datagram. /// Cancels an ongoing wait for an incoming datagram.
@ -301,6 +304,12 @@ interface EventDriverSockets {
*/ */
bool releaseRef(SocketFD descriptor); bool releaseRef(SocketFD descriptor);
/** Enables or disables a socket option.
*/
bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable);
/// ditto
bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable);
/// Low-level user data access. Use `getUserData` instead. /// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
/// ditto /// ditto
@ -561,6 +570,16 @@ enum StreamListenOptions {
reusePort = 1<<0, reusePort = 1<<0,
} }
enum StreamSocketOption {
noDelay,
keepAlive
}
enum DatagramSocketOption {
broadcast,
multicastLoopback
}
/** /**
Specifies how a file is manipulated on disk. Specifies how a file is manipulated on disk.
*/ */

View file

@ -25,6 +25,26 @@ version (linux) {
extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc; extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc;
static if (!is(typeof(SOCK_NONBLOCK))) static if (!is(typeof(SOCK_NONBLOCK)))
enum SOCK_NONBLOCK = 0x800; enum SOCK_NONBLOCK = 0x800;
static if (__VERSION__ < 2077)
{
enum IP_ADD_MEMBERSHIP = 35;
enum IP_MULTICAST_LOOP = 34;
}
else
import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
}
version(OSX) {
static if (__VERSION__ < 2077) {
enum IP_ADD_MEMBERSHIP = 12;
enum IP_MULTICAST_LOOP = 11;
} else import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
}
version(FreeBSD) {
static if (__VERSION__ < 2077) {
enum IP_ADD_MEMBERSHIP = 12;
enum IP_MULTICAST_LOOP = 11;
} else import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
} }
version (Windows) { version (Windows) {
import core.sys.windows.windows; import core.sys.windows.windows;
@ -557,6 +577,35 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
} }
final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address)
{
switch (multicast_address.addressFamily) {
default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
case AddressFamily.INET:
struct ip_mreq {
in_addr imr_multiaddr; /* IP multicast address of group */
in_addr imr_interface; /* local IP address of interface */
}
auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
ip_mreq mreq;
mreq.imr_multiaddr = addr.sin_addr;
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
case AddressFamily.INET6:
version (Windows) {
struct ipv6_mreq {
in6_addr ipv6mr_multiaddr;
uint ipv6mr_interface;
}
}
auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
ipv6_mreq mreq;
mreq.ipv6mr_multiaddr = addr.sin6_addr;
mreq.ipv6mr_interface = 0;
return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
}
}
void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish) void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish)
@trusted { // DMD 2.072.0-b2: scope considered unsafe @trusted { // DMD 2.072.0-b2: scope considered unsafe
import std.typecons : scoped; import std.typecons : scoped;
@ -717,6 +766,28 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return true; return true;
} }
final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
{
int proto, opt;
final switch (option) {
case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
}
int tmp = enable;
return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
}
final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
{
int proto, opt;
final switch (option) {
case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
}
int tmp = enable;
return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
}
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
return rawUserDataImpl(descriptor, size, initialize, destroy); return rawUserDataImpl(descriptor, size, initialize, destroy);

View file

@ -444,6 +444,35 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
} }
final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address)
{
import std.socket : AddressFamily;
switch (multicast_address.addressFamily) {
default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
case AddressFamily.INET:
struct ip_mreq {
in_addr imr_multiaddr; /* IP multicast address of group */
in_addr imr_interface; /* local IP address of interface */
}
auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
ip_mreq mreq;
mreq.imr_multiaddr = addr.sin_addr;
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
case AddressFamily.INET6:
struct ipv6_mreq {
in6_addr ipv6mr_multiaddr;
uint ipv6mr_interface;
}
auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
ipv6_mreq mreq;
mreq.ipv6mr_multiaddr = addr.sin6_addr;
mreq.ipv6mr_interface = 0;
return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
}
}
override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish) override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish)
{ {
auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
@ -665,6 +694,28 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
return true; return true;
} }
final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
{
int proto, opt;
final switch (option) {
case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
}
int tmp = enable;
return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
}
final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
{
int proto, opt;
final switch (option) {
case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
}
int tmp = enable;
return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
}
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
return rawUserDataImpl(descriptor, size, initialize, destroy); return rawUserDataImpl(descriptor, size, initialize, destroy);