From 04b3575c14dc7ad9971e19f153f3e3d712c1bdde Mon Sep 17 00:00:00 2001 From: Sebastian Wilzbach Date: Mon, 3 Jul 2017 00:39:41 +0200 Subject: [PATCH 01/14] Remove deprecated stdc import --- source/vibe/core/core.d | 2 +- source/vibe/core/net.d | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index aa862a2..c5f506f 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1264,7 +1264,7 @@ shared static this() static if (need_wsa) { logTrace("init winsock"); // initialize WinSock2 - import std.c.windows.winsock; + import core.sys.windows.winsock2; WSADATA data; WSAStartup(0x0202, &data); diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index aed3edb..7404f70 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -34,7 +34,7 @@ NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFa NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true) { import std.socket : parseAddress; - version (Windows) import std.c.windows.winsock : sockaddr_in, sockaddr_in6; + version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6; else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6; enforce(host.length > 0, "Host name must not be empty."); From b501d419f86e2b2d46fb10bbc8513c40f50c32ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 7 Jul 2017 02:37:59 +0200 Subject: [PATCH 02/14] Test all branches with AppVeyor. --- appveyor.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 6de5d05..9de1e95 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -27,9 +27,6 @@ environment: arch: x64 skip_tags: false -branches: - only: - - master install: - ps: function SetUpDCompiler From 429d5dcb771af5d5953f6061cf53ed6164a54b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 7 Jul 2017 22:29:12 +0200 Subject: [PATCH 03/14] Fix GenericPath.parentPath. --- source/vibe/core/path.d | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/source/vibe/core/path.d b/source/vibe/core/path.d index 33fbeef..607d369 100644 --- a/source/vibe/core/path.d +++ b/source/vibe/core/path.d @@ -480,7 +480,7 @@ struct GenericPath(F) { auto b = Format.getBackNode(m_path); static const Exception e = new Exception("Path has no parent path"); if (b.length >= m_path.length) throw e; - return GenericPath.fromTrustedString(m_path[0 .. b.length]); + return GenericPath.fromTrustedString(m_path[0 .. $ - b.length]); } /** Removes any redundant path segments and replaces all separators by the @@ -722,6 +722,29 @@ unittest { assert(p.toString() == "/foo%2fbar/baz%2Fbam", p.toString); } +unittest { + assert(!PosixPath("").hasParentPath); + assert(!PosixPath("/").hasParentPath); + assert(!PosixPath("foo\\bar").hasParentPath); + assert(PosixPath("foo/bar").parentPath.toString() == "foo/"); + assert(PosixPath("./foo").parentPath.toString() == "./"); + assert(PosixPath("./foo").parentPath.toString() == "./"); + + assert(!WindowsPath("").hasParentPath); + assert(!WindowsPath("/").hasParentPath); + assert(WindowsPath("foo\\bar").parentPath.toString() == "foo\\"); + assert(WindowsPath("foo/bar").parentPath.toString() == "foo/"); + assert(WindowsPath("./foo").parentPath.toString() == "./"); + assert(WindowsPath("./foo").parentPath.toString() == "./"); + + assert(!InetPath("").hasParentPath); + assert(!InetPath("/").hasParentPath); + assert(InetPath("foo/bar").parentPath.toString() == "foo/"); + assert(InetPath("foo/bar%2Fbaz").parentPath.toString() == "foo/"); + assert(InetPath("./foo").parentPath.toString() == "./"); + assert(InetPath("./foo").parentPath.toString() == "./"); +} + /// Thrown when an invalid string representation of a path is detected. class PathValidationException : Exception { this(string text, string file = __FILE__, size_t line = cast(size_t)__LINE__, Throwable next = null) From ca2f9dbe9109ebdca50b4730b67435f57d96776a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 8 Jul 2017 10:44:42 +0200 Subject: [PATCH 04/14] Add UDP multicast declarations matching rejectedsoftware/vibe.d#1806. The actual functionality still needs to be implemented in eventcore. --- source/vibe/core/net.d | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 7404f70..e95681c 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -765,6 +765,27 @@ struct UDPConnection { return naddr; } + /** Set IP multicast loopback mode. + + This is on by default. All packets send will also loopback if enabled. + Useful if more than one application is running on same host and both need each other's packets. + */ + @property void multicastLoopback(bool loop) + { + assert(false, "not implemented."); + } + + /** Become a member of an IP multicast group. + + The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255. + See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12 + and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml + */ + void addMembership(ref NetworkAddress multiaddr) + { + assert(false, "not implemented."); + } + /** Stops listening for datagrams and frees all resources. */ void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; } From efeab80bd3822604045e22628335df3f04cfd156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 10 Jul 2017 11:22:26 +0200 Subject: [PATCH 05/14] Update change log. --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 604257e..37dce20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -1.0.0 - 2016-06- +1.0.0 - 2016-07-10 ================== This is the initial release of the `vibe-core` package. The source code was derived from the original `:core` sub package of vibe.d and received a complete work over, mostly under the surface, but also in parts of the API. The changes have been made in a way that is usually backwards compatible from the point of view of an application developer. At the same time, vibe.d 0.8.0 contains a number of forward compatibility declarations, so that switching back and forth between the still existing `vibe-d:core` and `vibe-core` is possible without changing the application code. @@ -6,12 +6,12 @@ This is the initial release of the `vibe-core` package. The source code was deri To use this package, it is currently necessary to put an explicit dependency with a sub configuration directive in the DUB package recipe: ``` // for dub.sdl: -dependency "vibe-d:core" version="~>0.8.0-rc" +dependency "vibe-d:core" version="~>0.8.0" subConfiguration "vibe-d:core" "vibe-core" // for dub.json: "dependencies": { - "vibe-d:core": "~>0.8.0-rc" + "vibe-d:core": "~>0.8.0" }, "subConfigurations": { "vibe-d:core": "vibe-core" From 87296bb1e4466b5467fe8b0a454286a1b0b1c426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 10 Jul 2017 19:46:16 +0200 Subject: [PATCH 06/14] Fix compilation error for VibeIdleCollect. --- source/vibe/core/core.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index c5f506f..f8f57dd 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1294,7 +1294,7 @@ shared static this() version(VibeIdleCollect) { logTrace("setup gc"); - driverCore.setupGcTimer(); + setupGcTimer(); } version (VibeNoDefaultArgs) {} From 254d91dcdfb0452d90db4934085b12532d8801ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 15 Jul 2017 10:12:52 +0200 Subject: [PATCH 07/14] Improve error message. --- source/vibe/core/task.d | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 1a3af74..2a4c397 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -515,13 +515,15 @@ package struct TaskFuncInfo { import std.algorithm : move; import std.traits : hasElaborateAssign; + import std.conv : to; static struct TARGS { ARGS expand; } - static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length); + static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length, + "Storage required for task callable is too large ("~CALLABLE.sizeof~" vs max "~callable.length~"): "~CALLABLE.stringof); static assert(TARGS.sizeof <= maxTaskParameterSize, "The arguments passed to run(Worker)Task must not exceed "~ - maxTaskParameterSize.to!string~" bytes in total size."); + maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes"); static void callDelegate(ref TaskFuncInfo tfi) { assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); From 7efb496208a7fb862ddd97f1a99d48aa1a83d230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 15 Jul 2017 18:16:13 +0200 Subject: [PATCH 08/14] Fix double-free of ThreadLocalWaiter in ManualEvent. Also adds assertions and a randomized multi-thread test to rule out similar issues with a higher confidence. --- source/vibe/core/sync.d | 79 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 4b76bbd..dfac097 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -799,8 +799,8 @@ struct ManualEvent { private { int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; - StackSList!ThreadLocalWaiter free; + StackSList!ThreadLocalWaiter active; // actively waiting + StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -955,7 +955,7 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : theAllocator, make; + import vibe.internal.allocator : processAllocator, make; import core.memory : GC; ThreadLocalWaiter* w; @@ -965,6 +965,8 @@ struct ManualEvent { active.iterate((aw) { if (aw.m_driver is drv) { w = aw; + w.addRef(); + return false; } return true; }); @@ -973,6 +975,7 @@ struct ManualEvent { free.filter((fw) { if (fw.m_driver is drv) { w = fw; + w.addRef(); return false; } return true; @@ -981,7 +984,7 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = theAllocator.make!ThreadLocalWaiter; + w = processAllocator.make!ThreadLocalWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); @@ -996,9 +999,12 @@ struct ManualEvent { } scope (exit) { - if (w.unused) { + if (!w.releaseRef()) { + assert(w.m_driver is drv); + assert(w.unused); with (m_waiters.lock) { - active.remove(w); + auto rmvd = active.remove(w); + assert(rmvd); free.add(w); // TODO: cap size of m_freeWaiters } @@ -1026,6 +1032,62 @@ unittest { runEventLoop(); } +unittest { + import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep; + import vibe.core.taskpool : TaskPool; + import core.time : msecs, usecs; + import std.concurrency : send, receiveOnly; + import std.random : uniform; + + auto tpool = new shared TaskPool(4); + scope (exit) tpool.terminate(); + + static void test(shared(ManualEvent)* evt, Task owner) + { + owner.tid.send(Task.getThis()); + + int ec = evt.emitCount; + auto thist = Task.getThis(); + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + while (ec < 5_000) { + tm.rearm(500.msecs); + sleep(uniform(0, 10_000).usecs); + if (uniform(0, 10) == 0) evt.emit(); + auto ecn = evt.wait(ec); + assert(ecn > ec); + ec = ecn; + } + } + + auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); }); + scope (exit) watchdog.stop(); + + auto e = createSharedManualEvent(); + Task[] tasks; + + runTask({ + auto thist = Task.getThis(); + + // start 25 tasks in each thread + foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist); + // collect all task handles + foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task; + + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + int pec = 0; + while (e.emitCount < 5_000) { + tm.rearm(500.msecs); + sleep(50.usecs); + e.emit(); + } + + // wait for all worker tasks to finish + foreach (t; tasks) t.join(); + }).join(); +} + package shared struct Monitor(T, M) { alias Mutex = M; @@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter { NativeEventDriver m_driver; EventID m_event = EventID.invalid; Waiter* m_waiters; + int m_refCount = 1; } this(this) @@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter { @property bool unused() const @safe nothrow { return m_waiters is null; } + void addRef() @safe nothrow { m_refCount++; } + bool releaseRef() @safe nothrow { return --m_refCount > 0; } + bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; @@ -1249,6 +1315,7 @@ private struct StackSList(T) void add(T* elem) { + debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); elem.next = m_first; m_first = elem; } From 719c62d6c95a43400a4498b5e5654ecb32abfff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 16 Jul 2017 00:08:07 +0200 Subject: [PATCH 09/14] Fix TLS initialization condition. --- source/vibe/core/task.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 2a4c397..8733375 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -228,7 +228,7 @@ struct TaskLocal(T) } if (m_hasInitValue) { - static if (__traits(compiles, emplace!T(data, m_initValue))) + static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ())) () @trusted { emplace!T(data, m_initValue); } (); else assert(false, "Cannot emplace initialization value for type "~T.stringof); } else () @trusted { emplace!T(data); } (); From 4f69b1eaf3359220ae6a71911bae2f156a685acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 16 Jul 2017 22:07:59 +0200 Subject: [PATCH 10/14] Add setTaskCreationCallback() for better remote debugger integration. --- source/vibe/core/core.d | 22 ++++++++++++++++++++++ source/vibe/core/task.d | 24 +++++++++++++++++++++--- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f8f57dd..ef13d6d 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -373,6 +373,13 @@ package Task runTask_internal(alias TFI_SETUP)() f.bumpTaskCounter(); auto handle = f.task(); + debug if (TaskFiber.ms_taskCreationCallback) { + TaskCreationInfo info; + info.handle = handle; + info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } (); + () @trusted { TaskFiber.ms_taskCreationCallback(info); } (); + } + debug if (TaskFiber.ms_taskEventCallback) { () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); } @@ -916,6 +923,21 @@ void setTaskEventCallback(TaskEventCallback func) debug TaskFiber.ms_taskEventCallback = func; } +/** + Sets a callback that is invoked whenever new task is created. + + The callback is guaranteed to be invoked before the one set by + `setTaskEventCallback` for the same task handle. + + This function is useful mostly for implementing debuggers that + analyze the life time of tasks, including task switches. Note that + the callback will only be called for debug builds. +*/ +void setTaskCreationCallback(TaskCreationCallback func) +{ + debug TaskFiber.ms_taskCreationCallback = func; +} + /** A version string representing the current vibe version diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 8733375..0d5230f 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -262,7 +262,13 @@ enum TaskEvent { fail /// Ended with an exception } +struct TaskCreationInfo { + Task handle; + const(void)* functionPointer; +} + alias TaskEventCallback = void function(TaskEvent, Task) nothrow; +alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe; /** The maximum combined size of all parameters passed to a task delegate @@ -314,6 +320,7 @@ final package class TaskFiber : Fiber { package TaskFuncInfo m_taskFunc; package __gshared size_t ms_taskStackSize = defaultTaskStackSize; package __gshared debug TaskEventCallback ms_taskEventCallback; + package __gshared debug TaskCreationCallback ms_taskCreationCallback; this() @trusted nothrow { @@ -323,9 +330,7 @@ final package class TaskFiber : Fiber { static TaskFiber getThis() @safe nothrow { - auto f = () @trusted nothrow { - return Fiber.getThis(); - } (); + auto f = () @trusted nothrow { return Fiber.getThis(); } (); if (auto tf = cast(TaskFiber)f) return tf; if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber; return ms_globalDummyFiber; @@ -508,6 +513,7 @@ package struct TaskFuncInfo { void function(ref TaskFuncInfo) func; void[2*size_t.sizeof] callable; void[maxTaskParameterSize] args; + debug ulong functionPointer; void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) { @@ -525,6 +531,8 @@ package struct TaskFuncInfo { "The arguments passed to run(Worker)Task must not exceed "~ maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes"); + debug functionPointer = callPointer(callable); + static void callDelegate(ref TaskFuncInfo tfi) { assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); @@ -584,6 +592,16 @@ package struct TaskFuncInfo { } } +private ulong callPointer(C)(ref C callable) +@trusted nothrow @nogc { + alias IP = ulong; + static if (is(C == function)) return cast(IP)cast(void*)callable; + else static if (is(C == delegate)) return cast(IP)callable.funcptr; + else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall; + else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr; + else return cast(IP)&callable; +} + package struct TaskScheduler { import eventcore.driver : ExitReason; import eventcore.core : eventDriver; From 6aa2775381bb50174041d2f78b8431f2046a5bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 16 Jul 2017 22:28:43 +0200 Subject: [PATCH 11/14] Skip failing test configurations. - the libasync tests are disabled for now, since the libevent driver in eventcore isn't implemented. - LDC 1.0.0 on Windows fails with a stack overflow - added to allowed_failures due to the age of the underlying front end --- .travis.yml | 3 ++- appveyor.yml | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 42ca64e..dccf42b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,8 @@ d: env: - CONFIG=select - CONFIG=epoll - - CONFIG=libasync + # disabled until the libasync driver of eventcore is more than a stub + #- CONFIG=libasync matrix: allow_failures: diff --git a/appveyor.yml b/appveyor.yml index 9de1e95..fceaab0 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -26,6 +26,12 @@ environment: DVersion: 1.0.0 arch: x64 +matrix: + allow_failures: + - DC: ldc + DVersion: 1.0.0 + arch: x64 + skip_tags: false install: From e3492bab067f07f9ed2e700d3c3b03958f7e238f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 16 Jul 2017 22:29:45 +0200 Subject: [PATCH 12/14] Update change log and bump version number. --- CHANGELOG.md | 8 ++++++++ source/vibe/core/core.d | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37dce20..22d0fd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +1.1.0 - 2016-07-16 +================== + +- Added a new debug hook `setTaskCreationCallback` +- Fixed a compilation error for `VibeIdleCollect` +- Fixed a possible double-free in `ManualEvent` that resulted in an endless loop - [pull #23][issue23] + + 1.0.0 - 2016-07-10 ================== diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index ef13d6d..8eb846d 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -942,7 +942,7 @@ void setTaskCreationCallback(TaskCreationCallback func) /** A version string representing the current vibe version */ -enum vibeVersionString = "1.0.0"; +enum vibeVersionString = "1.1.0"; /** From a70f35e84694557241531fbd4bd3de23d49d35bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 17 Jul 2017 12:00:15 +0200 Subject: [PATCH 13/14] Update compiler support list. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 64f5ae7..52aadc2 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ The following compilers are tested and supported: - DMD 2.072.2 - DMD 2.071.2 - DMD 2.070.2 +- LDC 1.3.0 - LDC 1.2.0 - LDC 1.1.0 - LDC 1.0.0 From d7b2173cb3bd5f3317ec44f84e64d0f5d0f18933 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 18 Jul 2017 11:55:39 +0200 Subject: [PATCH 14/14] Implement TCPListener.stopListening and fix the vibe.core.net.1726 test. The test fix follows the fix in the vibe.d repository: rejectedsoftware/vibe.d#f960427e5974c176c58b516647895a2af4ea181b --- source/vibe/core/net.d | 22 ++++++++++++++-------- tests/vibe.core.net.1726.d | 37 ++++++++++++++++++++----------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index e95681c..18c86b3 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -192,9 +192,9 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA return () @trusted { // scope scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); - + // FIXME: make this interruptible - auto result = asyncAwaitUninterruptible!(ConnectCallback, + auto result = asyncAwaitUninterruptible!(ConnectCallback, cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) //cb => eventDriver.sockets.cancelConnect(cb) ); @@ -511,7 +511,7 @@ struct TCPConnection { @property bool empty() { return leastSize == 0; } @property ulong leastSize() { waitForData(); return m_context && m_context.readBuffer.length; } @property bool dataAvailableForRead() { return waitForData(0.seconds); } - + void close() nothrow { //logInfo("close %s", cast(int)m_fd); @@ -522,7 +522,7 @@ struct TCPConnection { m_context = null; } } - + bool waitForData(Duration timeout = Duration.max) { mixin(tracer); @@ -602,7 +602,7 @@ mixin(tracer); auto res = asyncAwait!(IOCallback, cb => eventDriver.sockets.write(m_socket, bytes, mode, cb), cb => eventDriver.sockets.cancelWrite(m_socket)); - + switch (res[1]) { default: throw new Exception("Error writing data to socket."); @@ -667,7 +667,7 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration do { if (LoopBody(timeout)) return; - + if (timeout != Duration.max) { auto prev = now; now = Clock.currTime(UTC()); @@ -683,6 +683,9 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration Represents a listening TCP socket. */ struct TCPListener { + // FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking + // the previous behavior of keeping the socket alive when the listener isn't stored. At the same time, + // stopListening() needs to keep working. private { StreamListenSocketFD m_socket; NetworkAddress m_bindAddress; @@ -704,7 +707,10 @@ struct TCPListener { /// Stops listening and closes the socket. void stopListening() { - assert(false); + if (m_socket != StreamListenSocketFD.invalid) { + eventDriver.sockets.releaseRef(m_socket); + m_socket = StreamListenSocketFD.invalid; + } } } @@ -722,7 +728,7 @@ struct UDPConnection { Context* m_context; } - private this(ref NetworkAddress bind_address) + private this(ref NetworkAddress bind_address) { scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); m_socket = eventDriver.sockets.createDatagramSocket(baddr, null); diff --git a/tests/vibe.core.net.1726.d b/tests/vibe.core.net.1726.d index 55af1e4..394ebc1 100644 --- a/tests/vibe.core.net.1726.d +++ b/tests/vibe.core.net.1726.d @@ -10,12 +10,11 @@ import vibe.core.net; import core.time : msecs; import vibe.core.log; -void main() -{ - bool done = false; - auto buf = new ubyte[512*1024*1024]; +ubyte[] buf; - listenTCP(11375,(conn) { +void performTest(bool reverse) +{ + auto l = listenTCP(11375, (conn) { bool read_ex = false; bool write_ex = false; auto rt = runTask!TCPConnection((conn) { @@ -29,10 +28,10 @@ void main() } // expected }, conn); auto wt = runTask!TCPConnection((conn) { - sleep(1.msecs); // give the connection time to establish + sleep(reverse ? 100.msecs : 20.msecs); // give the connection time to establish try { conn.write(buf); - assert(false, "Expected read() to throw an exception."); + assert(false, "Expected write() to throw an exception."); } catch (Exception) { write_ex = true; conn.close(); @@ -44,24 +43,28 @@ void main() wt.join(); assert(read_ex, "No read exception thrown"); assert(write_ex, "No write exception thrown"); - done = true; + logInfo("Test has finished successfully."); + exitEventLoop(); }, "127.0.0.1"); runTask({ try { auto conn = connectTCP("127.0.0.1", 11375); - sleep(10.msecs); + sleep(reverse ? 20.msecs : 100.msecs); conn.close(); } catch (Exception e) assert(false, e.msg); - sleep(50.msecs); - assert(done, "Not done"); - - exitEventLoop(); }); - setTimer(2000.msecs, { - assert(false, "Test has hung."); - }); + runEventLoop(); - runApplication(); + l.stopListening(); +} + +void main() +{ + setTimer(10000.msecs, { assert(false, "Test has hung."); }); + buf = new ubyte[512*1024*1024]; + + performTest(false); + performTest(true); }