diff --git a/source/eventcore/socket.d b/source/eventcore/socket.d index 4f0e941..2be0cf5 100644 --- a/source/eventcore/socket.d +++ b/source/eventcore/socket.d @@ -6,31 +6,34 @@ import std.exception : enforce; import std.socket : Address; -StreamSocket connectStream(scope Address peer_address, ConnectCallback on_connect) +StreamSocket connectStream(alias callback)(scope Address peer_address) @safe { - auto fd = eventDriver.sockets.connectStream(peer_address, on_connect); - enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + void cb(StreamSocketFD fd, ConnectStatus status) @safe nothrow { + if (fd != StreamSocketFD.invalid) eventDriver.sockets.addRef(fd); + callback(StreamSocket(fd), status); + if (fd != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(fd); + } + + auto fd = eventDriver.sockets.connectStream(peer_address, &cb); + enforce(fd != StreamSocketFD.invalid, "Failed to create socket."); + eventDriver.sockets.addRef(fd); return StreamSocket(fd); } -StreamListenSocket listenStream(scope Address bind_address, AcceptCallback on_accept) +StreamListenSocket listenStream(scope Address bind_address) @safe { - auto fd = eventDriver.sockets.listenStream(bind_address, on_accept); - enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + auto fd = eventDriver.sockets.listenStream(bind_address, null); + enforce(fd != StreamListenSocketFD.invalid, "Failed to create socket."); return StreamListenSocket(fd); } DatagramSocket createDatagramSocket(scope Address bind_address, scope Address target_address = null) @safe { auto fd = eventDriver.sockets.createDatagramSocket(bind_address, target_address); - enforce(fd != DatagramSocketFD.init, "Failed to create socket."); + enforce(fd != DatagramSocketFD.invalid, "Failed to create socket."); return DatagramSocket(fd); } -/*alias ConnectCallback = void delegate(ConnectStatus); -alias AcceptCallback = void delegate(StreamSocket); -alias IOCallback = void delegate(IOStatus, size_t);*/ - struct StreamSocket { @safe: nothrow: @@ -41,20 +44,39 @@ struct StreamSocket { m_fd = fd; } - this(this) { if (m_fd != StreamSocketFD.init) eventDriver.sockets.addRef(m_fd); } - ~this() { if (m_fd != StreamSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } + this(this) { if (m_fd != StreamSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } @property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); } @property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); } - - void read(ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { eventDriver.sockets.read(m_fd, buffer, mode, on_read_finish); } - void cancelRead() { eventDriver.sockets.cancelRead(m_fd); } - void waitForData(IOCallback on_data_available) { eventDriver.sockets.waitForData(m_fd, on_data_available); } - void write(const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { eventDriver.sockets.write(m_fd, buffer, mode, on_write_finish); } - void cancelWrite() { eventDriver.sockets.cancelWrite(m_fd); } - void shutdown(bool shut_read = true, bool shut_write = true) { eventDriver.sockets.shutdown(m_fd, shut_read, shut_write); } } +void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode) +{ + void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { + callback(status, nbytes); + } + eventDriver.sockets.read(socket.m_fd, buffer, mode, &cb); +} +void cancelRead(ref StreamSocket socket) { eventDriver.sockets.cancelRead(socket.m_fd); } +void waitForData(alias callback)(ref StreamSocket socket) +{ + void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { + callback(status, nbytes); + } + eventDriver.sockets.waitForData(socket.m_fd, &cb); +} +void write(alias callback)(ref StreamSocket socket, const(ubyte)[] buffer, IOMode mode) +{ + void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { + callback(status, nbytes); + } + eventDriver.sockets.write(socket.m_fd, buffer, mode, &cb); +} +void cancelWrite(ref StreamSocket socket) { eventDriver.sockets.cancelWrite(socket.m_fd); } +void shutdown(ref StreamSocket socket, bool shut_read = true, bool shut_write = true) { eventDriver.sockets.shutdown(socket.m_fd, shut_read, shut_write); } + + struct StreamListenSocket { @safe: nothrow: @@ -65,15 +87,20 @@ struct StreamListenSocket { m_fd = fd; } - this(this) { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.addRef(m_fd); } - ~this() { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } - - void waitForConnections(AcceptCallback on_accept) - { - eventDriver.sockets.waitForConnections(m_fd, on_accept); - } + this(this) { if (m_fd != StreamListenSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != StreamListenSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } } +void waitForConnections(alias callback)(ref StreamListenSocket socket) +{ + void cb(StreamListenSocketFD, StreamSocketFD sock) @safe nothrow { + auto ss = StreamSocket(sock); + callback(ss); + } + eventDriver.sockets.waitForConnections(socket.m_fd, &cb); +} + + struct DatagramSocket { @safe: nothrow: @@ -84,8 +111,8 @@ struct DatagramSocket { m_fd = fd; } - this(this) { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.addRef(m_fd); } - ~this() { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.releaseRef(m_fd); } + this(this) { if (m_fd != DatagramSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } + ~this() { if (m_fd != DatagramSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } } void receive(alias callback)(ref DatagramSocket socket, ubyte[] buffer, IOMode mode) {