Add EventDriverSockets.getLocalAddress and make parameters scope where possible.

This commit is contained in:
Sönke Ludwig 2017-01-15 20:56:06 +01:00
parent d8f10972eb
commit 116940a5a7
4 changed files with 70 additions and 32 deletions

View file

@ -83,6 +83,7 @@ interface EventDriverSockets {
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
ConnectionState getConnectionState(StreamSocketFD sock); ConnectionState getConnectionState(StreamSocketFD sock);
bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst);
void setTCPNoDelay(StreamSocketFD socket, bool enable); void setTCPNoDelay(StreamSocketFD socket, bool enable);
void setKeepAlive(StreamSocketFD socket, bool enable); void setKeepAlive(StreamSocketFD socket, bool enable);
void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
@ -232,12 +233,37 @@ interface EventDriverWatchers {
bool releaseRef(WatcherID descriptor); bool releaseRef(WatcherID descriptor);
} }
final class RefAddress : Address {
version (Posix) import core.sys.posix.sys.socket : sockaddr, socklen_t;
version (Windows) import core.sys.windows.winsock2 : sockaddr, socklen_t;
private {
sockaddr* m_addr;
socklen_t m_addrLen;
}
this() @safe nothrow {}
this(sockaddr* addr, socklen_t addr_len) @safe nothrow { set(addr, addr_len); }
override @property sockaddr* name() { return m_addr; }
override @property const(sockaddr)* name() const { return m_addr; }
override @property socklen_t nameLen() const { return m_addrLen; }
void set(sockaddr* addr, socklen_t addr_len) @safe nothrow { m_addr = addr; m_addrLen = addr_len; }
void cap(socklen_t new_len)
@safe nothrow {
assert(new_len <= m_addrLen, "Cannot grow size of a RefAddress.");
m_addrLen = new_len;
}
}
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address);
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, /*scope*/ Address); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, /*scope*/ Address[]); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);

View file

@ -99,6 +99,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst)
{
assert(false, "TODO!");
}
override void setTCPNoDelay(StreamSocketFD socket, bool enable) override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{ {
assert(false, "TODO!"); assert(false, "TODO!");

View file

@ -324,8 +324,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
foreach (i; 0 .. 20) { foreach (i; 0 .. 20) {
int sockfd; int sockfd;
sockaddr_storage addr; sockaddr_storage addr;
version (Windows) int addr_len = addr.sizeof; socklen_t addr_len = addr.sizeof;
else uint addr_len = addr.sizeof;
() @trusted { sockfd = accept(listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } (); () @trusted { sockfd = accept(listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
if (sockfd == -1) break; if (sockfd == -1) break;
@ -335,7 +334,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.m_fds[fd].specific = StreamSocketSlot.init; m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
//print("accept %d", sockfd); //print("accept %d", sockfd);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
} }
} }
@ -344,6 +344,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return m_loop.m_fds[sock].streamSocket.state; return m_loop.m_fds[sock].streamSocket.state;
} }
bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst)
{
socklen_t addr_len = dst.nameLen;
if (() @trusted { return getsockname(sock, dst.name, &addr_len); } () != 0)
return false;
dst.cap(addr_len);
return true;
}
final override void setTCPNoDelay(StreamSocketFD socket, bool enable) final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{ {
int opt = enable; int opt = enable;
@ -643,9 +653,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets."); assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
sizediff_t ret; sizediff_t ret;
scope src_addr = new UnknownAddress(); sockaddr_storage src_addr;
socklen_t src_addr_len = src_addr.nameLen; socklen_t src_addr_len = src_addr.sizeof;
() @trusted { ret = .recvfrom(socket, buffer.ptr, buffer.length, 0, src_addr.name, &src_addr_len); } (); () @trusted { ret = .recvfrom(socket, buffer.ptr, buffer.length, 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
@ -670,7 +680,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return; return;
} }
on_receive_finish(socket, IOStatus.ok, ret, src_addr); scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr_len);
on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
} }
void cancelReceive(DatagramSocketFD socket) void cancelReceive(DatagramSocketFD socket)
@ -686,9 +697,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
auto socket = cast(DatagramSocketFD)fd; auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret; sizediff_t ret;
scope src_addr = new UnknownAddress; sockaddr_storage src_addr;
socklen_t src_addr_len = src_addr.nameLen; socklen_t src_addr_len = src_addr.sizeof;
() @trusted { ret = .recvfrom(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0, src_addr.name, &src_addr_len); } (); () @trusted { ret = .recvfrom(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
@ -700,7 +711,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addr); scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addrc);
} }
void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish) void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish)
@ -1012,29 +1024,20 @@ private void passToDNSCallback(DNSLookupID id, scope DNSLookupCallback cb, DNSSt
{ {
import std.typecons : scoped; import std.typecons : scoped;
static final class RefAddr : Address {
sockaddr* sa;
socklen_t len;
override @property sockaddr* name() { return sa; }
override @property const(sockaddr)* name() const { return sa; }
override @property socklen_t nameLen() const { return len; }
}
try { try {
typeof(scoped!RefAddr())[16] addrs_prealloc = [ typeof(scoped!RefAddress())[16] addrs_prealloc = [
scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr(), scoped!RefAddr() scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress()
]; ];
//Address[16] addrs; //Address[16] addrs;
auto addrs = new Address[16]; // FIXME: avoid heap allocation RefAddress[16] addrs;
auto ai = ai_orig; auto ai = ai_orig;
size_t addr_count = 0; size_t addr_count = 0;
while (ai !is null && addr_count < addrs.length) { while (ai !is null && addr_count < addrs.length) {
RefAddr ua = new RefAddr;//addrs_prealloc[addr_count]; // FIXME: avoid heap allocation RefAddress ua = addrs_prealloc[addr_count]; // FIXME: avoid heap allocation
ua.sa = ai.ai_addr; ua.set(ai.ai_addr, ai.ai_addrlen);
ua.len = ai.ai_addrlen;
addrs[addr_count] = ua; addrs[addr_count] = ua;
addr_count++; addr_count++;
ai = ai.ai_next; ai = ai.ai_next;
@ -1044,7 +1047,6 @@ private void passToDNSCallback(DNSLookupID id, scope DNSLookupCallback cb, DNSSt
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private Loop m_loop; private Loop m_loop;

View file

@ -106,6 +106,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst)
{
assert(false, "TODO!");
}
override void setTCPNoDelay(StreamSocketFD socket, bool enable) override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{ {
assert(false, "TODO!"); assert(false, "TODO!");