Add a draft version of a managed socket wrapper API.
This commit is contained in:
parent
c4e985b73c
commit
45931f2c17
104
source/eventcore/socket.d
Normal file
104
source/eventcore/socket.d
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
module eventcore.socket;
|
||||||
|
|
||||||
|
import eventcore.core : eventDriver;
|
||||||
|
import eventcore.driver;
|
||||||
|
import std.exception : enforce;
|
||||||
|
import std.socket : Address;
|
||||||
|
|
||||||
|
|
||||||
|
StreamSocket connectStream(scope Address peer_address, ConnectCallback on_connect)
|
||||||
|
@safe {
|
||||||
|
auto fd = eventDriver.sockets.connectStream(peer_address, on_connect);
|
||||||
|
enforce(fd != DatagramSocketFD.init, "Failed to create socket.");
|
||||||
|
return StreamSocket(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
StreamListenSocket listenStream(scope Address bind_address, AcceptCallback on_accept)
|
||||||
|
@safe {
|
||||||
|
auto fd = eventDriver.sockets.listenStream(bind_address, on_accept);
|
||||||
|
enforce(fd != DatagramSocketFD.init, "Failed to create socket.");
|
||||||
|
return StreamListenSocket(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatagramSocket createDatagramSocket(scope Address bind_address, scope Address target_address = null)
|
||||||
|
@safe {
|
||||||
|
auto fd = eventDriver.sockets.createDatagramSocket(bind_address, target_address);
|
||||||
|
enforce(fd != DatagramSocketFD.init, "Failed to create socket.");
|
||||||
|
return DatagramSocket(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*alias ConnectCallback = void delegate(ConnectStatus);
|
||||||
|
alias AcceptCallback = void delegate(StreamSocket);
|
||||||
|
alias IOCallback = void delegate(IOStatus, size_t);*/
|
||||||
|
|
||||||
|
struct StreamSocket {
|
||||||
|
@safe: nothrow:
|
||||||
|
|
||||||
|
private StreamSocketFD m_fd;
|
||||||
|
|
||||||
|
private this(StreamSocketFD fd)
|
||||||
|
{
|
||||||
|
m_fd = fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
this(this) { if (m_fd != StreamSocketFD.init) eventDriver.sockets.addRef(m_fd); }
|
||||||
|
~this() { if (m_fd != StreamSocketFD.init) eventDriver.sockets.releaseRef(m_fd); }
|
||||||
|
|
||||||
|
@property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); }
|
||||||
|
@property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); }
|
||||||
|
|
||||||
|
void read(ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { eventDriver.sockets.read(m_fd, buffer, mode, on_read_finish); }
|
||||||
|
void cancelRead() { eventDriver.sockets.cancelRead(m_fd); }
|
||||||
|
void waitForData(IOCallback on_data_available) { eventDriver.sockets.waitForData(m_fd, on_data_available); }
|
||||||
|
void write(const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { eventDriver.sockets.write(m_fd, buffer, mode, on_write_finish); }
|
||||||
|
void cancelWrite() { eventDriver.sockets.cancelWrite(m_fd); }
|
||||||
|
void shutdown(bool shut_read = true, bool shut_write = true) { eventDriver.sockets.shutdown(m_fd, shut_read, shut_write); }
|
||||||
|
}
|
||||||
|
|
||||||
|
struct StreamListenSocket {
|
||||||
|
@safe: nothrow:
|
||||||
|
|
||||||
|
private StreamListenSocketFD m_fd;
|
||||||
|
|
||||||
|
private this(StreamListenSocketFD fd)
|
||||||
|
{
|
||||||
|
m_fd = fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
this(this) { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.addRef(m_fd); }
|
||||||
|
~this() { if (m_fd != StreamListenSocketFD.init) eventDriver.sockets.releaseRef(m_fd); }
|
||||||
|
|
||||||
|
void waitForConnections(AcceptCallback on_accept)
|
||||||
|
{
|
||||||
|
eventDriver.sockets.waitForConnections(m_fd, on_accept);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DatagramSocket {
|
||||||
|
@safe: nothrow:
|
||||||
|
|
||||||
|
private DatagramSocketFD m_fd;
|
||||||
|
|
||||||
|
private this(DatagramSocketFD fd)
|
||||||
|
{
|
||||||
|
m_fd = fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
this(this) { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.addRef(m_fd); }
|
||||||
|
~this() { if (m_fd != DatagramSocketFD.init) eventDriver.sockets.releaseRef(m_fd); }
|
||||||
|
}
|
||||||
|
|
||||||
|
void receive(alias callback)(ref DatagramSocket socket, ubyte[] buffer, IOMode mode) {
|
||||||
|
void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope Address address) @safe nothrow {
|
||||||
|
callback(status, bytes_written, address);
|
||||||
|
}
|
||||||
|
eventDriver.sockets.receive(socket.m_fd, buffer, mode, &cb);
|
||||||
|
}
|
||||||
|
void cancelReceive(ref DatagramSocket socket) { eventDriver.sockets.cancelReceive(socket.m_fd); }
|
||||||
|
void send(alias callback)(ref DatagramSocket socket, const(ubyte)[] buffer, IOMode mode, Address target_address = null) {
|
||||||
|
void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope Address) @safe nothrow {
|
||||||
|
callback(status, bytes_written);
|
||||||
|
}
|
||||||
|
eventDriver.sockets.send(socket.m_fd, buffer, mode, &cb, target_address);
|
||||||
|
}
|
||||||
|
void cancelSend(ref DatagramSocket socket) { eventDriver.sockets.cancelSend(socket.m_fd); }
|
77
tests/0-udp.d
Normal file
77
tests/0-udp.d
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
/++ dub.sdl:
|
||||||
|
name "test"
|
||||||
|
dependency "eventcore" path=".."
|
||||||
|
+/
|
||||||
|
module test;
|
||||||
|
|
||||||
|
import eventcore.core;
|
||||||
|
import eventcore.socket;
|
||||||
|
import std.socket : InternetAddress;
|
||||||
|
|
||||||
|
DatagramSocket s_baseSocket;
|
||||||
|
DatagramSocket s_freeSocket;
|
||||||
|
DatagramSocket s_connectedSocket;
|
||||||
|
ubyte[256] s_rbuf;
|
||||||
|
bool s_done;
|
||||||
|
|
||||||
|
void main()
|
||||||
|
{
|
||||||
|
static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||||
|
static ubyte[] pack2 = [4, 3, 2, 1, 0];
|
||||||
|
|
||||||
|
auto baddr = new InternetAddress(0x7F000001, 40001);
|
||||||
|
auto anyaddr = new InternetAddress(0x7F000001, 0);
|
||||||
|
s_baseSocket = createDatagramSocket(baddr);
|
||||||
|
s_freeSocket = createDatagramSocket(anyaddr);
|
||||||
|
s_connectedSocket = createDatagramSocket(anyaddr, baddr);
|
||||||
|
s_baseSocket.receive!((status, bytes, addr) {
|
||||||
|
log("receive initial: %s %s", status, bytes);
|
||||||
|
assert(status == IOStatus.wouldBlock);
|
||||||
|
})(s_rbuf, IOMode.immediate);
|
||||||
|
s_baseSocket.receive!((status, bts, address) {
|
||||||
|
log("receive1: %s %s", status, bts);
|
||||||
|
assert(status == IOStatus.ok);
|
||||||
|
assert(bts == pack1.length);
|
||||||
|
assert(s_rbuf[0 .. pack1.length] == pack1);
|
||||||
|
|
||||||
|
s_freeSocket.send!((status, bytes) {
|
||||||
|
log("send2: %s %s", status, bytes);
|
||||||
|
assert(status == IOStatus.ok);
|
||||||
|
assert(bytes == pack2.length);
|
||||||
|
})(pack2, IOMode.once, baddr);
|
||||||
|
|
||||||
|
s_baseSocket.receive!((status, bts, scope addr) {
|
||||||
|
log("receive2: %s %s", status, bts);
|
||||||
|
assert(status == IOStatus.ok);
|
||||||
|
assert(bts == pack2.length);
|
||||||
|
assert(s_rbuf[0 .. pack2.length] == pack2);
|
||||||
|
|
||||||
|
destroy(s_baseSocket);
|
||||||
|
destroy(s_freeSocket);
|
||||||
|
destroy(s_connectedSocket);
|
||||||
|
s_done = true;
|
||||||
|
log("done.");
|
||||||
|
|
||||||
|
// FIXME: this shouldn't ne necessary:
|
||||||
|
eventDriver.core.exit();
|
||||||
|
})(s_rbuf, IOMode.immediate);
|
||||||
|
})(s_rbuf, IOMode.once);
|
||||||
|
s_connectedSocket.send!((status, bytes) {
|
||||||
|
log("send1: %s %s", status, bytes);
|
||||||
|
assert(status == IOStatus.ok);
|
||||||
|
assert(bytes == 10);
|
||||||
|
})(pack1, IOMode.immediate);
|
||||||
|
|
||||||
|
ExitReason er;
|
||||||
|
do er = eventDriver.core.processEvents();
|
||||||
|
while (er == ExitReason.idle);
|
||||||
|
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
||||||
|
assert(s_done);
|
||||||
|
}
|
||||||
|
|
||||||
|
void log(ARGS...)(string fmt, ARGS args)
|
||||||
|
@trusted {
|
||||||
|
import std.stdio;
|
||||||
|
try writefln(fmt, args);
|
||||||
|
catch (Exception e) assert(false, e.msg);
|
||||||
|
}
|
Loading…
Reference in a new issue