diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index b3f4dde..ec5e2c5 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -4,6 +4,7 @@ version (Windows): import eventcore.driver; import eventcore.drivers.timer; +import eventcore.internal.consumablequeue; import eventcore.internal.win32; import core.time : Duration; import taggedalgebraic; @@ -19,6 +20,7 @@ final class WinAPIEventDriverCore : EventDriverCore { HANDLE[] m_registeredEvents; void delegate() @safe nothrow[HANDLE] m_eventCallbacks; HANDLE m_fileCompletionEvent; + ConsumableQueue!IOEvent m_ioEvents; } package { @@ -31,6 +33,7 @@ final class WinAPIEventDriverCore : EventDriverCore { m_tid = () @trusted { return GetCurrentThreadId(); } (); m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } (); registerEvent(m_fileCompletionEvent); + m_ioEvents = new ConsumableQueue!IOEvent; } override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; } @@ -116,6 +119,9 @@ final class WinAPIEventDriverCore : EventDriverCore { auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); + foreach (evt; m_ioEvents.consume) + evt.process(evt.error, evt.bytesTransferred, evt.overlapped); + if (ret == WAIT_IO_COMPLETION) got_event = true; else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) { @@ -241,14 +247,34 @@ package struct FileSlot { package struct WatcherSlot { ubyte[] buffer; - OVERLAPPED overlapped; + OVERLAPPED_CORE overlapped; string directory; bool recursive; FileChangesCallback callback; } -package struct OVERLAPPED_FILE { +package struct OVERLAPPED_CORE { OVERLAPPED overlapped; + alias overlapped this; WinAPIEventDriverCore driver; +} + +package struct OVERLAPPED_FILE { + OVERLAPPED_CORE core; + alias core this; FileFD handle; } + +package struct IOEvent { + void function(DWORD err, DWORD bts, OVERLAPPED_CORE*) @safe nothrow process; + DWORD error; + DWORD bytesTransferred; + OVERLAPPED_CORE* overlapped; +} + +package extern(System) @system nothrow +void overlappedIOHandler(alias process, EXTRA...)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped, EXTRA extra) +{ + auto overlapped = cast(OVERLAPPED_CORE*)_overlapped; + overlapped.driver.m_ioEvents.put(IOEvent(&process, error, bytes_transferred, overlapped)); +} diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index 5f42f8d..720aa7b 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -165,7 +165,8 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } auto nbytes = min(slot.buffer.length, DWORD.max); - if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, &onIOFinished!(fun, RO)); } ()) { + auto handler = &overlappedIOHandler!(onIOFinished!(fun, RO)); + if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, handler); } ()) { slot.overlapped.driver.removeWaiter(); slot.invokeCallback(IOStatus.error, slot.bytesTransferred); } @@ -181,8 +182,8 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } } - private static extern(Windows) - void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped) + private static nothrow + void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED_CORE* _overlapped) { auto ctx = () @trusted { return cast(OVERLAPPED_FILE*)_overlapped; } (); FileFD id = ctx.handle; diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index ebf6e9e..22ed186 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -111,12 +111,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets { //uint enable = 1; //() @trusted { ioctlsocket(socket, FIONBIO, &enable); } (); - void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow { + void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow { overlapped.Internal = 0; overlapped.InternalHigh = 0; overlapped.Offset = 0; overlapped.OffsetHigh = 0; overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket]; + overlapped.driver = m_core; } initSocketSlot(fd); @@ -226,9 +227,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.read.wsabuf[0].len = buffer.length; slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } (); - auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped; + auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped.overlapped; DWORD flags = 0; - auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } (); + auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD); + auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -247,8 +249,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } - private static extern(System) nothrow - void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) + private static nothrow + void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) { auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); @@ -278,9 +280,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.streamSocket.read.wsabuf[0].len = slot.streamSocket.read.buffer.length; slot.streamSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.read.buffer.ptr; } (); - auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped; + auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped.overlapped; DWORD flags = 0; - auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } (); + auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD); + auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -301,8 +304,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.write.wsabuf[0].len = buffer.length; slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } (); - auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped; - auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } (); + auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped.overlapped; + auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD); + auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -320,8 +324,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { m_core.addWaiter(); } - private static extern(System) nothrow - void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) + private static nothrow + void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) { auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); @@ -351,8 +355,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.streamSocket.write.wsabuf[0].len = slot.streamSocket.write.buffer.length; slot.streamSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.write.buffer.ptr; } (); - auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped; - auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } (); + auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped.overlapped; + auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD); + auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -431,12 +436,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets { if (m_sockets[fd].common.refCount) // FD already in use? return DatagramSocketFD.invalid; - void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow { + void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow { overlapped.Internal = 0; overlapped.InternalHigh = 0; overlapped.Offset = 0; overlapped.OffsetHigh = 0; overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket]; + overlapped.driver = m_core; } initSocketSlot(fd); @@ -500,9 +506,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.read.mode = mode; slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof; - auto ovl = &slot.read.overlapped; + auto ovl = &slot.read.overlapped.overlapped; DWORD flags = 0; - auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, &onIOReceiveCompleted); } (); + auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD); + auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err != WSA_IO_PENDING) { @@ -527,8 +534,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { m_core.removeWaiter(); } - private static extern(System) nothrow - void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) + private static nothrow + void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) { auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); @@ -564,9 +571,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length; slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } (); - auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped; + auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped.overlapped; DWORD flags = 0; - auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, &onIOReceiveCompleted); } (); + auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD); + auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -588,10 +596,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.write.mode = mode; slot.targetAddr = target_address; - auto ovl = &slot.write.overlapped; + auto ovl = &slot.write.overlapped.overlapped; auto tan = target_address ? target_address.name : null; auto tal = target_address ? target_address.nameLen : 0; - auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } (); + auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD); + auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } (); if (ret != 0) { auto err = WSAGetLastError(); @@ -617,8 +626,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { m_core.removeWaiter(); } - private static extern(System) nothrow - void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) + private static nothrow + void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) { auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } (); @@ -664,8 +673,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } (); auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null; auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0; - auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped; - auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } (); + auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped.overlapped; + auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD); + auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } (); if (ret == SOCKET_ERROR) { auto err = WSAGetLastError(); if (err == WSA_IO_PENDING) { @@ -878,7 +888,7 @@ private struct StreamSocketSlot { } static struct StreamDirection(bool RO) { - WSAOVERLAPPEDX overlapped; + OVERLAPPED_CORE overlapped; static if (RO) const(ubyte)[] buffer; else ubyte[] buffer; WSABUF[1] wsabuf; @@ -902,7 +912,7 @@ private struct DatagramSocketSlot { } static struct DgramDirection(bool RO) { - WSAOVERLAPPEDX overlapped; + OVERLAPPED_CORE overlapped; static if (RO) const(ubyte)[] buffer; else ubyte[] buffer; WSABUF[1] wsabuf; diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index b4e9467..524af9d 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -41,6 +41,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { slot.directory = path; slot.recursive = recursive; slot.callback = callback; + slot.overlapped.driver = m_core; slot.buffer = () @trusted { try return theAllocator.makeArray!ubyte(16384); catch (Exception e) assert(false, "Failed to allocate directory watcher buffer."); @@ -91,7 +92,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { // the current wait operation. Simply cancel the I/O to let the // completion callback if (slot.refCount == 1) { - () @trusted { CancelIoEx(handle, &slot.watcher.overlapped); } (); + () @trusted { CancelIoEx(handle, &slot.watcher.overlapped.overlapped); } (); slot.watcher.callback = null; core.removeWaiter(); } @@ -99,8 +100,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { return true; } - private static nothrow extern(System) - void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) + private static nothrow + void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* overlapped) { import std.conv : to; import std.file : isDir; @@ -176,9 +177,10 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { slot.overlapped.hEvent = handle; BOOL ret; + auto handler = &overlappedIOHandler!onIOCompleted; () @trusted { ret = ReadDirectoryChangesW(handle, slot.buffer.ptr, cast(DWORD)slot.buffer.length, slot.recursive, - notifications, null, &slot.overlapped, &onIOCompleted); + notifications, null, &slot.overlapped.overlapped, handler); } (); if (!ret) { diff --git a/source/eventcore/internal/win32.d b/source/eventcore/internal/win32.d index 9fad47f..af068b3 100644 --- a/source/eventcore/internal/win32.d +++ b/source/eventcore/internal/win32.d @@ -30,6 +30,8 @@ enum { MAX_PROTOCOL_CHAIN = 7, } +enum WSAEDISCON = 10101; + enum WSA_OPERATION_ABORTED = 995; enum WSA_IO_PENDING = 997; @@ -153,7 +155,7 @@ void freeaddrinfo(ADDRINFOA* ai); BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags); BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped); -struct WSAOVERLAPPEDX { +/*struct WSAOVERLAPPEDX { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { @@ -164,4 +166,6 @@ struct WSAOVERLAPPEDX { PVOID Pointer; } HANDLE hEvent; -} +}*/ + +alias WSAOVERLAPPEDX = OVERLAPPED;