diff --git a/source/eventcore/socket.d b/source/eventcore/socket.d new file mode 100644 index 0000000..4f0e941 --- /dev/null +++ b/source/eventcore/socket.d @@ -0,0 +1,104 @@ +module eventcore.socket; + +import eventcore.core : eventDriver; +import eventcore.driver; +import std.exception : enforce; +import std.socket : Address; + + +StreamSocket connectStream(scope Address peer_address, ConnectCallback on_connect) +@safe { + auto fd = eventDriver.sockets.connectStream(peer_address, on_connect); + enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + return StreamSocket(fd); +} + +StreamListenSocket listenStream(scope Address bind_address, AcceptCallback on_accept) +@safe { + auto fd = eventDriver.sockets.listenStream(bind_address, on_accept); + enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + return StreamListenSocket(fd); +} + +DatagramSocket createDatagramSocket(scope Address bind_address, scope Address target_address = null) +@safe { + auto fd = eventDriver.sockets.createDatagramSocket(bind_address, target_address); + enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + return DatagramSocket(fd); +} + +/*alias ConnectCallback = void delegate(ConnectStatus); +alias AcceptCallback = void delegate(StreamSocket); +alias IOCallback = void delegate(IOStatus, size_t);*/ + +struct StreamSocket { + @safe: nothrow: + + private StreamSocketFD m_fd; + + private this(StreamSocketFD fd) + { + m_fd = fd; + } + + this(this) { if (m_fd != StreamSocketFD.init) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != StreamSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } + + @property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); } + @property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); } + + void read(ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { eventDriver.sockets.read(m_fd, buffer, mode, on_read_finish); } + void cancelRead() { eventDriver.sockets.cancelRead(m_fd); } + void waitForData(IOCallback on_data_available) { eventDriver.sockets.waitForData(m_fd, on_data_available); } + void write(const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { eventDriver.sockets.write(m_fd, buffer, mode, on_write_finish); } + void cancelWrite() { eventDriver.sockets.cancelWrite(m_fd); } + void shutdown(bool shut_read = true, bool shut_write = true) { eventDriver.sockets.shutdown(m_fd, shut_read, shut_write); } +} + +struct StreamListenSocket { + @safe: nothrow: + + private StreamListenSocketFD m_fd; + + private this(StreamListenSocketFD fd) + { + m_fd = fd; + } + + this(this) { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } + + void waitForConnections(AcceptCallback on_accept) + { + eventDriver.sockets.waitForConnections(m_fd, on_accept); + } +} + +struct DatagramSocket { + @safe: nothrow: + + private DatagramSocketFD m_fd; + + private this(DatagramSocketFD fd) + { + m_fd = fd; + } + + this(this) { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } +} + +void receive(alias callback)(ref DatagramSocket socket, ubyte[] buffer, IOMode mode) { + void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope Address address) @safe nothrow { + callback(status, bytes_written, address); + } + eventDriver.sockets.receive(socket.m_fd, buffer, mode, &cb); +} +void cancelReceive(ref DatagramSocket socket) { eventDriver.sockets.cancelReceive(socket.m_fd); } +void send(alias callback)(ref DatagramSocket socket, const(ubyte)[] buffer, IOMode mode, Address target_address = null) { + void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope Address) @safe nothrow { + callback(status, bytes_written); + } + eventDriver.sockets.send(socket.m_fd, buffer, mode, &cb, target_address); +} +void cancelSend(ref DatagramSocket socket) { eventDriver.sockets.cancelSend(socket.m_fd); } diff --git a/tests/0-udp.d b/tests/0-udp.d new file mode 100644 index 0000000..be05b29 --- /dev/null +++ b/tests/0-udp.d @@ -0,0 +1,77 @@ +/++ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ +module test; + +import eventcore.core; +import eventcore.socket; +import std.socket : InternetAddress; + +DatagramSocket s_baseSocket; +DatagramSocket s_freeSocket; +DatagramSocket s_connectedSocket; +ubyte[256] s_rbuf; +bool s_done; + +void main() +{ + static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + static ubyte[] pack2 = [4, 3, 2, 1, 0]; + + auto baddr = new InternetAddress(0x7F000001, 40001); + auto anyaddr = new InternetAddress(0x7F000001, 0); + s_baseSocket = createDatagramSocket(baddr); + s_freeSocket = createDatagramSocket(anyaddr); + s_connectedSocket = createDatagramSocket(anyaddr, baddr); + s_baseSocket.receive!((status, bytes, addr) { + log("receive initial: %s %s", status, bytes); + assert(status == IOStatus.wouldBlock); + })(s_rbuf, IOMode.immediate); + s_baseSocket.receive!((status, bts, address) { + log("receive1: %s %s", status, bts); + assert(status == IOStatus.ok); + assert(bts == pack1.length); + assert(s_rbuf[0 .. pack1.length] == pack1); + + s_freeSocket.send!((status, bytes) { + log("send2: %s %s", status, bytes); + assert(status == IOStatus.ok); + assert(bytes == pack2.length); + })(pack2, IOMode.once, baddr); + + s_baseSocket.receive!((status, bts, scope addr) { + log("receive2: %s %s", status, bts); + assert(status == IOStatus.ok); + assert(bts == pack2.length); + assert(s_rbuf[0 .. pack2.length] == pack2); + + destroy(s_baseSocket); + destroy(s_freeSocket); + destroy(s_connectedSocket); + s_done = true; + log("done."); + + // FIXME: this shouldn't ne necessary: + eventDriver.core.exit(); + })(s_rbuf, IOMode.immediate); + })(s_rbuf, IOMode.once); + s_connectedSocket.send!((status, bytes) { + log("send1: %s %s", status, bytes); + assert(status == IOStatus.ok); + assert(bytes == 10); + })(pack1, IOMode.immediate); + + ExitReason er; + do er = eventDriver.core.processEvents(); + while (er == ExitReason.idle); + //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(s_done); +} + +void log(ARGS...)(string fmt, ARGS args) +@trusted { + import std.stdio; + try writefln(fmt, args); + catch (Exception e) assert(false, e.msg); +}