Merge pull request #139 from vibe-d/tcp_state_fixes

Fix TCP state handling
This commit is contained in:
Sönke Ludwig 2020-03-18 10:27:43 +01:00 committed by GitHub
commit 6845e055bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 263 additions and 46 deletions

View file

@ -17,7 +17,7 @@ version (Posix) {
import core.sys.posix.netinet.tcp; import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.un; import core.sys.posix.sys.un;
import core.sys.posix.unistd : close, read, write; import core.sys.posix.unistd : close, read, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS, ECONNREFUSED; import core.stdc.errno;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
import core.sys.posix.sys.socket; import core.sys.posix.sys.socket;
@ -69,6 +69,8 @@ version(OSX) {
enum IP_ADD_MEMBERSHIP = 12; enum IP_ADD_MEMBERSHIP = 12;
enum IP_MULTICAST_LOOP = 11; enum IP_MULTICAST_LOOP = 11;
} else import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP; } else import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
static if (!is(typeof(ESHUTDOWN))) enum ESHUTDOWN = 58;
} }
version(FreeBSD) { version(FreeBSD) {
static if (__VERSION__ < 2077) { static if (__VERSION__ < 2077) {
@ -87,11 +89,20 @@ version (Windows) {
import core.sys.windows.windows; import core.sys.windows.windows;
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE; alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK; alias EAGAIN = WSAEWOULDBLOCK;
alias ECONNREFUSED = WSAECONNREFUSED; alias ECONNREFUSED = WSAECONNREFUSED;
alias EPIPE = WSAECONNABORTED;
alias ECONNRESET = WSAECONNRESET;
alias ENETRESET = WSAENETRESET;
alias ENOTCONN = WSAENOTCONN;
alias ETIMEDOUT = WSAETIMEDOUT;
alias ESHUTDOWN = WSAESHUTDOWN;
enum SHUT_RDWR = SD_BOTH; enum SHUT_RDWR = SD_BOTH;
enum SHUT_RD = SD_RECEIVE; enum SHUT_RD = SD_RECEIVE;
enum SHUT_WR = SD_SEND; enum SHUT_WR = SD_SEND;
extern (C) int read(int fd, void *buffer, uint count) nothrow; extern (C) int read(int fd, void *buffer, uint count) nothrow;
extern (C) int write(int fd, const(void) *buffer, uint count) nothrow; extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
extern (C) int close(int fd) nothrow @safe; extern (C) int close(int fd) nothrow @safe;
@ -380,23 +391,26 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (err.among!(EAGAIN, EINPROGRESS)) {
if (mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
} else {
auto st = handleReadError(err, m_loop.m_fds[socket].streamSocket);
print("sock error %s!", err); print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0); on_read_finish(socket, st, 0);
return; return;
} }
} }
if (ret == 0 && buffer.length > 0) { if (ret == 0 && buffer.length > 0) {
// treat as if the connection read end was shut down
handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
on_read_finish(socket, IOStatus.disconnected, 0); on_read_finish(socket, IOStatus.disconnected, 0);
return; return;
} }
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret >= 0) { if (ret >= 0) {
buffer = buffer[ret .. $]; buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) { if (mode != IOMode.all || buffer.length == 0) {
@ -443,18 +457,21 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
assert(m_loop.m_fds[socket].common.refCount > 0); assert(m_loop.m_fds[socket].common.refCount > 0);
} }
while (true) {
sizediff_t ret = 0; sizediff_t ret = 0;
() @trusted { ret = .recv(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0); } (); () @trusted { ret = .recv(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0); } ();
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (!err.among!(EAGAIN, EINPROGRESS)) {
finalize(IOStatus.error); auto st = handleReadError(err, *slot);
finalize(st);
return; return;
} }
} }
if (ret == 0 && slot.readBuffer.length) { if (ret == 0 && slot.readBuffer.length) {
slot.state = ConnectionState.passiveClose; // treat as if the connection read end was shut down
handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
finalize(IOStatus.disconnected); finalize(IOStatus.disconnected);
return; return;
} }
@ -467,7 +484,30 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return; return;
} }
} }
// retry if this was just a partial read, as it could mean that
// the connection was closed by the remove peer
if (ret <= 0 || !slot.readBuffer.length) break;
} }
}
private static IOStatus handleReadError(int err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
case ESHUTDOWN:
if (slot.state == ConnectionState.activeClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.passiveClose;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
@ -481,15 +521,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) { if (err.among!(EAGAIN, EINPROGRESS)) {
on_write_finish(socket, IOStatus.error, 0);
return;
}
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_write_finish(socket, IOStatus.wouldBlock, 0); on_write_finish(socket, IOStatus.wouldBlock, 0);
return; return;
} }
} else {
auto st = handleWriteError(err, m_loop.m_fds[socket].streamSocket);
on_write_finish(socket, st, 0);
return;
}
} }
size_t bytes_written = 0; size_t bytes_written = 0;
@ -537,7 +578,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (!err.among!(EAGAIN, EINPROGRESS)) { if (!err.among!(EAGAIN, EINPROGRESS)) {
auto l = lockHandle(socket); auto l = lockHandle(socket);
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(socket, IOStatus.error, slot.bytesRead); auto st = handleWriteError(err, *slot);
slot.writeCallback(socket, st, slot.bytesRead);
return; return;
} }
} }
@ -554,6 +596,24 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
private static IOStatus handleWriteError(int err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
case ESHUTDOWN:
if (slot.state == ConnectionState.passiveClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.activeClose;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
final override void waitForData(StreamSocketFD socket, IOCallback on_data_available) final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
{ {
sizediff_t ret; sizediff_t ret;

View file

@ -245,6 +245,7 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
override void setUserTimeout(StreamSocketFD socket, Duration timeout) {} override void setUserTimeout(StreamSocketFD socket, Duration timeout) {}
override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
{ {
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
@ -270,7 +271,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
} }
} else { } else {
resetBuffers(); resetBuffers();
on_read_finish(socket, IOStatus.error, 0); auto st = handleReadError(err, *slot);
on_read_finish(socket, st, 0);
return; return;
} }
} }
@ -279,7 +281,6 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
m_core.addWaiter(); m_core.addWaiter();
} }
private static nothrow private static nothrow
void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped) void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
{ {
@ -302,7 +303,14 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $]; slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $];
if (dwError) { if (dwError) {
invokeCallback(IOStatus.error, 0); auto st = handleReadError(dwError, slot.streamSocket);
invokeCallback(st, slot.streamSocket.read.bytesTransferred);
return;
}
if (!cbTransferred) {
handleReadError(WSAEDISCON, slot.streamSocket);
invokeCallback(IOStatus.disconnected, slot.streamSocket.read.bytesTransferred);
return; return;
} }
@ -324,11 +332,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
invokeCallback(IOStatus.wouldBlock, 0); invokeCallback(IOStatus.wouldBlock, 0);
} }
} else { } else {
invokeCallback(IOStatus.error, 0); auto st = handleReadError(err, slot.streamSocket);
invokeCallback(st, slot.streamSocket.read.bytesTransferred);
} }
} }
} }
private static IOStatus handleReadError(DWORD err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case WSAEDISCON, WSAESHUTDOWN:
if (slot.state == ConnectionState.activeClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.passiveClose;
return IOStatus.disconnected;
case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } (); auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
@ -349,7 +376,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
return; return;
} }
} else { } else {
on_write_finish(socket, IOStatus.error, 0); auto st = handleWriteError(err, *slot);
on_write_finish(socket, st, 0);
return; return;
} }
} }
@ -378,7 +406,8 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $]; slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $];
if (dwError) { if (dwError) {
invokeCallback(IOStatus.error, 0); auto st = handleWriteError(dwError, slot.streamSocket);
invokeCallback(st, slot.streamSocket.write.bytesTransferred);
return; return;
} }
@ -399,11 +428,30 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
invokeCallback(IOStatus.wouldBlock, 0); invokeCallback(IOStatus.wouldBlock, 0);
} }
} else { } else {
invokeCallback(IOStatus.error, 0); auto st = handleWriteError(err, slot.streamSocket);
invokeCallback(st, slot.streamSocket.write.bytesTransferred);
} }
} }
} }
private static IOStatus handleWriteError(DWORD err, ref StreamSocketSlot slot)
@safe nothrow {
switch (err) {
case 0: return IOStatus.ok;
case WSAEDISCON, WSAESHUTDOWN:
if (slot.state == ConnectionState.passiveClose)
slot.state = ConnectionState.closed;
else if (slot.state != ConnectionState.closed)
slot.state = ConnectionState.activeClose;
return IOStatus.disconnected;
case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
slot.state = ConnectionState.closed;
return IOStatus.disconnected;
default: return IOStatus.error;
}
}
override void waitForData(StreamSocketFD socket, IOCallback on_data_available) override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
@ -413,7 +461,9 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
{ {
() @trusted { WSASendDisconnect(socket, null); } (); () @trusted { WSASendDisconnect(socket, null); } ();
with (m_sockets[socket].streamSocket) { with (m_sockets[socket].streamSocket) {
if (state == ConnectionState.passiveClose)
state = ConnectionState.closed; state = ConnectionState.closed;
else state = ConnectionState.activeClose;
} }
} }

View file

@ -64,7 +64,7 @@ void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode)
} }
eventDriver.sockets.read(socket.m_fd, buffer, mode, &cb); eventDriver.sockets.read(socket.m_fd, buffer, mode, &cb);
} }
void cancelRead(ref StreamSocket socket) { eventDriver.sockets.cancelRead(socket.m_fd); } void cancelRead(ref StreamSocket socket) @safe nothrow { eventDriver.sockets.cancelRead(socket.m_fd); }
void waitForData(alias callback)(ref StreamSocket socket) void waitForData(alias callback)(ref StreamSocket socket)
{ {
void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow {
@ -79,8 +79,11 @@ void write(alias callback)(ref StreamSocket socket, const(ubyte)[] buffer, IOMod
} }
eventDriver.sockets.write(socket.m_fd, buffer, mode, &cb); eventDriver.sockets.write(socket.m_fd, buffer, mode, &cb);
} }
void cancelWrite(ref StreamSocket socket) { eventDriver.sockets.cancelWrite(socket.m_fd); } void cancelWrite(ref StreamSocket socket) @safe nothrow { eventDriver.sockets.cancelWrite(socket.m_fd); }
void shutdown(ref StreamSocket socket, bool shut_read = true, bool shut_write = true) { eventDriver.sockets.shutdown(socket.m_fd, shut_read, shut_write); } void shutdown(ref StreamSocket socket, bool shut_read = true, bool shut_write = true)
@safe nothrow {
eventDriver.sockets.shutdown(socket.m_fd, shut_read, shut_write);
}
struct StreamListenSocket { struct StreamListenSocket {

View file

@ -15,6 +15,15 @@ bool s_done;
void main() void main()
{ {
testBasicExchange();
testShutdown();
}
void testBasicExchange()
{
print("Basic test:");
print("");
// watchdog timer in case of starvation/deadlocks // watchdog timer in case of starvation/deadlocks
auto tm = eventDriver.timers.create(); auto tm = eventDriver.timers.create();
eventDriver.timers.set(tm, 10000.msecs, 0.msecs); eventDriver.timers.set(tm, 10000.msecs, 0.msecs);
@ -92,3 +101,98 @@ void main()
assert(s_done); assert(s_done);
s_done = false; s_done = false;
} }
void testShutdown()
{
static ubyte[10] srbuf, crbuf;
print("");
print("Shutdown test:");
print("");
// watchdog timer in case of starvation/deadlocks
auto tm = eventDriver.timers.create();
eventDriver.timers.set(tm, 10000.msecs, 0.msecs);
eventDriver.timers.wait(tm, (tm) { assert(false, "Test hung."); });
auto baddr = new InternetAddress(0x7F000001, 40001);
auto server = listenStream(baddr);
StreamSocket client;
StreamSocket incoming;
server.waitForConnections!((sock, addr) {
incoming = sock;
assert(incoming.state == ConnectionState.connected);
print("Server read");
ubyte[10] buf;
incoming.read!((rstatus, bytes) {
print("Server read done %s", bytes);
assert(rstatus == IOStatus.disconnected);
assert(bytes == 4);
assert(srbuf[0 .. 4] == [1, 2, 3, 4]);
assert(incoming.state == ConnectionState.passiveClose);
print("Server write 4 bytes");
incoming.write!((wstatus, bytes) {
print("Server write done");
assert(wstatus == IOStatus.ok);
assert(bytes == 4);
print("Shutdown server write");
incoming.shutdown(false, true);
assert(incoming.state == ConnectionState.closed);
print("Attempt server write after shutdown");
incoming.write!((wstatus, bytes) {
print("Attempted server write done");
assert(wstatus == IOStatus.disconnected);
assert(bytes == 0);
})([1], IOMode.all);
})([5, 6, 7, 8], IOMode.all);
})(srbuf, IOMode.all);
});
print("Connect...");
connectStream!((sock, status) {
client = sock;
assert(client.state == ConnectionState.connected);
print("Client write 4 bytes");
client.write!((wstatus, bytes) {
print("Client write done");
assert(wstatus == IOStatus.ok);
assert(bytes == 4);
print("Shutdown client write");
client.shutdown(false, true);
assert(client.state == ConnectionState.activeClose);
print("Attempt client write after shutdown");
client.write!((wstatus, bytes) {
print("Attempted client write done");
assert(wstatus == IOStatus.disconnected);
assert(bytes == 0);
print("Client read");
client.read!((rstatus, bytes) {
print("Client read done");
assert(rstatus == IOStatus.disconnected);
assert(bytes == 4);
assert(crbuf[0 .. 4] == [5, 6, 7, 8]);
assert(client.state == ConnectionState.closed);
destroy(client);
destroy(incoming);
destroy(server);
s_done = true;
eventDriver.timers.stop(tm);
})(crbuf, IOMode.all);
})([1], IOMode.all);
})([1, 2, 3, 4], IOMode.all);
})(baddr);
ExitReason er;
do er = eventDriver.core.processEvents(Duration.max);
while (er == ExitReason.idle);
assert(er == ExitReason.outOfWaiters);
assert(s_done);
s_done = false;
}