diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 4a3f918..14ced71 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -17,7 +17,7 @@ version (Posix) { import core.sys.posix.netinet.tcp; import core.sys.posix.sys.un; import core.sys.posix.unistd : close, read, write; - import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED; + import core.stdc.errno; import core.sys.posix.fcntl; import core.sys.posix.sys.socket; @@ -87,11 +87,20 @@ version (Windows) { import core.sys.windows.windows; import core.sys.windows.winsock2; alias sockaddr_storage = SOCKADDR_STORAGE; + alias EAGAIN = WSAEWOULDBLOCK; alias ECONNREFUSED = WSAECONNREFUSED; + alias EPIPE = WSAECONNABORTED; + alias ECONNRESET = WSAECONNRESET; + alias ENETRESET = WSAENETRESET; + alias ENOTCONN = WSAENOTCONN; + alias ETIMEDOUT = WSAETIMEDOUT; + alias ESHUTDOWN = WSAESHUTDOWN; + enum SHUT_RDWR = SD_BOTH; enum SHUT_RD = SD_RECEIVE; enum SHUT_WR = SD_SEND; + extern (C) int read(int fd, void *buffer, uint count) nothrow; extern (C) int write(int fd, const(void) *buffer, uint count) nothrow; extern (C) int close(int fd) nothrow @safe; @@ -380,23 +389,26 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (!err.among!(EAGAIN, EINPROGRESS)) { + if (err.among!(EAGAIN, EINPROGRESS)) { + if (mode == IOMode.immediate) { + on_read_finish(socket, IOStatus.wouldBlock, 0); + return; + } + } else { + auto st = handleReadError(err, m_loop.m_fds[socket].streamSocket); print("sock error %s!", err); - on_read_finish(socket, IOStatus.error, 0); + on_read_finish(socket, st, 0); return; } } if (ret == 0 && buffer.length > 0) { + // treat as if the connection read end was shut down + handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket); on_read_finish(socket, IOStatus.disconnected, 0); return; } - if (ret < 0 && mode == IOMode.immediate) { - on_read_finish(socket, IOStatus.wouldBlock, 0); - return; - } - if (ret >= 0) { buffer = buffer[ret .. $]; if (mode != IOMode.all || buffer.length == 0) { @@ -448,13 +460,15 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); if (!err.among!(EAGAIN, EINPROGRESS)) { - finalize(IOStatus.error); + auto st = handleReadError(err, *slot); + finalize(st); return; } } if (ret == 0 && slot.readBuffer.length) { - slot.state = ConnectionState.passiveClose; + // treat as if the connection read end was shut down + handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket); finalize(IOStatus.disconnected); return; } @@ -469,6 +483,24 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } + private static IOStatus handleReadError(int err, ref StreamSocketSlot slot) + @safe nothrow { + switch (err) { + case 0: return IOStatus.ok; + case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT: + slot.state = ConnectionState.closed; + return IOStatus.disconnected; + case ESHUTDOWN: + if (slot.state == ConnectionState.activeClose) + slot.state = ConnectionState.closed; + else if (slot.state != ConnectionState.closed) + slot.state = ConnectionState.passiveClose; + return IOStatus.disconnected; + default: return IOStatus.error; + } + } + + final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { if (buffer.length == 0) { @@ -481,13 +513,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); - if (!err.among!(EAGAIN, EINPROGRESS)) { - on_write_finish(socket, IOStatus.error, 0); - return; - } - - if (mode == IOMode.immediate) { - on_write_finish(socket, IOStatus.wouldBlock, 0); + if (err.among!(EAGAIN, EINPROGRESS)) { + if (mode == IOMode.immediate) { + on_write_finish(socket, IOStatus.wouldBlock, 0); + return; + } + } else { + auto st = handleWriteError(err, m_loop.m_fds[socket].streamSocket); + on_write_finish(socket, st, 0); return; } } @@ -537,7 +570,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (!err.among!(EAGAIN, EINPROGRESS)) { auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.write)(socket, null); - slot.writeCallback(socket, IOStatus.error, slot.bytesRead); + auto st = handleWriteError(err, *slot); + slot.writeCallback(socket, st, slot.bytesRead); return; } } @@ -554,6 +588,24 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } + private static IOStatus handleWriteError(int err, ref StreamSocketSlot slot) + @safe nothrow { + switch (err) { + case 0: return IOStatus.ok; + case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT: + slot.state = ConnectionState.closed; + return IOStatus.disconnected; + case ESHUTDOWN: + if (slot.state == ConnectionState.passiveClose) + slot.state = ConnectionState.closed; + else if (slot.state != ConnectionState.closed) + slot.state = ConnectionState.activeClose; + return IOStatus.disconnected; + default: return IOStatus.error; + } + } + + final override void waitForData(StreamSocketFD socket, IOCallback on_data_available) { sizediff_t ret; diff --git a/source/eventcore/drivers/winapi/sockets.d b/source/eventcore/drivers/winapi/sockets.d index 8d82029..eff3e83 100644 --- a/source/eventcore/drivers/winapi/sockets.d +++ b/source/eventcore/drivers/winapi/sockets.d @@ -245,6 +245,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets { override void setUserTimeout(StreamSocketFD socket, Duration timeout) {} + override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); @@ -270,7 +271,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { } } else { resetBuffers(); - on_read_finish(socket, IOStatus.error, 0); + auto st = handleReadError(err, *slot); + on_read_finish(socket, st, 0); return; } } @@ -279,7 +281,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets { m_core.addWaiter(); } - private static nothrow void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) { @@ -302,7 +303,14 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $]; if (dwError) { - invokeCallback(IOStatus.error, 0); + auto st = handleReadError(dwError, slot.streamSocket); + invokeCallback(st, slot.streamSocket.read.bytesTransferred); + return; + } + + if (!cbTransferred) { + handleReadError(WSAEDISCON, slot.streamSocket); + invokeCallback(IOStatus.disconnected, slot.streamSocket.read.bytesTransferred); return; } @@ -324,11 +332,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets { invokeCallback(IOStatus.wouldBlock, 0); } } else { - invokeCallback(IOStatus.error, 0); + auto st = handleReadError(err, slot.streamSocket); + invokeCallback(st, slot.streamSocket.read.bytesTransferred); } } } + private static IOStatus handleReadError(DWORD err, ref StreamSocketSlot slot) + @safe nothrow { + switch (err) { + case 0: return IOStatus.ok; + case WSAEDISCON, WSAESHUTDOWN: + if (slot.state == ConnectionState.activeClose) + slot.state = ConnectionState.closed; + else if (slot.state != ConnectionState.closed) + slot.state = ConnectionState.passiveClose; + return IOStatus.disconnected; + case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT: + slot.state = ConnectionState.closed; + return IOStatus.disconnected; + default: return IOStatus.error; + } + } + + override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); @@ -349,7 +376,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { return; } } else { - on_write_finish(socket, IOStatus.error, 0); + auto st = handleWriteError(err, *slot); + on_write_finish(socket, st, 0); return; } } @@ -378,7 +406,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets { slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $]; if (dwError) { - invokeCallback(IOStatus.error, 0); + auto st = handleWriteError(dwError, slot.streamSocket); + invokeCallback(st, slot.streamSocket.write.bytesTransferred); return; } @@ -399,11 +428,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets { invokeCallback(IOStatus.wouldBlock, 0); } } else { - invokeCallback(IOStatus.error, 0); + auto st = handleWriteError(err, slot.streamSocket); + invokeCallback(st, slot.streamSocket.write.bytesTransferred); } } } + private static IOStatus handleWriteError(DWORD err, ref StreamSocketSlot slot) + @safe nothrow { + switch (err) { + case 0: return IOStatus.ok; + case WSAEDISCON, WSAESHUTDOWN: + if (slot.state == ConnectionState.passiveClose) + slot.state = ConnectionState.closed; + else if (slot.state != ConnectionState.closed) + slot.state = ConnectionState.activeClose; + return IOStatus.disconnected; + case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT: + slot.state = ConnectionState.closed; + return IOStatus.disconnected; + default: return IOStatus.error; + } + } + + override void waitForData(StreamSocketFD socket, IOCallback on_data_available) { assert(false, "TODO!"); @@ -413,7 +461,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets { { () @trusted { WSASendDisconnect(socket, null); } (); with (m_sockets[socket].streamSocket) { - state = ConnectionState.closed; + if (state == ConnectionState.passiveClose) + state = ConnectionState.closed; + else state = ConnectionState.activeClose; } }