diff --git a/.travis.yml b/.travis.yml index 42ca64e..dccf42b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,8 @@ d: env: - CONFIG=select - CONFIG=epoll - - CONFIG=libasync + # disabled until the libasync driver of eventcore is more than a stub + #- CONFIG=libasync matrix: allow_failures: diff --git a/CHANGELOG.md b/CHANGELOG.md index 604257e..22d0fd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,12 @@ -1.0.0 - 2016-06- +1.1.0 - 2016-07-16 +================== + +- Added a new debug hook `setTaskCreationCallback` +- Fixed a compilation error for `VibeIdleCollect` +- Fixed a possible double-free in `ManualEvent` that resulted in an endless loop - [pull #23][issue23] + + +1.0.0 - 2016-07-10 ================== This is the initial release of the `vibe-core` package. The source code was derived from the original `:core` sub package of vibe.d and received a complete work over, mostly under the surface, but also in parts of the API. The changes have been made in a way that is usually backwards compatible from the point of view of an application developer. At the same time, vibe.d 0.8.0 contains a number of forward compatibility declarations, so that switching back and forth between the still existing `vibe-d:core` and `vibe-core` is possible without changing the application code. @@ -6,12 +14,12 @@ This is the initial release of the `vibe-core` package. The source code was deri To use this package, it is currently necessary to put an explicit dependency with a sub configuration directive in the DUB package recipe: ``` // for dub.sdl: -dependency "vibe-d:core" version="~>0.8.0-rc" +dependency "vibe-d:core" version="~>0.8.0" subConfiguration "vibe-d:core" "vibe-core" // for dub.json: "dependencies": { - "vibe-d:core": "~>0.8.0-rc" + "vibe-d:core": "~>0.8.0" }, "subConfigurations": { "vibe-d:core": "vibe-core" diff --git a/README.md b/README.md index 64f5ae7..52aadc2 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ The following compilers are tested and supported: - DMD 2.072.2 - DMD 2.071.2 - DMD 2.070.2 +- LDC 1.3.0 - LDC 1.2.0 - LDC 1.1.0 - LDC 1.0.0 diff --git a/appveyor.yml b/appveyor.yml index 6de5d05..fceaab0 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -26,10 +26,13 @@ environment: DVersion: 1.0.0 arch: x64 +matrix: + allow_failures: + - DC: ldc + DVersion: 1.0.0 + arch: x64 + skip_tags: false -branches: - only: - - master install: - ps: function SetUpDCompiler diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 0fb048f..1837133 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -373,6 +373,13 @@ package Task runTask_internal(alias TFI_SETUP)() f.bumpTaskCounter(); auto handle = f.task(); + debug if (TaskFiber.ms_taskCreationCallback) { + TaskCreationInfo info; + info.handle = handle; + info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } (); + () @trusted { TaskFiber.ms_taskCreationCallback(info); } (); + } + debug if (TaskFiber.ms_taskEventCallback) { () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); } @@ -916,11 +923,26 @@ void setTaskEventCallback(TaskEventCallback func) debug TaskFiber.ms_taskEventCallback = func; } +/** + Sets a callback that is invoked whenever new task is created. + + The callback is guaranteed to be invoked before the one set by + `setTaskEventCallback` for the same task handle. + + This function is useful mostly for implementing debuggers that + analyze the life time of tasks, including task switches. Note that + the callback will only be called for debug builds. +*/ +void setTaskCreationCallback(TaskCreationCallback func) +{ + debug TaskFiber.ms_taskCreationCallback = func; +} + /** A version string representing the current vibe version */ -enum vibeVersionString = "1.0.0"; +enum vibeVersionString = "1.1.0"; /** @@ -1264,7 +1286,7 @@ shared static this() static if (need_wsa) { logTrace("init winsock"); // initialize WinSock2 - import std.c.windows.winsock; + import core.sys.windows.winsock2; WSADATA data; WSAStartup(0x0202, &data); @@ -1294,7 +1316,7 @@ shared static this() version(VibeIdleCollect) { logTrace("setup gc"); - driverCore.setupGcTimer(); + setupGcTimer(); } version (VibeNoDefaultArgs) {} diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index bbabe2c..18c86b3 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -34,7 +34,7 @@ NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFa NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true) { import std.socket : parseAddress; - version (Windows) import std.c.windows.winsock : sockaddr_in, sockaddr_in6; + version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6; else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6; enforce(host.length > 0, "Host name must not be empty."); @@ -683,6 +683,9 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration Represents a listening TCP socket. */ struct TCPListener { + // FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking + // the previous behavior of keeping the socket alive when the listener isn't stored. At the same time, + // stopListening() needs to keep working. private { StreamListenSocketFD m_socket; NetworkAddress m_bindAddress; @@ -704,7 +707,10 @@ struct TCPListener { /// Stops listening and closes the socket. void stopListening() { - assert(false); + if (m_socket != StreamListenSocketFD.invalid) { + eventDriver.sockets.releaseRef(m_socket); + m_socket = StreamListenSocketFD.invalid; + } } } @@ -765,6 +771,27 @@ struct UDPConnection { return naddr; } + /** Set IP multicast loopback mode. + + This is on by default. All packets send will also loopback if enabled. + Useful if more than one application is running on same host and both need each other's packets. + */ + @property void multicastLoopback(bool loop) + { + assert(false, "not implemented."); + } + + /** Become a member of an IP multicast group. + + The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255. + See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12 + and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml + */ + void addMembership(ref NetworkAddress multiaddr) + { + assert(false, "not implemented."); + } + /** Stops listening for datagrams and frees all resources. */ void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; } diff --git a/source/vibe/core/path.d b/source/vibe/core/path.d index 116749f..27d0085 100644 --- a/source/vibe/core/path.d +++ b/source/vibe/core/path.d @@ -480,7 +480,7 @@ struct GenericPath(F) { auto b = Format.getBackNode(m_path); static const Exception e = new Exception("Path has no parent path"); if (b.length >= m_path.length) throw e; - return GenericPath.fromTrustedString(m_path[0 .. b.length]); + return GenericPath.fromTrustedString(m_path[0 .. $ - b.length]); } /** Removes any redundant path segments and replaces all separators by the @@ -722,6 +722,29 @@ unittest { assert(p.toString() == "/foo%2fbar/baz%2Fbam", p.toString); } +unittest { + assert(!PosixPath("").hasParentPath); + assert(!PosixPath("/").hasParentPath); + assert(!PosixPath("foo\\bar").hasParentPath); + assert(PosixPath("foo/bar").parentPath.toString() == "foo/"); + assert(PosixPath("./foo").parentPath.toString() == "./"); + assert(PosixPath("./foo").parentPath.toString() == "./"); + + assert(!WindowsPath("").hasParentPath); + assert(!WindowsPath("/").hasParentPath); + assert(WindowsPath("foo\\bar").parentPath.toString() == "foo\\"); + assert(WindowsPath("foo/bar").parentPath.toString() == "foo/"); + assert(WindowsPath("./foo").parentPath.toString() == "./"); + assert(WindowsPath("./foo").parentPath.toString() == "./"); + + assert(!InetPath("").hasParentPath); + assert(!InetPath("/").hasParentPath); + assert(InetPath("foo/bar").parentPath.toString() == "foo/"); + assert(InetPath("foo/bar%2Fbaz").parentPath.toString() == "foo/"); + assert(InetPath("./foo").parentPath.toString() == "./"); + assert(InetPath("./foo").parentPath.toString() == "./"); +} + /// Thrown when an invalid string representation of a path is detected. class PathValidationException : Exception { this(string text, string file = __FILE__, size_t line = cast(size_t)__LINE__, Throwable next = null) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 39c6b99..a3b9976 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -799,8 +799,8 @@ struct ManualEvent { private { int m_emitCount; static struct Waiters { - StackSList!ThreadLocalWaiter active; - StackSList!ThreadLocalWaiter free; + StackSList!ThreadLocalWaiter active; // actively waiting + StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(SpinLock)) m_waiters; } @@ -955,7 +955,7 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : theAllocator, make; + import vibe.internal.allocator : processAllocator, make; import core.memory : GC; ThreadLocalWaiter* w; @@ -965,6 +965,8 @@ struct ManualEvent { active.iterate((aw) { if (aw.m_driver is drv) { w = aw; + w.addRef(); + return false; } return true; }); @@ -973,6 +975,7 @@ struct ManualEvent { free.filter((fw) { if (fw.m_driver is drv) { w = fw; + w.addRef(); return false; } return true; @@ -981,7 +984,7 @@ struct ManualEvent { if (!w) { () @trusted { try { - w = theAllocator.make!ThreadLocalWaiter; + w = processAllocator.make!ThreadLocalWaiter; w.m_driver = drv; w.m_event = ms_threadEvent; GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof); @@ -996,9 +999,12 @@ struct ManualEvent { } scope (exit) { - if (w.unused) { + if (!w.releaseRef()) { + assert(w.m_driver is drv); + assert(w.unused); with (m_waiters.lock) { - active.remove(w); + auto rmvd = active.remove(w); + assert(rmvd); free.add(w); // TODO: cap size of m_freeWaiters } @@ -1026,6 +1032,62 @@ unittest { runEventLoop(); } +unittest { + import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep; + import vibe.core.taskpool : TaskPool; + import core.time : msecs, usecs; + import std.concurrency : send, receiveOnly; + import std.random : uniform; + + auto tpool = new shared TaskPool(4); + scope (exit) tpool.terminate(); + + static void test(shared(ManualEvent)* evt, Task owner) + { + owner.tid.send(Task.getThis()); + + int ec = evt.emitCount; + auto thist = Task.getThis(); + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + while (ec < 5_000) { + tm.rearm(500.msecs); + sleep(uniform(0, 10_000).usecs); + if (uniform(0, 10) == 0) evt.emit(); + auto ecn = evt.wait(ec); + assert(ecn > ec); + ec = ecn; + } + } + + auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); }); + scope (exit) watchdog.stop(); + + auto e = createSharedManualEvent(); + Task[] tasks; + + runTask({ + auto thist = Task.getThis(); + + // start 25 tasks in each thread + foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist); + // collect all task handles + foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task; + + auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog + scope (exit) tm.stop(); + int pec = 0; + while (e.emitCount < 5_000) { + tm.rearm(500.msecs); + sleep(50.usecs); + e.emit(); + } + + // wait for all worker tasks to finish + foreach (t; tasks) t.join(); + }).join(); +} + package shared struct Monitor(T, M) { alias Mutex = M; @@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter { NativeEventDriver m_driver; EventID m_event = EventID.invalid; Waiter* m_waiters; + int m_refCount = 1; } this(this) @@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter { @property bool unused() const @safe nothrow { return m_waiters is null; } + void addRef() @safe nothrow { m_refCount++; } + bool releaseRef() @safe nothrow { return --m_refCount > 0; } + bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) @safe { import std.datetime : SysTime, Clock, UTC; @@ -1249,6 +1315,7 @@ private struct StackSList(T) void add(T* elem) { + debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; }); elem.next = m_first; m_first = elem; } diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 3954339..93fa65f 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -228,7 +228,7 @@ struct TaskLocal(T) } if (m_hasInitValue) { - static if (__traits(compiles, emplace!T(data, m_initValue))) + static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ())) () @trusted { emplace!T(data, m_initValue); } (); else assert(false, "Cannot emplace initialization value for type "~T.stringof); } else () @trusted { emplace!T(data); } (); @@ -262,7 +262,13 @@ enum TaskEvent { fail /// Ended with an exception } +struct TaskCreationInfo { + Task handle; + const(void)* functionPointer; +} + alias TaskEventCallback = void function(TaskEvent, Task) nothrow; +alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe; /** The maximum combined size of all parameters passed to a task delegate @@ -314,6 +320,7 @@ final package class TaskFiber : Fiber { package TaskFuncInfo m_taskFunc; package __gshared size_t ms_taskStackSize = defaultTaskStackSize; package __gshared debug TaskEventCallback ms_taskEventCallback; + package __gshared debug TaskCreationCallback ms_taskCreationCallback; this() @trusted nothrow { @@ -323,9 +330,7 @@ final package class TaskFiber : Fiber { static TaskFiber getThis() @safe nothrow { - auto f = () @trusted nothrow { - return Fiber.getThis(); - } (); + auto f = () @trusted nothrow { return Fiber.getThis(); } (); if (auto tf = cast(TaskFiber)f) return tf; if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber; return ms_globalDummyFiber; @@ -508,6 +513,7 @@ package struct TaskFuncInfo { void function(ref TaskFuncInfo) func; void[2*size_t.sizeof] callable; void[maxTaskParameterSize] args; + debug ulong functionPointer; void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) { @@ -515,13 +521,17 @@ package struct TaskFuncInfo { import std.algorithm : move; import std.traits : hasElaborateAssign; + import std.conv : to; static struct TARGS { ARGS expand; } - static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length); + static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length, + "Storage required for task callable is too large ("~CALLABLE.sizeof~" vs max "~callable.length~"): "~CALLABLE.stringof); static assert(TARGS.sizeof <= maxTaskParameterSize, "The arguments passed to run(Worker)Task must not exceed "~ - maxTaskParameterSize.to!string~" bytes in total size."); + maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes"); + + debug functionPointer = callPointer(callable); static void callDelegate(ref TaskFuncInfo tfi) { assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); @@ -582,6 +592,16 @@ package struct TaskFuncInfo { } } +private ulong callPointer(C)(ref C callable) +@trusted nothrow @nogc { + alias IP = ulong; + static if (is(C == function)) return cast(IP)cast(void*)callable; + else static if (is(C == delegate)) return cast(IP)callable.funcptr; + else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall; + else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr; + else return cast(IP)&callable; +} + package struct TaskScheduler { import eventcore.driver : ExitReason; import eventcore.core : eventDriver; diff --git a/tests/vibe.core.net.1726.d b/tests/vibe.core.net.1726.d index db79516..394ebc1 100644 --- a/tests/vibe.core.net.1726.d +++ b/tests/vibe.core.net.1726.d @@ -10,12 +10,11 @@ import vibe.core.net; import core.time : msecs; import vibe.core.log; -void main() -{ - bool done = false; - auto buf = new ubyte[512*1024*1024]; +ubyte[] buf; - listenTCP(11375,(conn) { +void performTest(bool reverse) +{ + auto l = listenTCP(11375, (conn) { bool read_ex = false; bool write_ex = false; auto rt = runTask!TCPConnection((conn) { @@ -29,10 +28,10 @@ void main() } // expected }, conn); auto wt = runTask!TCPConnection((conn) { - sleep(1.msecs); // give the connection time to establish + sleep(reverse ? 100.msecs : 20.msecs); // give the connection time to establish try { conn.write(buf); - assert(false, "Expected read() to throw an exception."); + assert(false, "Expected write() to throw an exception."); } catch (Exception) { write_ex = true; conn.close(); @@ -44,24 +43,28 @@ void main() wt.join(); assert(read_ex, "No read exception thrown"); assert(write_ex, "No write exception thrown"); - done = true; + logInfo("Test has finished successfully."); + exitEventLoop(); }, "127.0.0.1"); runTask({ try { auto conn = connectTCP("127.0.0.1", 11375); - sleep(10.msecs); + sleep(reverse ? 20.msecs : 100.msecs); conn.close(); } catch (Exception e) assert(false, e.msg); - sleep(50.msecs); - assert(done, "Not done"); - - exitEventLoop(); }); - setTimer(2000.msecs, { - assert(false, "Test has hung."); - }); + runEventLoop(); - runApplication(); + l.stopListening(); +} + +void main() +{ + setTimer(10000.msecs, { assert(false, "Test has hung."); }); + buf = new ubyte[512*1024*1024]; + + performTest(false); + performTest(true); }