Handle disconnects during socket read/write.

The connection state was not updated anymore since the changes in b32b329d15.

This fixes CI errors in vibe.d
This commit is contained in:
Sönke Ludwig 2020-03-17 12:21:35 +01:00
parent 608f60237f
commit 3b44da604c
2 changed files with 128 additions and 26 deletions

View file

@ -17,7 +17,7 @@ version (Posix) {
import core.sys.posix.netinet.tcp; import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.un; import core.sys.posix.sys.un;
import core.sys.posix.unistd : close, read, write; import core.sys.posix.unistd : close, read, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED; import core.stdc.errno;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
import core.sys.posix.sys.socket; import core.sys.posix.sys.socket;
@ -87,11 +87,20 @@ version (Windows) {
import core.sys.windows.windows; import core.sys.windows.windows;
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE; alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK; alias EAGAIN = WSAEWOULDBLOCK;
alias ECONNREFUSED = WSAECONNREFUSED; alias ECONNREFUSED = WSAECONNREFUSED;
alias EPIPE = WSAECONNABORTED;
alias ECONNRESET = WSAECONNRESET;
alias ENETRESET = WSAENETRESET;
alias ENOTCONN = WSAENOTCONN;
alias ETIMEDOUT = WSAETIMEDOUT;
alias ESHUTDOWN = WSAESHUTDOWN;
enum SHUT_RDWR = SD_BOTH; enum SHUT_RDWR = SD_BOTH;
enum SHUT_RD = SD_RECEIVE; enum SHUT_RD = SD_RECEIVE;
enum SHUT_WR = SD_SEND; enum SHUT_WR = SD_SEND;
extern (C) int read(int fd, void *buffer, uint count) nothrow; extern (C) int read(int fd, void *buffer, uint count) nothrow;
extern (C) int write(int fd, const(void) *buffer, uint count) nothrow; extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
extern (C) int close(int fd) nothrow @safe; extern (C) int close(int fd) nothrow @safe;
@ -380,23 +389,26 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (err.among!(EAGAIN, EINPROGRESS)) {
if (mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
} else {
auto st = handleReadError(err, m_loop.m_fds[socket].streamSocket);
print("sock error %s!", err); print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0); on_read_finish(socket, st, 0);
return; return;
} }
} }
if (ret == 0 && buffer.length > 0) { if (ret == 0 && buffer.length > 0) {
// treat as if the connection read end was shut down
handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
on_read_finish(socket, IOStatus.disconnected, 0); on_read_finish(socket, IOStatus.disconnected, 0);
return; return;
} }
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret >= 0) { if (ret >= 0) {
buffer = buffer[ret .. $]; buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) { if (mode != IOMode.all || buffer.length == 0) {
@ -448,13 +460,15 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (!err.among!(EAGAIN, EINPROGRESS)) {
finalize(IOStatus.error); auto st = handleReadError(err, *slot);
finalize(st);
return; return;
} }
} }
if (ret == 0 && slot.readBuffer.length) { if (ret == 0 && slot.readBuffer.length) {
slot.state = ConnectionState.passiveClose; // treat as if the connection read end was shut down
handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
finalize(IOStatus.disconnected); finalize(IOStatus.disconnected);
return; return;
} }
@ -469,6 +483,24 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
private static IOStatus handleReadError(int err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
case ESHUTDOWN:
if (slot.state == ConnectionState.activeClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.passiveClose;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
if (buffer.length == 0) { if (buffer.length == 0) {
@ -481,15 +513,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (err.among!(EAGAIN, EINPROGRESS)) {
on_write_finish(socket, IOStatus.error, 0);
return;
}
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_write_finish(socket, IOStatus.wouldBlock, 0); on_write_finish(socket, IOStatus.wouldBlock, 0);
return; return;
} }
} else {
auto st = handleWriteError(err, m_loop.m_fds[socket].streamSocket);
on_write_finish(socket, st, 0);
return;
}
} }
size_t bytes_written = 0; size_t bytes_written = 0;
@ -537,7 +570,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (!err.among!(EAGAIN, EINPROGRESS)) { if (!err.among!(EAGAIN, EINPROGRESS)) {
auto l = lockHandle(socket); auto l = lockHandle(socket);
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(socket, IOStatus.error, slot.bytesRead); auto st = handleWriteError(err, *slot);
slot.writeCallback(socket, st, slot.bytesRead);
return; return;
} }
} }
@ -554,6 +588,24 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
private static IOStatus handleWriteError(int err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
case ESHUTDOWN:
if (slot.state == ConnectionState.passiveClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.activeClose;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
final override void waitForData(StreamSocketFD socket, IOCallback on_data_available) final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
{ {
sizediff_t ret; sizediff_t ret;

View file

@ -245,6 +245,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
override void setUserTimeout(StreamSocketFD socket, Duration timeout) {} override void setUserTimeout(StreamSocketFD socket, Duration timeout) {}
override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
{ {
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
@ -270,7 +271,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
} else { } else {
resetBuffers(); resetBuffers();
on_read_finish(socket, IOStatus.error, 0); auto st = handleReadError(err, *slot);
on_read_finish(socket, st, 0);
return; return;
} }
} }
@ -279,7 +281,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.addWaiter(); m_core.addWaiter();
} }
private static nothrow private static nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{ {
@ -302,7 +303,14 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $]; slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $];
if (dwError) { if (dwError) {
invokeCallback(IOStatus.error, 0); auto st = handleReadError(dwError, slot.streamSocket);
invokeCallback(st, slot.streamSocket.read.bytesTransferred);
return;
}
if (!cbTransferred) {
handleReadError(WSAEDISCON, slot.streamSocket);
invokeCallback(IOStatus.disconnected, slot.streamSocket.read.bytesTransferred);
return; return;
} }
@ -324,11 +332,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
invokeCallback(IOStatus.wouldBlock, 0); invokeCallback(IOStatus.wouldBlock, 0);
} }
} else { } else {
invokeCallback(IOStatus.error, 0); auto st = handleReadError(err, slot.streamSocket);
invokeCallback(st, slot.streamSocket.read.bytesTransferred);
} }
} }
} }
private static IOStatus handleReadError(DWORD err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case WSAEDISCON, WSAESHUTDOWN:
if (slot.state == ConnectionState.activeClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.passiveClose;
return IOStatus.disconnected;
case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
@ -349,7 +376,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
return; return;
} }
} else { } else {
on_write_finish(socket, IOStatus.error, 0); auto st = handleWriteError(err, *slot);
on_write_finish(socket, st, 0);
return; return;
} }
} }
@ -378,7 +406,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $]; slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $];
if (dwError) { if (dwError) {
invokeCallback(IOStatus.error, 0); auto st = handleWriteError(dwError, slot.streamSocket);
invokeCallback(st, slot.streamSocket.write.bytesTransferred);
return; return;
} }
@ -399,11 +428,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
invokeCallback(IOStatus.wouldBlock, 0); invokeCallback(IOStatus.wouldBlock, 0);
} }
} else { } else {
invokeCallback(IOStatus.error, 0); auto st = handleWriteError(err, slot.streamSocket);
invokeCallback(st, slot.streamSocket.write.bytesTransferred);
} }
} }
} }
private static IOStatus handleWriteError(DWORD err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case WSAEDISCON, WSAESHUTDOWN:
if (slot.state == ConnectionState.passiveClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.activeClose;
return IOStatus.disconnected;
case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
override void waitForData(StreamSocketFD socket, IOCallback on_data_available) override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
@ -413,7 +461,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
{ {
() @trusted { WSASendDisconnect(socket, null); } (); () @trusted { WSASendDisconnect(socket, null); } ();
with (m_sockets[socket].streamSocket) { with (m_sockets[socket].streamSocket) {
if (state == ConnectionState.passiveClose)
state = ConnectionState.closed; state = ConnectionState.closed;
else state = ConnectionState.activeClose;
} }
} }