Call Windows IO callbacks outside of completion routines.

Calling WinSock functions from inside of a completion routine results in undefined behavior, because the completion routine may be triggered within another WinSock function that enters an alertable wait state. For this reason, none of the callbacks that are triggered by overlapped I/O may be invoked directly from a completion routine.

To solve this, a ConsumableQueue is filled with all completion events that occur and is processed after each MsgWaitForMultipleObjectsEx call.
This commit is contained in:
Sönke Ludwig 2018-03-10 22:59:21 +01:00
parent cdbd8b58e8
commit 0e1d74cc41
5 changed files with 82 additions and 39 deletions

View file

@ -4,6 +4,7 @@ version (Windows):
import eventcore.driver;
import eventcore.drivers.timer;
import eventcore.internal.consumablequeue;
import eventcore.internal.win32;
import core.time : Duration;
import taggedalgebraic;
@ -19,6 +20,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
HANDLE[] m_registeredEvents;
void delegate() @safe nothrow[HANDLE] m_eventCallbacks;
HANDLE m_fileCompletionEvent;
ConsumableQueue!IOEvent m_ioEvents;
}
package {
@ -31,6 +33,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_tid = () @trusted { return GetCurrentThreadId(); } ();
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
registerEvent(m_fileCompletionEvent);
m_ioEvents = new ConsumableQueue!IOEvent;
}
override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; }
@ -116,6 +119,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr,
timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } ();
foreach (evt; m_ioEvents.consume)
evt.process(evt.error, evt.bytesTransferred, evt.overlapped);
if (ret == WAIT_IO_COMPLETION) got_event = true;
else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) {
if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) {
@ -241,14 +247,34 @@ package struct FileSlot {
package struct WatcherSlot {
ubyte[] buffer;
OVERLAPPED overlapped;
OVERLAPPED_CORE overlapped;
string directory;
bool recursive;
FileChangesCallback callback;
}
package struct OVERLAPPED_FILE {
package struct OVERLAPPED_CORE {
OVERLAPPED overlapped;
alias overlapped this;
WinAPIEventDriverCore driver;
}
package struct OVERLAPPED_FILE {
OVERLAPPED_CORE core;
alias core this;
FileFD handle;
}
package struct IOEvent {
void function(DWORD err, DWORD bts, OVERLAPPED_CORE*) @safe nothrow process;
DWORD error;
DWORD bytesTransferred;
OVERLAPPED_CORE* overlapped;
}
package extern(System) @system nothrow
void overlappedIOHandler(alias process, EXTRA...)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped, EXTRA extra)
{
auto overlapped = cast(OVERLAPPED_CORE*)_overlapped;
overlapped.driver.m_ioEvents.put(IOEvent(&process, error, bytes_transferred, overlapped));
}

View file

@ -165,7 +165,8 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
}
auto nbytes = min(slot.buffer.length, DWORD.max);
if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, &onIOFinished!(fun, RO)); } ()) {
auto handler = &overlappedIOHandler!(onIOFinished!(fun, RO));
if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, handler); } ()) {
slot.overlapped.driver.removeWaiter();
slot.invokeCallback(IOStatus.error, slot.bytesTransferred);
}
@ -181,8 +182,8 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
}
}
private static extern(Windows)
void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped)
private static nothrow
void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED_CORE* _overlapped)
{
auto ctx = () @trusted { return cast(OVERLAPPED_FILE*)_overlapped; } ();
FileFD id = ctx.handle;

View file

@ -111,12 +111,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
//uint enable = 1;
//() @trusted { ioctlsocket(socket, FIONBIO, &enable); } ();
void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow {
void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
overlapped.Internal = 0;
overlapped.InternalHigh = 0;
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
overlapped.driver = m_core;
}
initSocketSlot(fd);
@ -226,9 +227,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.read.wsabuf[0].len = buffer.length;
slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();
auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped;
auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -247,8 +249,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
}
private static extern(System) nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -278,9 +280,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.read.wsabuf[0].len = slot.streamSocket.read.buffer.length;
slot.streamSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.read.buffer.ptr; } ();
auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped;
auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -301,8 +304,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.write.wsabuf[0].len = buffer.length;
slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();
auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped;
auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } ();
auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -320,8 +324,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.addWaiter();
}
private static extern(System) nothrow
void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -351,8 +355,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.write.wsabuf[0].len = slot.streamSocket.write.buffer.length;
slot.streamSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.write.buffer.ptr; } ();
auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped;
auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } ();
auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -431,12 +436,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
if (m_sockets[fd].common.refCount) // FD already in use?
return DatagramSocketFD.invalid;
void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow {
void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
overlapped.Internal = 0;
overlapped.InternalHigh = 0;
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
overlapped.driver = m_core;
}
initSocketSlot(fd);
@ -500,9 +506,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.read.mode = mode;
slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof;
auto ovl = &slot.read.overlapped;
auto ovl = &slot.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err != WSA_IO_PENDING) {
@ -527,8 +534,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.removeWaiter();
}
private static extern(System) nothrow
void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -564,9 +571,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length;
slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } ();
auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped;
auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -588,10 +596,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.write.mode = mode;
slot.targetAddr = target_address;
auto ovl = &slot.write.overlapped;
auto ovl = &slot.write.overlapped.overlapped;
auto tan = target_address ? target_address.name : null;
auto tal = target_address ? target_address.nameLen : 0;
auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
if (ret != 0) {
auto err = WSAGetLastError();
@ -617,8 +626,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.removeWaiter();
}
private static extern(System) nothrow
void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -664,8 +673,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } ();
auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null;
auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0;
auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped;
auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -878,7 +888,7 @@ private struct StreamSocketSlot {
}
static struct StreamDirection(bool RO) {
WSAOVERLAPPEDX overlapped;
OVERLAPPED_CORE overlapped;
static if (RO) const(ubyte)[] buffer;
else ubyte[] buffer;
WSABUF[1] wsabuf;
@ -902,7 +912,7 @@ private struct DatagramSocketSlot {
}
static struct DgramDirection(bool RO) {
WSAOVERLAPPEDX overlapped;
OVERLAPPED_CORE overlapped;
static if (RO) const(ubyte)[] buffer;
else ubyte[] buffer;
WSABUF[1] wsabuf;

View file

@ -41,6 +41,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
slot.directory = path;
slot.recursive = recursive;
slot.callback = callback;
slot.overlapped.driver = m_core;
slot.buffer = () @trusted {
try return theAllocator.makeArray!ubyte(16384);
catch (Exception e) assert(false, "Failed to allocate directory watcher buffer.");
@ -91,7 +92,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
// the current wait operation. Simply cancel the I/O to let the
// completion callback
if (slot.refCount == 1) {
() @trusted { CancelIoEx(handle, &slot.watcher.overlapped); } ();
() @trusted { CancelIoEx(handle, &slot.watcher.overlapped.overlapped); } ();
slot.watcher.callback = null;
core.removeWaiter();
}
@ -99,8 +100,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
return true;
}
private static nothrow extern(System)
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
private static nothrow
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* overlapped)
{
import std.conv : to;
import std.file : isDir;
@ -176,9 +177,10 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
slot.overlapped.hEvent = handle;
BOOL ret;
auto handler = &overlappedIOHandler!onIOCompleted;
() @trusted {
ret = ReadDirectoryChangesW(handle, slot.buffer.ptr, cast(DWORD)slot.buffer.length, slot.recursive,
notifications, null, &slot.overlapped, &onIOCompleted);
notifications, null, &slot.overlapped.overlapped, handler);
} ();
if (!ret) {

View file

@ -30,6 +30,8 @@ enum {
MAX_PROTOCOL_CHAIN = 7,
}
enum WSAEDISCON = 10101;
enum WSA_OPERATION_ABORTED = 995;
enum WSA_IO_PENDING = 997;
@ -153,7 +155,7 @@ void freeaddrinfo(ADDRINFOA* ai);
BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags);
BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped);
struct WSAOVERLAPPEDX {
/*struct WSAOVERLAPPEDX {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
@ -164,4 +166,6 @@ struct WSAOVERLAPPEDX {
PVOID Pointer;
}
HANDLE hEvent;
}
}*/
alias WSAOVERLAPPEDX = OVERLAPPED;