diff --git a/tests/0-tcp.d b/tests/0-tcp.d new file mode 100644 index 0000000..453f6e6 --- /dev/null +++ b/tests/0-tcp.d @@ -0,0 +1,190 @@ +/+ dub.sdl: + name "tests" + description "TCP semantics tests" + copyright "Copyright © 2015-2020, Sönke Ludwig" + dependency "vibe-core" path=".." ++/ +module tests; + +import vibe.core.core; +import vibe.core.log; +import vibe.core.net; +import core.time; +import std.datetime.stopwatch : StopWatch; + +enum Test { + receive, + receiveExisting, + timeout, + noTimeout, + close +} + +void test1() +{ + Test test; + Task lt; + + auto l = listenTCP(0, (conn) { + lt = Task.getThis(); + try { + while (!conn.empty) { + assert(conn.readLine() == "next"); + auto curtest = test; + conn.write("continue\n"); + logInfo("Perform test %s", curtest); + StopWatch sw; + sw.start(); + final switch (curtest) { + case Test.receive: + assert(conn.waitForData(2.seconds) == true); + assert(cast(Duration)sw.peek < 2.seconds); // should receive something instantly + assert(conn.readLine() == "receive"); + break; + case Test.receiveExisting: + assert(conn.waitForData(2.seconds) == true); + // TODO: validate that waitForData didn't yield! + assert(cast(Duration)sw.peek < 2.seconds); // should receive something instantly + assert(conn.readLine() == "receiveExisting"); + break; + case Test.timeout: + assert(conn.waitForData(2.seconds) == false); + assert(cast(Duration)sw.peek > 1900.msecs); // should wait for at least 2 seconds + assert(conn.connected); + break; + case Test.noTimeout: + assert(conn.waitForData(Duration.max) == true); + assert(cast(Duration)sw.peek > 2.seconds); // data only sent after 3 seconds + assert(conn.readLine() == "noTimeout"); + break; + case Test.close: + assert(conn.waitForData(2.seconds) == false); + assert(cast(Duration)sw.peek < 2.seconds); // connection should be closed instantly + assert(conn.empty); + conn.close(); + assert(!conn.connected); + return; + } + conn.write("ok\n"); + } + } catch (Exception e) { + assert(false, e.msg); + } + }, "127.0.0.1"); + scope (exit) l.stopListening; + + auto conn = connectTCP(l.bindAddress); + + test = Test.receive; + conn.write("next\n"); + assert(conn.readLine() == "continue"); + conn.write("receive\n"); + assert(conn.readLine() == "ok"); + + test = Test.receiveExisting; + conn.write("next\nreceiveExisting\n"); + assert(conn.readLine() == "continue"); + assert(conn.readLine() == "ok"); + + test = Test.timeout; + conn.write("next\n"); + assert(conn.readLine() == "continue"); + sleep(3.seconds); + assert(conn.readLine() == "ok"); + + test = Test.noTimeout; + conn.write("next\n"); + assert(conn.readLine() == "continue"); + sleep(3.seconds); + conn.write("noTimeout\n"); + assert(conn.readLine() == "ok"); + + test = Test.close; + conn.write("next\n"); + assert(conn.readLine() == "continue"); + conn.close(); + + lt.join(); +} + +void test2() +{ + Task lt; + logInfo("Perform test \"disconnect with pending data\""); + auto l = listenTCP(0, (conn) { + try { + lt = Task.getThis(); + sleep(1.seconds); + StopWatch sw; + sw.start(); + try { + assert(conn.waitForData() == true); + assert(cast(Duration)sw.peek < 500.msecs); // waitForData should return immediately + assert(conn.dataAvailableForRead); + assert(conn.readAll() == "test"); + conn.close(); + } catch (Exception e) { + assert(false, "Failed to read pending data: " ~ e.msg); + } + } catch (Exception e) { + assert(false, e.msg); + } + }, "127.0.0.1"); + scope (exit) l.stopListening; + + auto conn = connectTCP(l.bindAddress); + conn.write("test"); + conn.close(); + + sleep(100.msecs); + + assert(lt != Task.init); + lt.join(); +} + +void test() +{ + test1(); + test2(); + exitEventLoop(); +} + +void main() +{ + import std.functional : toDelegate; + runTask(toDelegate(&test)); + runEventLoop(); +} + +string readLine(TCPConnection c) +{ + import std.string : indexOf; + + string ret; + while (!c.empty) { + auto buf = cast(char[])c.peek(); + auto idx = buf.indexOf('\n'); + if (idx < 0) { + ret ~= buf; + c.skip(buf.length); + } else { + ret ~= buf[0 .. idx]; + c.skip(idx+1); + break; + } + } + return ret; +} + +string readAll(TCPConnection c) +{ + import std.algorithm.comparison : min; + + ubyte[] ret; + while (!c.empty) { + auto len = min(c.leastSize, size_t.max); + ret.length += len; + c.read(ret[$-len .. $]); + } + return cast(string)ret; +} diff --git a/tests/0-tcpproxy.d b/tests/0-tcpproxy.d new file mode 100644 index 0000000..38be778 --- /dev/null +++ b/tests/0-tcpproxy.d @@ -0,0 +1,134 @@ +/+ dub.sdl: + name "tests" + description "TCP proxy test" + copyright "Copyright © 2015-2020, Sönke Ludwig" + dependency "vibe-core" path=".." ++/ +module tests; + +import vibe.core.core; +import vibe.core.log; +import vibe.core.net; +import vibe.core.stream; +import std.exception; +import std.string; +import std.range.primitives : front; +import core.time; + + +void testProtocol(TCPConnection server, bool terminate) +{ + foreach (i; 0 .. 1) { + foreach (j; 0 .. 1) { + auto str = format("Hello, World #%s", i*100+j); + server.write(str); + server.write("\n"); + auto reply = server.readLine(); + assert(reply == format("Hash: %08X", typeid(string).getHash(&str)), "Unexpected reply"); + } + sleep(10.msecs); + } + + assert(!server.dataAvailableForRead, "Still data available."); + + if (terminate) { + // forcefully close connection + server.close(); + } else { + server.write("quit\n"); + enforce(server.readLine() == "Bye bye!"); + // should have closed within 500 ms + enforce(!server.waitForData(500.msecs)); + assert(server.empty, "Server still connected."); + } +} + +void runTest() +{ + import std.algorithm : find; + import std.socket : AddressFamily; + + // server for a simple line based protocol + auto l1 = listenTCP(0, (client) { + while (!client.empty) { + auto ln = client.readLine(); + if (ln == "quit") { + client.write("Bye bye!\n"); + client.close(); + break; + } + + client.write(format("Hash: %08X\n", typeid(string).getHash(&ln))); + } + }, "127.0.0.1"); + scope (exit) l1.stopListening; + + // proxy server + auto l2 = listenTCP(0, (client) { + auto server = connectTCP(l1.bindAddress); + + // pipe server to client as long as the server connection is alive + auto t = runTask!(TCPConnection, TCPConnection)((client, server) { + scope (exit) client.close(); + pipe(server, client); + logInfo("Proxy 2 out"); + }, client, server); + + // pipe client to server as long as the client connection is alive + scope (exit) { + server.close(); + t.join(); + } + pipe(client, server); + logInfo("Proxy out"); + }, "127.0.0.1"); + scope (exit) l2.stopListening; + + // test server + logInfo("Test protocol implementation on server"); + testProtocol(connectTCP(l2.bindAddress), false); + logInfo("Test protocol implementation on server with forced disconnect"); + testProtocol(connectTCP(l2.bindAddress), true); + + // test proxy + logInfo("Test protocol implementation on proxy"); + testProtocol(connectTCP(l2.bindAddress), false); + logInfo("Test protocol implementation on proxy with forced disconnect"); + testProtocol(connectTCP(l2.bindAddress), true); +} + +int main() +{ + int ret = 0; + runTask({ + try runTest(); + catch (Throwable th) { + logError("Test failed: %s", th.msg); + logDiagnostic("Full error: %s", th); + ret = 1; + } finally exitEventLoop(true); + }); + runEventLoop(); + return ret; +} + +string readLine(TCPConnection c) +{ + import std.string : indexOf; + + string ret; + while (!c.empty) { + auto buf = cast(char[])c.peek(); + auto idx = buf.indexOf('\n'); + if (idx < 0) { + ret ~= buf; + c.skip(buf.length); + } else { + ret ~= buf[0 .. idx]; + c.skip(idx+1); + break; + } + } + return ret; +} +