Merge branch 'master' into trailing-whitespace

This commit is contained in:
Sönke Ludwig 2017-07-19 00:08:33 +02:00 committed by GitHub
commit fcf98c2016
10 changed files with 217 additions and 42 deletions

View file

@ -18,7 +18,8 @@ d:
env:
- CONFIG=select
- CONFIG=epoll
- CONFIG=libasync
# disabled until the libasync driver of eventcore is more than a stub
#- CONFIG=libasync
matrix:
allow_failures:

View file

@ -1,4 +1,12 @@
1.0.0 - 2016-06-
1.1.0 - 2016-07-16
==================
- Added a new debug hook `setTaskCreationCallback`
- Fixed a compilation error for `VibeIdleCollect`
- Fixed a possible double-free in `ManualEvent` that resulted in an endless loop - [pull #23][issue23]
1.0.0 - 2016-07-10
==================
This is the initial release of the `vibe-core` package. The source code was derived from the original `:core` sub package of vibe.d and received a complete work over, mostly under the surface, but also in parts of the API. The changes have been made in a way that is usually backwards compatible from the point of view of an application developer. At the same time, vibe.d 0.8.0 contains a number of forward compatibility declarations, so that switching back and forth between the still existing `vibe-d:core` and `vibe-core` is possible without changing the application code.
@ -6,12 +14,12 @@ This is the initial release of the `vibe-core` package. The source code was deri
To use this package, it is currently necessary to put an explicit dependency with a sub configuration directive in the DUB package recipe:
```
// for dub.sdl:
dependency "vibe-d:core" version="~>0.8.0-rc"
dependency "vibe-d:core" version="~>0.8.0"
subConfiguration "vibe-d:core" "vibe-core"
// for dub.json:
"dependencies": {
"vibe-d:core": "~>0.8.0-rc"
"vibe-d:core": "~>0.8.0"
},
"subConfigurations": {
"vibe-d:core": "vibe-core"

View file

@ -40,6 +40,7 @@ The following compilers are tested and supported:
- DMD 2.072.2
- DMD 2.071.2
- DMD 2.070.2
- LDC 1.3.0
- LDC 1.2.0
- LDC 1.1.0
- LDC 1.0.0

View file

@ -26,10 +26,13 @@ environment:
DVersion: 1.0.0
arch: x64
matrix:
allow_failures:
- DC: ldc
DVersion: 1.0.0
arch: x64
skip_tags: false
branches:
only:
- master
install:
- ps: function SetUpDCompiler

View file

@ -373,6 +373,13 @@ package Task runTask_internal(alias TFI_SETUP)()
f.bumpTaskCounter();
auto handle = f.task();
debug if (TaskFiber.ms_taskCreationCallback) {
TaskCreationInfo info;
info.handle = handle;
info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } ();
() @trusted { TaskFiber.ms_taskCreationCallback(info); } ();
}
debug if (TaskFiber.ms_taskEventCallback) {
() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
}
@ -916,11 +923,26 @@ void setTaskEventCallback(TaskEventCallback func)
debug TaskFiber.ms_taskEventCallback = func;
}
/**
Sets a callback that is invoked whenever new task is created.
The callback is guaranteed to be invoked before the one set by
`setTaskEventCallback` for the same task handle.
This function is useful mostly for implementing debuggers that
analyze the life time of tasks, including task switches. Note that
the callback will only be called for debug builds.
*/
void setTaskCreationCallback(TaskCreationCallback func)
{
debug TaskFiber.ms_taskCreationCallback = func;
}
/**
A version string representing the current vibe version
*/
enum vibeVersionString = "1.0.0";
enum vibeVersionString = "1.1.0";
/**
@ -1264,7 +1286,7 @@ shared static this()
static if (need_wsa) {
logTrace("init winsock");
// initialize WinSock2
import std.c.windows.winsock;
import core.sys.windows.winsock2;
WSADATA data;
WSAStartup(0x0202, &data);
@ -1294,7 +1316,7 @@ shared static this()
version(VibeIdleCollect) {
logTrace("setup gc");
driverCore.setupGcTimer();
setupGcTimer();
}
version (VibeNoDefaultArgs) {}

View file

@ -34,7 +34,7 @@ NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFa
NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true)
{
import std.socket : parseAddress;
version (Windows) import std.c.windows.winsock : sockaddr_in, sockaddr_in6;
version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6;
else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6;
enforce(host.length > 0, "Host name must not be empty.");
@ -683,6 +683,9 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration
Represents a listening TCP socket.
*/
struct TCPListener {
// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
// the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
// stopListening() needs to keep working.
private {
StreamListenSocketFD m_socket;
NetworkAddress m_bindAddress;
@ -704,7 +707,10 @@ struct TCPListener {
/// Stops listening and closes the socket.
void stopListening()
{
assert(false);
if (m_socket != StreamListenSocketFD.invalid) {
eventDriver.sockets.releaseRef(m_socket);
m_socket = StreamListenSocketFD.invalid;
}
}
}
@ -765,6 +771,27 @@ struct UDPConnection {
return naddr;
}
/** Set IP multicast loopback mode.
This is on by default. All packets send will also loopback if enabled.
Useful if more than one application is running on same host and both need each other's packets.
*/
@property void multicastLoopback(bool loop)
{
assert(false, "not implemented.");
}
/** Become a member of an IP multicast group.
The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255.
See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
*/
void addMembership(ref NetworkAddress multiaddr)
{
assert(false, "not implemented.");
}
/** Stops listening for datagrams and frees all resources.
*/
void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; }

View file

@ -480,7 +480,7 @@ struct GenericPath(F) {
auto b = Format.getBackNode(m_path);
static const Exception e = new Exception("Path has no parent path");
if (b.length >= m_path.length) throw e;
return GenericPath.fromTrustedString(m_path[0 .. b.length]);
return GenericPath.fromTrustedString(m_path[0 .. $ - b.length]);
}
/** Removes any redundant path segments and replaces all separators by the
@ -722,6 +722,29 @@ unittest {
assert(p.toString() == "/foo%2fbar/baz%2Fbam", p.toString);
}
unittest {
assert(!PosixPath("").hasParentPath);
assert(!PosixPath("/").hasParentPath);
assert(!PosixPath("foo\\bar").hasParentPath);
assert(PosixPath("foo/bar").parentPath.toString() == "foo/");
assert(PosixPath("./foo").parentPath.toString() == "./");
assert(PosixPath("./foo").parentPath.toString() == "./");
assert(!WindowsPath("").hasParentPath);
assert(!WindowsPath("/").hasParentPath);
assert(WindowsPath("foo\\bar").parentPath.toString() == "foo\\");
assert(WindowsPath("foo/bar").parentPath.toString() == "foo/");
assert(WindowsPath("./foo").parentPath.toString() == "./");
assert(WindowsPath("./foo").parentPath.toString() == "./");
assert(!InetPath("").hasParentPath);
assert(!InetPath("/").hasParentPath);
assert(InetPath("foo/bar").parentPath.toString() == "foo/");
assert(InetPath("foo/bar%2Fbaz").parentPath.toString() == "foo/");
assert(InetPath("./foo").parentPath.toString() == "./");
assert(InetPath("./foo").parentPath.toString() == "./");
}
/// Thrown when an invalid string representation of a path is detected.
class PathValidationException : Exception {
this(string text, string file = __FILE__, size_t line = cast(size_t)__LINE__, Throwable next = null)

View file

@ -799,8 +799,8 @@ struct ManualEvent {
private {
int m_emitCount;
static struct Waiters {
StackSList!ThreadLocalWaiter active;
StackSList!ThreadLocalWaiter free;
StackSList!ThreadLocalWaiter active; // actively waiting
StackSList!ThreadLocalWaiter free; // free-list of reusable waiter structs
}
Monitor!(Waiters, shared(SpinLock)) m_waiters;
}
@ -955,7 +955,7 @@ struct ManualEvent {
private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : theAllocator, make;
import vibe.internal.allocator : processAllocator, make;
import core.memory : GC;
ThreadLocalWaiter* w;
@ -965,6 +965,8 @@ struct ManualEvent {
active.iterate((aw) {
if (aw.m_driver is drv) {
w = aw;
w.addRef();
return false;
}
return true;
});
@ -973,6 +975,7 @@ struct ManualEvent {
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
@ -981,7 +984,7 @@ struct ManualEvent {
if (!w) {
() @trusted {
try {
w = theAllocator.make!ThreadLocalWaiter;
w = processAllocator.make!ThreadLocalWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
GC.addRange(cast(void*)w, ThreadLocalWaiter.sizeof);
@ -996,9 +999,12 @@ struct ManualEvent {
}
scope (exit) {
if (w.unused) {
if (!w.releaseRef()) {
assert(w.m_driver is drv);
assert(w.unused);
with (m_waiters.lock) {
active.remove(w);
auto rmvd = active.remove(w);
assert(rmvd);
free.add(w);
// TODO: cap size of m_freeWaiters
}
@ -1026,6 +1032,62 @@ unittest {
runEventLoop();
}
unittest {
import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
import vibe.core.taskpool : TaskPool;
import core.time : msecs, usecs;
import std.concurrency : send, receiveOnly;
import std.random : uniform;
auto tpool = new shared TaskPool(4);
scope (exit) tpool.terminate();
static void test(shared(ManualEvent)* evt, Task owner)
{
owner.tid.send(Task.getThis());
int ec = evt.emitCount;
auto thist = Task.getThis();
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
while (ec < 5_000) {
tm.rearm(500.msecs);
sleep(uniform(0, 10_000).usecs);
if (uniform(0, 10) == 0) evt.emit();
auto ecn = evt.wait(ec);
assert(ecn > ec);
ec = ecn;
}
}
auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
scope (exit) watchdog.stop();
auto e = createSharedManualEvent();
Task[] tasks;
runTask({
auto thist = Task.getThis();
// start 25 tasks in each thread
foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
// collect all task handles
foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
scope (exit) tm.stop();
int pec = 0;
while (e.emitCount < 5_000) {
tm.rearm(500.msecs);
sleep(50.usecs);
e.emit();
}
// wait for all worker tasks to finish
foreach (t; tasks) t.join();
}).join();
}
package shared struct Monitor(T, M)
{
alias Mutex = M;
@ -1112,6 +1174,7 @@ private struct ThreadLocalWaiter {
NativeEventDriver m_driver;
EventID m_event = EventID.invalid;
Waiter* m_waiters;
int m_refCount = 1;
}
this(this)
@ -1128,6 +1191,9 @@ private struct ThreadLocalWaiter {
@property bool unused() const @safe nothrow { return m_waiters is null; }
void addRef() @safe nothrow { m_refCount++; }
bool releaseRef() @safe nothrow { return --m_refCount > 0; }
bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
@safe {
import std.datetime : SysTime, Clock, UTC;
@ -1249,6 +1315,7 @@ private struct StackSList(T)
void add(T* elem)
{
debug iterate((el) { assert(el !is elem, "Double-insertion of list element."); return true; });
elem.next = m_first;
m_first = elem;
}

View file

@ -228,7 +228,7 @@ struct TaskLocal(T)
}
if (m_hasInitValue) {
static if (__traits(compiles, emplace!T(data, m_initValue)))
static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ()))
() @trusted { emplace!T(data, m_initValue); } ();
else assert(false, "Cannot emplace initialization value for type "~T.stringof);
} else () @trusted { emplace!T(data); } ();
@ -262,7 +262,13 @@ enum TaskEvent {
fail /// Ended with an exception
}
struct TaskCreationInfo {
Task handle;
const(void)* functionPointer;
}
alias TaskEventCallback = void function(TaskEvent, Task) nothrow;
alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe;
/**
The maximum combined size of all parameters passed to a task delegate
@ -314,6 +320,7 @@ final package class TaskFiber : Fiber {
package TaskFuncInfo m_taskFunc;
package __gshared size_t ms_taskStackSize = defaultTaskStackSize;
package __gshared debug TaskEventCallback ms_taskEventCallback;
package __gshared debug TaskCreationCallback ms_taskCreationCallback;
this()
@trusted nothrow {
@ -323,9 +330,7 @@ final package class TaskFiber : Fiber {
static TaskFiber getThis()
@safe nothrow {
auto f = () @trusted nothrow {
return Fiber.getThis();
} ();
auto f = () @trusted nothrow { return Fiber.getThis(); } ();
if (auto tf = cast(TaskFiber)f) return tf;
if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
return ms_globalDummyFiber;
@ -508,6 +513,7 @@ package struct TaskFuncInfo {
void function(ref TaskFuncInfo) func;
void[2*size_t.sizeof] callable;
void[maxTaskParameterSize] args;
debug ulong functionPointer;
void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
@ -515,13 +521,17 @@ package struct TaskFuncInfo {
import std.algorithm : move;
import std.traits : hasElaborateAssign;
import std.conv : to;
static struct TARGS { ARGS expand; }
static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length);
static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length,
"Storage required for task callable is too large ("~CALLABLE.sizeof~" vs max "~callable.length~"): "~CALLABLE.stringof);
static assert(TARGS.sizeof <= maxTaskParameterSize,
"The arguments passed to run(Worker)Task must not exceed "~
maxTaskParameterSize.to!string~" bytes in total size.");
maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes");
debug functionPointer = callPointer(callable);
static void callDelegate(ref TaskFuncInfo tfi) {
assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");
@ -582,6 +592,16 @@ package struct TaskFuncInfo {
}
}
private ulong callPointer(C)(ref C callable)
@trusted nothrow @nogc {
alias IP = ulong;
static if (is(C == function)) return cast(IP)cast(void*)callable;
else static if (is(C == delegate)) return cast(IP)callable.funcptr;
else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall;
else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr;
else return cast(IP)&callable;
}
package struct TaskScheduler {
import eventcore.driver : ExitReason;
import eventcore.core : eventDriver;

View file

@ -10,12 +10,11 @@ import vibe.core.net;
import core.time : msecs;
import vibe.core.log;
void main()
{
bool done = false;
auto buf = new ubyte[512*1024*1024];
ubyte[] buf;
listenTCP(11375,(conn) {
void performTest(bool reverse)
{
auto l = listenTCP(11375, (conn) {
bool read_ex = false;
bool write_ex = false;
auto rt = runTask!TCPConnection((conn) {
@ -29,10 +28,10 @@ void main()
} // expected
}, conn);
auto wt = runTask!TCPConnection((conn) {
sleep(1.msecs); // give the connection time to establish
sleep(reverse ? 100.msecs : 20.msecs); // give the connection time to establish
try {
conn.write(buf);
assert(false, "Expected read() to throw an exception.");
assert(false, "Expected write() to throw an exception.");
} catch (Exception) {
write_ex = true;
conn.close();
@ -44,24 +43,28 @@ void main()
wt.join();
assert(read_ex, "No read exception thrown");
assert(write_ex, "No write exception thrown");
done = true;
logInfo("Test has finished successfully.");
exitEventLoop();
}, "127.0.0.1");
runTask({
try {
auto conn = connectTCP("127.0.0.1", 11375);
sleep(10.msecs);
sleep(reverse ? 20.msecs : 100.msecs);
conn.close();
} catch (Exception e) assert(false, e.msg);
sleep(50.msecs);
assert(done, "Not done");
exitEventLoop();
});
setTimer(2000.msecs, {
assert(false, "Test has hung.");
});
runEventLoop();
runApplication();
l.stopListening();
}
void main()
{
setTimer(10000.msecs, { assert(false, "Test has hung."); });
buf = new ubyte[512*1024*1024];
performTest(false);
performTest(true);
}