From a7e5b49943ef5b98a59d35c5426864a4892987ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 26 Jun 2017 23:37:33 +0200 Subject: [PATCH 1/3] Implement basic UDP support for the WinAPI driver. Note that the IOMode.immediate semantics are not compatible with the current 0-udp.d test and will instead call the callback asynchronously. It appears that non-blocking semantics are generally not possible with overlapped sockets. --- source/eventcore/drivers/winapi/sockets.d | 242 ++++++++++++++++++++-- source/eventcore/internal/win32.d | 3 + 2 files changed, 228 insertions(+), 17 deletions(-) diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index d7e6c3d..81f4663 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -14,7 +14,7 @@ private enum WM_USER_SOCKET = WM_USER + 1; final class WinAPIEventDriverSockets : EventDriverSockets { @safe: /*@nogc:*/ nothrow: private { - alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot); + alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DatagramSocketSlot); SocketVector m_sockets; WinAPIEventDriverCore m_core; DWORD m_tid; @@ -371,44 +371,249 @@ final class WinAPIEventDriverSockets : EventDriverSockets { m_core.removeWaiter(); } - override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address) + final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address) { - assert(false, "TODO!"); + auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); } (); + if (fd == INVALID_SOCKET) + return DatagramSocketFD.invalid; + + void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; } + + () @trusted { + if (bind(fd, bind_address.name, bind_address.nameLen) != 0) { + invalidateSocket(); + return; + } + } (); + + if (fd == INVALID_SOCKET) + return DatagramSocketFD.invalid; + + auto sock = adoptDatagramSocket(fd); + + if (target_address !is null) + setTargetAddress(sock, target_address); + + return sock; } - override DatagramSocketFD adoptDatagramSocket(int socket) + final override DatagramSocketFD adoptDatagramSocket(int socket) { - assert(false); + auto fd = DatagramSocketFD(socket); + if (m_sockets[fd].common.refCount) // FD already in use? + return DatagramSocketFD.invalid; + + void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow { + overlapped.Internal = 0; + overlapped.InternalHigh = 0; + overlapped.Offset = 0; + overlapped.OffsetHigh = 0; + overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket]; + } + + initSocketSlot(fd); + with (m_sockets[socket]) { + specific = DatagramSocketSlot.init; + setupOverlapped(datagramSocket.write.overlapped); + setupOverlapped(datagramSocket.read.overlapped); + } + + //() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE); } (); + + return fd; } - override void setTargetAddress(DatagramSocketFD socket, scope Address target_address) + final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address) { - assert(false); + () @trusted { connect(cast(SOCKET)socket, target_address.name, target_address.nameLen); } (); } - override bool setBroadcast(DatagramSocketFD socket, bool enable) + final override bool setBroadcast(DatagramSocketFD socket, bool enable) { - assert(false, "TODO!"); + int tmp_broad = enable; + return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0; } - override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish) + override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish) { - assert(false, "TODO!"); + auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); + slot.read.buffer = buffer; + slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } (); + slot.read.wsabuf[0].len = buffer.length; + slot.read.mode = mode; + slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof; + + auto ovl = &slot.read.overlapped; + DWORD flags = 0; + auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, &onIOReceiveCompleted); } (); + if (ret == SOCKET_ERROR) { + auto err = WSAGetLastError(); + if (err != WSA_IO_PENDING) { + on_read_finish(socket, IOStatus.error, 0, null); + return; + } + } + + if (mode == IOMode.immediate) + () @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.read.overlapped); } (); + + slot.read.callback = on_read_finish; + m_core.addWaiter(); } override void cancelReceive(DatagramSocketFD socket) - { - assert(false, "TODO!"); + @trusted { + if (!m_sockets[socket].datagramSocket.read.callback) return; + CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped); + m_sockets[socket].datagramSocket.read.callback = null; + m_core.removeWaiter(); } - override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish) + private static extern(System) nothrow + void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) { - assert(false, "TODO!"); + auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); + + if (!slot.datagramSocket.read.callback) return; + + void invokeCallback(IOStatus status, size_t nsent) + @safe nothrow { + slot.common.core.removeWaiter(); + auto cb = slot.datagramSocket.read.callback; + slot.datagramSocket.read.callback = null; + scope addr = new RefAddress(cast(sockaddr*)&slot.datagramSocket.sourceAddr, slot.datagramSocket.sourceAddrLen); + cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, status == IOStatus.ok ? addr : null); + } + + slot.datagramSocket.read.bytesTransferred += cbTransferred; + slot.datagramSocket.read.buffer = slot.datagramSocket.read.buffer[cbTransferred .. $]; + + if (!dwError && (slot.datagramSocket.read.mode != IOMode.all || !slot.datagramSocket.read.buffer.length)) { + invokeCallback(IOStatus.ok, cbTransferred); + return; + } + + if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) { + invokeCallback(IOStatus.wouldBlock, 0); + return; + } + + if (dwError) { + invokeCallback(IOStatus.error, 0); + return; + } + + slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length; + slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } (); + auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped; + DWORD flags = 0; + auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, &onIOReceiveCompleted); } (); + if (ret == SOCKET_ERROR) { + auto err = WSAGetLastError(); + if (err == WSA_IO_PENDING) { + if (slot.datagramSocket.read.mode == IOMode.immediate) { + invokeCallback(IOStatus.wouldBlock, 0); + } + } else { + invokeCallback(IOStatus.error, 0); + } + } + } + + override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_write_finish) + { + auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } (); + slot.write.buffer = buffer; + slot.write.wsabuf[0].len = buffer.length; + slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } (); + slot.write.mode = mode; + slot.targetAddr = target_address; + + auto ovl = &slot.write.overlapped; + auto tan = target_address ? target_address.name : null; + auto tal = target_address ? target_address.nameLen : 0; + auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } (); + + if (ret != 0) { + auto err = WSAGetLastError(); + if (err != WSA_IO_PENDING) { + on_write_finish(socket, IOStatus.error, 0, null); + return; + } + } + + if (mode == IOMode.immediate) + () @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.write.overlapped); } (); + + slot.write.callback = on_write_finish; + m_core.addWaiter(); } override void cancelSend(DatagramSocketFD socket) + @trusted { + if (!m_sockets[socket].datagramSocket.write.callback) return; + CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped); + m_sockets[socket].datagramSocket.write.callback = null; + m_core.removeWaiter(); + } + + private static extern(System) nothrow + void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) { - assert(false, "TODO!"); + auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); + + if (!slot.datagramSocket.write.callback) return; + + void invokeCallback(IOStatus status, size_t nsent) + @safe nothrow { + slot.common.core.removeWaiter(); + auto cb = slot.datagramSocket.write.callback; + auto addr = slot.datagramSocket.targetAddr; + slot.datagramSocket.write.callback = null; + slot.datagramSocket.targetAddr = null; + + if (addr) { + scope raddr = new RefAddress(addr.name, addr.nameLen); + cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, raddr); + } else { + cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, null); + } + } + + slot.datagramSocket.write.bytesTransferred += cbTransferred; + slot.datagramSocket.write.buffer = slot.datagramSocket.write.buffer[cbTransferred .. $]; + + if (!dwError && (slot.datagramSocket.write.mode != IOMode.all || !slot.datagramSocket.write.buffer.length)) { + invokeCallback(IOStatus.ok, cbTransferred); + return; + } + + if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) { + invokeCallback(IOStatus.wouldBlock, 0); + return; + } + + if (dwError) { + invokeCallback(IOStatus.error, 0); + return; + } + + slot.datagramSocket.write.wsabuf[0].len = slot.datagramSocket.write.buffer.length; + slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } (); + auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null; + auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0; + auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped; + auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } (); + if (ret == SOCKET_ERROR) { + auto err = WSAGetLastError(); + if (err == WSA_IO_PENDING) { + if (slot.datagramSocket.write.mode == IOMode.immediate) { + invokeCallback(IOStatus.wouldBlock, 0); + } + } else { + invokeCallback(IOStatus.error, 0); + } + } } override void addRef(SocketFD fd) @@ -600,17 +805,20 @@ private struct StreamListenSocketSlot { AcceptCallback acceptCallback; } -private struct DgramSocketSlot { +private struct DatagramSocketSlot { alias Handle = DatagramSocketFD; DgramDirection!true write; DgramDirection!false read; Address targetAddr; + SOCKADDR_STORAGE sourceAddr; + INT sourceAddrLen; } static struct DgramDirection(bool RO) { WSAOVERLAPPEDX overlapped; static if (RO) const(ubyte)[] buffer; else ubyte[] buffer; + WSABUF[1] wsabuf; size_t bytesTransferred; IOMode mode; DatagramIOCallback callback; diff --git a/source/eventcore/internal/win32.d b/source/eventcore/internal/win32.d index a2ba839..9fad47f 100644 --- a/source/eventcore/internal/win32.d +++ b/source/eventcore/internal/win32.d @@ -30,6 +30,7 @@ enum { MAX_PROTOCOL_CHAIN = 7, } +enum WSA_OPERATION_ABORTED = 995; enum WSA_IO_PENDING = 997; struct WSAPROTOCOL_INFOW { @@ -131,7 +132,9 @@ alias LPCONDITIONPROC = void*; alias LPTRANSMIT_FILE_BUFFERS = void*; int WSARecv(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesRecvd, DWORD* lpFlags, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine); +int WSARecvFrom(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesRecvd, DWORD* lpFlags, SOCKADDR *lpFrom, LPINT lpFromlen, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine); int WSASend(SOCKET s, in WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesSent, DWORD dwFlags, in WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine); +int WSASendTo(SOCKET s, in WSABUF* lpBuffers, DWORD dwBufferCount, DWORD* lpNumberOfBytesSent, DWORD dwFlags, const(SOCKADDR) *lpTo, int iToLenin, WSAOVERLAPPEDX* lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINEX lpCompletionRoutine); int GetAddrInfoExW(LPCWSTR pName, LPCWSTR pServiceName, DWORD dwNameSpace, GUID* lpNspId, const ADDRINFOEXW *pHints, ADDRINFOEXW **ppResult, timeval *timeout, WSAOVERLAPPEDX* lpOverlapped, LPLOOKUPSERVICE_COMPLETION_ROUTINE lpCompletionRoutine, HANDLE* lpNameHandle); @nogc: From c55eeaf4767df42e86a0d3e5d974e93657ae0fff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 26 Jun 2017 23:41:38 +0200 Subject: [PATCH 2/3] Update development status table. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 637b725..c5c1966 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ Driver development status Feature | SelectEventDriver | EpollEventDriver | WinAPIEventDriver | KqueueEventDriver -----------------|-------------------|------------------|-------------------|------------------ TCP Sockets | yes | yes | yes | yes -UDP Sockets | yes | yes | — | yes +UDP Sockets | yes | yes | yes | yes USDS | yes | yes | — | yes DNS | yes | yes | yes | yes Timers | yes | yes | yes | yes From 077c643e0890ce0d82376531c169c945824f39aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 27 Jun 2017 00:54:32 +0200 Subject: [PATCH 3/3] Fix 64-bit compile error. --- source/eventcore/drivers/winapi/sockets.d | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 81f4663..de00d5f 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -389,7 +389,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { if (fd == INVALID_SOCKET) return DatagramSocketFD.invalid; - auto sock = adoptDatagramSocket(fd); + auto sock = adoptDatagramSocketInternal(fd); if (target_address !is null) setTargetAddress(sock, target_address); @@ -398,6 +398,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } final override DatagramSocketFD adoptDatagramSocket(int socket) + { + return adoptDatagramSocketInternal(socket); + } + + private DatagramSocketFD adoptDatagramSocketInternal(SOCKET socket) { auto fd = DatagramSocketFD(socket); if (m_sockets[fd].common.refCount) // FD already in use?