Merge pull request #57 from vibe-d/fix_windows_completion_routines

Call Windows IO callbacks outside of completion routines.
This commit is contained in:
Sönke Ludwig 2018-03-11 12:03:13 +01:00 committed by GitHub
commit acc35e1107
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 52 deletions

View file

@ -4,6 +4,7 @@ version (Windows):
import eventcore.driver;
import eventcore.drivers.timer;
import eventcore.internal.consumablequeue;
import eventcore.internal.win32;
import core.time : Duration;
import taggedalgebraic;
@ -19,6 +20,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
HANDLE[] m_registeredEvents;
void delegate() @safe nothrow[HANDLE] m_eventCallbacks;
HANDLE m_fileCompletionEvent;
ConsumableQueue!IOEvent m_ioEvents;
}
package {
@ -31,6 +33,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_tid = () @trusted { return GetCurrentThreadId(); } ();
m_fileCompletionEvent = () @trusted { return CreateEventW(null, false, false, null); } ();
registerEvent(m_fileCompletionEvent);
m_ioEvents = new ConsumableQueue!IOEvent;
}
override size_t waiterCount() { return m_waiterCount + m_timers.pendingCount; }
@ -116,6 +119,9 @@ final class WinAPIEventDriverCore : EventDriverCore {
auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr,
timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } ();
foreach (evt; m_ioEvents.consume)
evt.process(evt.error, evt.bytesTransferred, evt.overlapped);
if (ret == WAIT_IO_COMPLETION) got_event = true;
else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) {
if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) {
@ -219,7 +225,7 @@ private struct HandleSlot {
package struct FileSlot {
static struct Direction(bool RO) {
OVERLAPPED_FILE overlapped;
OVERLAPPED_CORE overlapped;
FileIOCallback callback;
ulong offset;
size_t bytesTransferred;
@ -232,7 +238,7 @@ package struct FileSlot {
auto cb = this.callback;
this.callback = null;
assert(cb !is null);
cb(overlapped.handle, status, bytes_transferred);
cb(cast(FileFD)cast(size_t)overlapped.hEvent, status, bytes_transferred);
}
}
Direction!false read;
@ -241,14 +247,28 @@ package struct FileSlot {
package struct WatcherSlot {
ubyte[] buffer;
OVERLAPPED overlapped;
OVERLAPPED_CORE overlapped;
string directory;
bool recursive;
FileChangesCallback callback;
}
package struct OVERLAPPED_FILE {
package struct OVERLAPPED_CORE {
OVERLAPPED overlapped;
alias overlapped this;
WinAPIEventDriverCore driver;
FileFD handle;
}
package struct IOEvent {
void function(DWORD err, DWORD bts, OVERLAPPED_CORE*) @safe nothrow process;
DWORD error;
DWORD bytesTransferred;
OVERLAPPED_CORE* overlapped;
}
package extern(System) @system nothrow
void overlappedIOHandler(alias process, EXTRA...)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped, EXTRA extra)
{
auto overlapped = cast(OVERLAPPED_CORE*)_overlapped;
overlapped.driver.m_ioEvents.put(IOEvent(&process, error, bytes_transferred, overlapped));
}

View file

@ -64,9 +64,9 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
auto s = m_core.setupSlot!FileSlot(handle);
s.read.overlapped.driver = m_core;
s.read.overlapped.handle = FileFD(cast(size_t)handle);
s.read.overlapped.hEvent = handle;
s.write.overlapped.driver = m_core;
s.write.overlapped.handle = FileFD(cast(size_t)handle);
s.write.overlapped.hEvent = handle;
return FileFD(cast(size_t)handle);
}
@ -75,9 +75,9 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
{
auto h = idToHandle(file);
auto slot = () @trusted { return &m_core.m_handles[h].file(); } ();
if (slot.read.overlapped.handle != FileFD.invalid) {
if (slot.read.overlapped.hEvent != INVALID_HANDLE_VALUE) {
CloseHandle(h);
slot.read.overlapped.handle = slot.write.overlapped.handle = FileFD.invalid;
slot.read.overlapped.hEvent = slot.write.overlapped.hEvent = INVALID_HANDLE_VALUE;
}
}
@ -165,7 +165,8 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
}
auto nbytes = min(slot.buffer.length, DWORD.max);
if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, &onIOFinished!(fun, RO)); } ()) {
auto handler = &overlappedIOHandler!(onIOFinished!(fun, RO));
if (!() @trusted { return fun(h, &slot.buffer[0], nbytes, &slot.overlapped.overlapped, handler); } ()) {
slot.overlapped.driver.removeWaiter();
slot.invokeCallback(IOStatus.error, slot.bytesTransferred);
}
@ -181,16 +182,15 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
}
}
private static extern(Windows)
void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED* _overlapped)
private static nothrow
void onIOFinished(alias fun, bool RO)(DWORD error, DWORD bytes_transferred, OVERLAPPED_CORE* overlapped)
{
auto ctx = () @trusted { return cast(OVERLAPPED_FILE*)_overlapped; } ();
FileFD id = ctx.handle;
FileFD id = cast(FileFD)cast(size_t)overlapped.hEvent;
auto handle = idToHandle(id);
static if (RO)
auto slot = () @trusted { return &ctx.driver.m_handles[handle].file.write; } ();
auto slot = () @trusted { return &overlapped.driver.m_handles[handle].file.write; } ();
else
auto slot = () @trusted { return &ctx.driver.m_handles[handle].file.read; } ();
auto slot = () @trusted { return &overlapped.driver.m_handles[handle].file.read; } ();
assert(slot !is null);
if (!slot.callback) {
@ -199,7 +199,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
}
if (error != 0) {
ctx.driver.removeWaiter();
overlapped.driver.removeWaiter();
slot.invokeCallback(IOStatus.error, slot.bytesTransferred + bytes_transferred);
return;
}
@ -208,7 +208,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
slot.offset += bytes_transferred;
if (slot.bytesTransferred >= slot.buffer.length || slot.mode != IOMode.all) {
ctx.driver.removeWaiter();
overlapped.driver.removeWaiter();
slot.invokeCallback(IOStatus.ok, slot.bytesTransferred);
} else {
startIO!(fun, RO)(handle, slot);

View file

@ -111,12 +111,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
//uint enable = 1;
//() @trusted { ioctlsocket(socket, FIONBIO, &enable); } ();
void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow {
void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
overlapped.Internal = 0;
overlapped.InternalHigh = 0;
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
overlapped.driver = m_core;
}
initSocketSlot(fd);
@ -226,9 +227,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.read.wsabuf[0].len = buffer.length;
slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();
auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped;
auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -247,8 +249,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
}
private static extern(System) nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -278,9 +280,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.read.wsabuf[0].len = slot.streamSocket.read.buffer.length;
slot.streamSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.read.buffer.ptr; } ();
auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped;
auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, &onIOReadCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -301,8 +304,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.write.wsabuf[0].len = buffer.length;
slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();
auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped;
auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } ();
auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -320,8 +324,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.addWaiter();
}
private static extern(System) nothrow
void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -351,8 +355,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.write.wsabuf[0].len = slot.streamSocket.write.buffer.length;
slot.streamSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.write.buffer.ptr; } ();
auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped;
auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, &onIOWriteCompleted); } ();
auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -431,12 +436,13 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
if (m_sockets[fd].common.refCount) // FD already in use?
return DatagramSocketFD.invalid;
void setupOverlapped(ref WSAOVERLAPPEDX overlapped) @trusted @nogc nothrow {
void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
overlapped.Internal = 0;
overlapped.InternalHigh = 0;
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
overlapped.driver = m_core;
}
initSocketSlot(fd);
@ -500,9 +506,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.read.mode = mode;
slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof;
auto ovl = &slot.read.overlapped;
auto ovl = &slot.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err != WSA_IO_PENDING) {
@ -527,8 +534,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.removeWaiter();
}
private static extern(System) nothrow
void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -564,9 +571,10 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length;
slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } ();
auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped;
auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped.overlapped;
DWORD flags = 0;
auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, &onIOReceiveCompleted); } ();
auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -588,10 +596,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.write.mode = mode;
slot.targetAddr = target_address;
auto ovl = &slot.write.overlapped;
auto ovl = &slot.write.overlapped.overlapped;
auto tan = target_address ? target_address.name : null;
auto tal = target_address ? target_address.nameLen : 0;
auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
if (ret != 0) {
auto err = WSAGetLastError();
@ -617,8 +626,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.removeWaiter();
}
private static extern(System) nothrow
void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
private static nothrow
void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{
auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
@ -664,8 +673,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } ();
auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null;
auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0;
auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped;
auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, &onIOSendCompleted); } ();
auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped.overlapped;
auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
if (ret == SOCKET_ERROR) {
auto err = WSAGetLastError();
if (err == WSA_IO_PENDING) {
@ -878,7 +888,7 @@ private struct StreamSocketSlot {
}
static struct StreamDirection(bool RO) {
WSAOVERLAPPEDX overlapped;
OVERLAPPED_CORE overlapped;
static if (RO) const(ubyte)[] buffer;
else ubyte[] buffer;
WSABUF[1] wsabuf;
@ -902,7 +912,7 @@ private struct DatagramSocketSlot {
}
static struct DgramDirection(bool RO) {
WSAOVERLAPPEDX overlapped;
OVERLAPPED_CORE overlapped;
static if (RO) const(ubyte)[] buffer;
else ubyte[] buffer;
WSABUF[1] wsabuf;

View file

@ -41,6 +41,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
slot.directory = path;
slot.recursive = recursive;
slot.callback = callback;
slot.overlapped.driver = m_core;
slot.buffer = () @trusted {
try return theAllocator.makeArray!ubyte(16384);
catch (Exception e) assert(false, "Failed to allocate directory watcher buffer.");
@ -91,7 +92,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
// the current wait operation. Simply cancel the I/O to let the
// completion callback
if (slot.refCount == 1) {
() @trusted { CancelIoEx(handle, &slot.watcher.overlapped); } ();
() @trusted { CancelIoEx(handle, &slot.watcher.overlapped.overlapped); } ();
slot.watcher.callback = null;
core.removeWaiter();
}
@ -99,8 +100,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
return true;
}
private static nothrow extern(System)
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
private static nothrow
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* overlapped)
{
import std.conv : to;
import std.file : isDir;
@ -176,9 +177,10 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
slot.overlapped.hEvent = handle;
BOOL ret;
auto handler = &overlappedIOHandler!onIOCompleted;
() @trusted {
ret = ReadDirectoryChangesW(handle, slot.buffer.ptr, cast(DWORD)slot.buffer.length, slot.recursive,
notifications, null, &slot.overlapped, &onIOCompleted);
notifications, null, &slot.overlapped.overlapped, handler);
} ();
if (!ret) {

View file

@ -30,6 +30,8 @@ enum {
MAX_PROTOCOL_CHAIN = 7,
}
enum WSAEDISCON = 10101;
enum WSA_OPERATION_ABORTED = 995;
enum WSA_IO_PENDING = 997;
@ -153,7 +155,7 @@ void freeaddrinfo(ADDRINFOA* ai);
BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags);
BOOL CancelIoEx(HANDLE hFile, LPOVERLAPPED lpOverlapped);
struct WSAOVERLAPPEDX {
/*struct WSAOVERLAPPEDX {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
@ -164,4 +166,6 @@ struct WSAOVERLAPPEDX {
PVOID Pointer;
}
HANDLE hEvent;
}
}*/
alias WSAOVERLAPPEDX = OVERLAPPED;