From c4e985b73c0fcf2bc1907947a85928bdfebe85bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 7 Oct 2016 12:39:38 +0200 Subject: [PATCH] Implement UDP socket support. --- README.md | 2 +- source/eventcore/driver.d | 12 +- source/eventcore/drivers/posix.d | 190 ++++++++++++++++++++++++++++--- 3 files changed, 186 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 7325e13..026c701 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Driver development status Feature | SelectEventDriver | EpollEventDriver | IOCPEventDriver | KqueueEventDriver -----------------|-------------------|------------------|-----------------|------------------ TCP Sockets | yes | yes | no | no -UDP Sockets | no | no | no | no +UDP Sockets | yes | yes | no | no USDS | no | no | no | no DNS | no | no | no | no Timers | yes | yes | no | no diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index a79723a..8ec368b 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -83,11 +83,17 @@ interface EventDriverSockets { ConnectionState getConnectionState(StreamSocketFD sock); void setTCPNoDelay(StreamSocketFD socket, bool enable); void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); + void cancelRead(StreamSocketFD socket); void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); + void cancelWrite(StreamSocketFD socket); void waitForData(StreamSocketFD socket, IOCallback on_data_available); void shutdown(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); - void cancelRead(StreamSocketFD socket); - void cancelWrite(StreamSocketFD socket); + + DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address); + void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish); + void cancelReceive(DatagramSocketFD socket); + void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, DatagramIOCallback on_send_finish, Address target_address = null); + void cancelSend(DatagramSocketFD socket); /** Increments the reference count of the given resource. */ @@ -195,6 +201,7 @@ interface EventDriverWatchers { alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); +alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope Address); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(int); @@ -302,6 +309,7 @@ alias FD = Handle!("FD", int, -1); alias SocketFD = Handle!("Socket", FD); alias StreamSocketFD = Handle!("Stream", SocketFD); alias StreamListenSocketFD = Handle!("StreamListen", SocketFD); +alias DatagramSocketFD = Handle!("Datagram", SocketFD); alias FileFD = Handle!("File", FD); alias EventID = Handle!("Event", FD); alias TimerID = Handle!("Timer", int); diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 568196c..85e2d76 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -135,7 +135,7 @@ abstract class PosixEventDriver : EventDriver, final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) { - auto sock = cast(StreamSocketFD)createSocket(address.addressFamily); + auto sock = cast(StreamSocketFD)createSocket(address.addressFamily, SOCK_STREAM); if (sock == -1) return StreamSocketFD.invalid; void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; } @@ -187,7 +187,7 @@ abstract class PosixEventDriver : EventDriver, final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) { log("Listen stream"); - auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily); + auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily, SOCK_STREAM); void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; } @@ -259,7 +259,7 @@ abstract class PosixEventDriver : EventDriver, } sizediff_t ret; - () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); + () @trusted { ret = .recv(socket, buffer.ptr, buffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); @@ -323,7 +323,7 @@ abstract class PosixEventDriver : EventDriver, } sizediff_t ret; - () @trusted { ret = recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); + () @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); if (err != EAGAIN) { @@ -355,7 +355,7 @@ abstract class PosixEventDriver : EventDriver, } sizediff_t ret; - () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); + () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); @@ -398,11 +398,9 @@ abstract class PosixEventDriver : EventDriver, override void cancelWrite(StreamSocketFD socket) { - assert(m_fds[socket].readCallback !is null, "Cancelling write when there is no read in progress."); + assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); setNotifyCallback!(EventType.write)(socket, null); - with (m_fds[socket]) { - writeBuffer = null; - } + m_fds[socket].writeBuffer = null; } private void onSocketWrite(FD fd) @@ -411,13 +409,13 @@ abstract class PosixEventDriver : EventDriver, auto socket = cast(StreamSocketFD)fd; sizediff_t ret; - () @trusted { ret = send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } (); + () @trusted { ret = .send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); if (err != EAGAIN) { setNotifyCallback!(EventType.write)(socket, null); - slot.readCallback(socket, IOStatus.error, slot.bytesRead); + slot.writeCallback(socket, IOStatus.error, slot.bytesRead); return; } } @@ -501,6 +499,167 @@ abstract class PosixEventDriver : EventDriver, // TODO! } + DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address) + { + auto sock = cast(DatagramSocketFD)createSocket(bind_address.addressFamily, SOCK_DGRAM); + if (sock == -1) return DatagramSocketFD.invalid; + + if (() @trusted { return bind(sock, bind_address.name, bind_address.nameLen); } () != 0) { + closeSocket(sock); + return DatagramSocketFD.init; + } + + if (target_address && () @trusted { return connect(sock, target_address.name, target_address.nameLen); } () != 0) { + closeSocket(sock); + return DatagramSocketFD.init; + } + + registerFD(sock, EventMask.read|EventMask.write|EventMask.status); + + initFD(sock); + + return sock; + } + + void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish) + { + assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets."); + + sizediff_t ret; + scope src_addr = new UnknownAddress; + socklen_t src_addr_len = src_addr.nameLen; + () @trusted { ret = .recvfrom(socket, buffer.ptr, buffer.length, 0, src_addr.name, &src_addr_len); } (); + + if (ret < 0) { + auto err = getSocketError(); + if (err != EAGAIN) { + print("sock error %s!", err); + on_receive_finish(socket, IOStatus.error, 0, null); + return; + } + } + + if (ret < 0) { + if (mode == IOMode.immediate) { + on_receive_finish(socket, IOStatus.wouldBlock, 0, null); + } else { + with (m_fds[socket]) { + readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } (); + readMode = mode; + bytesRead = 0; + readBuffer = buffer; + } + + setNotifyCallback!(EventType.read)(socket, &onDgramRead); + } + return; + } + + on_receive_finish(socket, IOStatus.ok, ret, src_addr); + } + + void cancelReceive(DatagramSocketFD socket) + { + assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + setNotifyCallback!(EventType.read)(socket, null); + m_fds[socket].readBuffer = null; + } + + private void onDgramRead(FD fd) + { + auto slot = &m_fds[fd]; + auto socket = cast(DatagramSocketFD)fd; + + sizediff_t ret; + scope src_addr = new UnknownAddress; + socklen_t src_addr_len = src_addr.nameLen; + () @trusted { ret = .recvfrom(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0, src_addr.name, &src_addr_len); } (); + + if (ret < 0) { + auto err = getSocketError(); + if (err != EAGAIN) { + setNotifyCallback!(EventType.read)(socket, null); + () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null); + return; + } + } + + setNotifyCallback!(EventType.read)(socket, null); + () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addr); + } + + void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, DatagramIOCallback on_send_finish, Address target_address = null) + { + assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets."); + + sizediff_t ret; + if (target_address) { + () @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } (); + m_fds[socket].targetAddr = target_address; + } else { + () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); + } + + if (ret < 0) { + auto err = getSocketError(); + if (err != EAGAIN) { + print("sock error %s!", err); + on_send_finish(socket, IOStatus.error, 0, null); + return; + } + } + + if (ret < 0) { + if (mode == IOMode.immediate) { + on_send_finish(socket, IOStatus.wouldBlock, 0, null); + } else { + with (m_fds[socket]) { + writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } (); + writeMode = mode; + bytesWritten = 0; + writeBuffer = buffer; + } + + setNotifyCallback!(EventType.write)(socket, &onDgramWrite); + } + return; + } + + on_send_finish(socket, IOStatus.ok, ret, null); + } + + void cancelSend(DatagramSocketFD socket) + { + assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); + setNotifyCallback!(EventType.write)(socket, null); + m_fds[socket].writeBuffer = null; + } + + private void onDgramWrite(FD fd) + { + auto slot = &m_fds[fd]; + auto socket = cast(DatagramSocketFD)fd; + + sizediff_t ret; + if (slot.targetAddr) { + () @trusted { ret = .sendto(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0, slot.targetAddr.name, slot.targetAddr.nameLen); } (); + } else { + () @trusted { ret = .send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } (); + } + + if (ret < 0) { + auto err = getSocketError(); + if (err != EAGAIN) { + setNotifyCallback!(EventType.write)(socket, null); + () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); + return; + } + } + + setNotifyCallback!(EventType.write)(socket, null); + () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null); + } + final override void addRef(SocketFD fd) { auto pfd = &m_fds[fd]; @@ -695,10 +854,10 @@ abstract class PosixEventDriver : EventDriver, m_fds[fd].callback[evt] = callback; } - private SocketFD createSocket(AddressFamily family) + private SocketFD createSocket(AddressFamily family, int type) { int sock; - () @trusted { sock = socket(family, SOCK_STREAM, 0); } (); + () @trusted { sock = socket(family, type, 0); } (); if (sock == -1) return SocketFD.invalid; setSocketNonBlocking(cast(SocketFD)sock); return cast(SocketFD)sock; @@ -733,12 +892,13 @@ private struct FDSlot { size_t bytesRead; ubyte[] readBuffer; IOMode readMode; - IOCallback readCallback; + IOCallback readCallback; // FIXME: this type only works for stream sockets size_t bytesWritten; const(ubyte)[] writeBuffer; IOMode writeMode; - IOCallback writeCallback; + IOCallback writeCallback; // FIXME: this type only works for stream sockets + Address targetAddr; ConnectCallback connectCallback; AcceptCallback acceptCallback;