Implement basic UDP support for the WinAPI driver.

Note that the IOMode.immediate semantics are not compatible with the current 0-udp.d test and will instead call the callback asynchronously. It appears that non-blocking semantics are generally not possible with overlapped sockets.
This commit is contained in:
Sönke Ludwig 2017-06-26 23:37:33 +02:00
parent a263637bd3
commit a7e5b49943
2 changed files with 228 additions and 17 deletions

View file

@ -14,7 +14,7 @@ private enum WM_USER_SOCKET = WM_USER + 1;
final class WinAPIEventDriverSockets : EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
private {
alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot);
alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DatagramSocketSlot);
SocketVector m_sockets;
WinAPIEventDriverCore m_core;
DWORD m_tid;
@ -371,44 +371,249 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.removeWaiter();
}
override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
{
assert(false, "TODO!");
auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); } ();
if (fd == INVALID_SOCKET)
return DatagramSocketFD.invalid;
void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }
() @trusted {
if (bind(fd, bind_address.name, bind_address.nameLen) != 0) {
invalidateSocket();
return;
}
} ();
if (fd == INVALID_SOCKET)
return DatagramSocketFD.invalid;
auto sock = adoptDatagramSocket(fd);
if (target_address !is null)
setTargetAddress(sock, target_address);
return sock;
}
override DatagramSocketFD adoptDatagramSocket(int socket)
final override DatagramSocketFD adoptDatagramSocket(int socket)
{
assert(false);
auto fd = DatagramSocketFD(socket);
if (m_sockets[fd].common.refCount) // FD already in use?
return DatagramSocketFD.invalid;
void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow {
overlapped.Internal = 0;
overlapped.InternalHigh = 0;
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
}
initSocketSlot(fd);
with (m_sockets[socket]) {
specific = DatagramSocketSlot.init;
setupOverlapped(datagramSocket.write.overlapped);
setupOverlapped(datagramSocket.read.overlapped);
}
//() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE); } ();
return fd;
}
override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
{
assert(false);
() @trusted { connect(cast(SOCKET)socket, target_address.name, target_address.nameLen); } ();
}
override bool setBroadcast(DatagramSocketFD socket, bool enable)
final override bool setBroadcast(DatagramSocketFD socket, bool enable)
{
assert(false, "TODO!");
int tmp_broad = enable;
return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
}
override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish)
override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish)
{
assert(false, "TODO!");
auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
slot.read.buffer = buffer;
slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();
slot.read.wsabuf[0].len = buffer.length;
slot.read.mode = mode;
slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof;
auto ovl = &slot.read.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err != WSA_IO_PENDING) {
on_read_finish(socket, IOStatus.error, 0, null);
return;
}
}
if (mode == IOMode.immediate)
() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.read.overlapped); } ();
slot.read.callback = on_read_finish;
m_core.addWaiter();
}
override void cancelReceive(DatagramSocketFD socket)
{
assert(false, "TODO!");
@trusted {
if (!m_sockets[socket].datagramSocket.read.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped);
m_sockets[socket].datagramSocket.read.callback = null;
m_core.removeWaiter();
}
override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish)
private static extern(System) nothrow
void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
{
assert(false, "TODO!");
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
if (!slot.datagramSocket.read.callback) return;
void invokeCallback(IOStatus status, size_t nsent)
@safe nothrow {
slot.common.core.removeWaiter();
auto cb = slot.datagramSocket.read.callback;
slot.datagramSocket.read.callback = null;
scope addr = new RefAddress(cast(sockaddr*)&slot.datagramSocket.sourceAddr, slot.datagramSocket.sourceAddrLen);
cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, status == IOStatus.ok ? addr : null);
}
slot.datagramSocket.read.bytesTransferred += cbTransferred;
slot.datagramSocket.read.buffer = slot.datagramSocket.read.buffer[cbTransferred .. $];
if (!dwError && (slot.datagramSocket.read.mode != IOMode.all || !slot.datagramSocket.read.buffer.length)) {
invokeCallback(IOStatus.ok, cbTransferred);
return;
}
if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) {
invokeCallback(IOStatus.wouldBlock, 0);
return;
}
if (dwError) {
invokeCallback(IOStatus.error, 0);
return;
}
slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length;
slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } ();
auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
if (slot.datagramSocket.read.mode == IOMode.immediate) {
invokeCallback(IOStatus.wouldBlock, 0);
}
} else {
invokeCallback(IOStatus.error, 0);
}
}
}
override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_write_finish)
{
auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
slot.write.buffer = buffer;
slot.write.wsabuf[0].len = buffer.length;
slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();
slot.write.mode = mode;
slot.targetAddr = target_address;
auto ovl = &slot.write.overlapped;
auto tan = target_address ? target_address.name : null;
auto tal = target_address ? target_address.nameLen : 0;
auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
if (ret != 0) {
auto err = WSAGetLastError();
if (err != WSA_IO_PENDING) {
on_write_finish(socket, IOStatus.error, 0, null);
return;
}
}
if (mode == IOMode.immediate)
() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.write.overlapped); } ();
slot.write.callback = on_write_finish;
m_core.addWaiter();
}
override void cancelSend(DatagramSocketFD socket)
@trusted {
if (!m_sockets[socket].datagramSocket.write.callback) return;
CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped);
m_sockets[socket].datagramSocket.write.callback = null;
m_core.removeWaiter();
}
private static extern(System) nothrow
void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
{
assert(false, "TODO!");
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
if (!slot.datagramSocket.write.callback) return;
void invokeCallback(IOStatus status, size_t nsent)
@safe nothrow {
slot.common.core.removeWaiter();
auto cb = slot.datagramSocket.write.callback;
auto addr = slot.datagramSocket.targetAddr;
slot.datagramSocket.write.callback = null;
slot.datagramSocket.targetAddr = null;
if (addr) {
scope raddr = new RefAddress(addr.name, addr.nameLen);
cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, raddr);
} else {
cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, null);
}
}
slot.datagramSocket.write.bytesTransferred += cbTransferred;
slot.datagramSocket.write.buffer = slot.datagramSocket.write.buffer[cbTransferred .. $];
if (!dwError && (slot.datagramSocket.write.mode != IOMode.all || !slot.datagramSocket.write.buffer.length)) {
invokeCallback(IOStatus.ok, cbTransferred);
return;
}
if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) {
invokeCallback(IOStatus.wouldBlock, 0);
return;
}
if (dwError) {
invokeCallback(IOStatus.error, 0);
return;
}
slot.datagramSocket.write.wsabuf[0].len = slot.datagramSocket.write.buffer.length;
slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } ();
auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null;
auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0;
auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped;
auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
if (slot.datagramSocket.write.mode == IOMode.immediate) {
invokeCallback(IOStatus.wouldBlock, 0);
}
} else {
invokeCallback(IOStatus.error, 0);
}
}
}
override void addRef(SocketFD fd)
@ -600,17 +805,20 @@ private struct StreamListenSocketSlot {
AcceptCallback acceptCallback;
}
private struct DgramSocketSlot {
private struct DatagramSocketSlot {
alias Handle = DatagramSocketFD;
DgramDirection!true write;
DgramDirection!false read;
Address targetAddr;
SOCKADDR_STORAGE sourceAddr;
INT sourceAddrLen;
}
static struct DgramDirection(bool RO) {
WSAOVERLAPPEDX overlapped;
static if (RO) const(ubyte)[] buffer;
else ubyte[] buffer;
WSABUF[1] wsabuf;
size_t bytesTransferred;
IOMode mode;
DatagramIOCallback callback;

View file

@ -30,6 +30,7 @@ enum {
MAX_PROTOCOL_CHAIN = 7,
}
enum WSA_OPERATION_ABORTED = 995;
enum WSA_IO_PENDING = 997;
struct WSAPROTOCOL_INFOW {
@ -131,7 +132,9 @@ alias LPCONDITIONPROC = void*;
alias LPTRANSMIT_FILE_BUFFERS = void*;
int WSARecv(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesRecvd, DWORD* lpFlags, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine);
int WSARecvFrom(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesRecvd, DWORD* lpFlags, SOCKADDR *lpFrom, LPINT lpFromlen, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine);
int WSASend(SOCKET s, in WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesSent, DWORD dwFlags, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine);
int WSASendTo(SOCKET s, in WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesSent, DWORD dwFlags, const(SOCKADDR) *lpTo, int iToLenin, WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine);
int GetAddrInfoExW(LPCWSTR pName, LPCWSTR pServiceName, DWORD dwNameSpace, GUID* lpNspId, const ADDRINFOEXW *pHints, ADDRINFOEXW **ppResult, timeval *timeout, WSAOVERLAPPEDX* lpOverlapped, LPLOOKUPSERVICE_COMPLETION_ROUTINE lpCompletionRoutine, HANDLE* lpNameHandle);
@nogc: