Big refactoring step.

- Moves a lot of stuff from vibe.core.core to vibe.core.task
- Introduces TaskScheduler to unify the scheduling process
- Refines how tasks are scheduled and processed (can push to the front of the task queue and uses a marker task to keep track of the spot up to which to process)
- Start to add proper support for task interrupts and timeouts by properly cancelling in-flight async operations
- Work on ManualEvent - still not functional for the shared case
- Implement proper IP address parsing in NetworkAddress
This commit is contained in:
Sönke Ludwig 2016-06-14 08:01:03 +02:00
parent c3857d1bc9
commit 3b0e4e0452
6 changed files with 1097 additions and 678 deletions

View file

@ -126,7 +126,7 @@ class LocalTaskSemaphore
//import vibe.utils.memory;
private {
struct Waiter {
static struct Waiter {
ManualEvent signal;
ubyte priority;
uint seq;
@ -632,28 +632,126 @@ ManualEvent createManualEvent()
{
return ManualEvent.init;
}
/// ditto
shared(ManualEvent) createSharedManualEvent()
{
return shared(ManualEvent).init;
}
/** A manually triggered cross-task event.
Note: the ownership can be shared between multiple fibers and threads.
*/
struct ManualEvent {
import core.thread : Thread;
import vibe.internal.async : asyncAwait, asyncAwaitUninterruptible;
private {
static struct Waiter {
Waiter* next;
immutable EventID event;
immutable EventDriver driver;
immutable Thread thread;
StackSList!ThreadWaiter tasks;
}
static struct ThreadWaiter {
ThreadWaiter* next;
Task task;
void delegate() @safe nothrow notifier;
void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; }
void cancel() @safe nothrow { notifier = null; }
void wait(void delegate() @safe nothrow del)
shared @safe nothrow {
notifier = del;
if (!next) eventDriver.waitForEvent(ms_threadEvent, &onEvent);
}
private void onEvent(EventID event)
@safe nothrow {
assert(event == ms_threadEvent);
notifier();
}
}
int m_emitCount;
Waiter* m_waiters;
}
// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;
enum EmitMode {
single,
all
}
//@disable this(this);
deprecated("ManualEvent is always non-null!")
bool opCast() const nothrow { return true; }
int emitCount() const nothrow { return 0; }
int emit() nothrow { return 0; }
int wait() { assert(false); }
int wait(int) { import vibe.core.core : sleep; sleep(30.seconds); assert(false); }
int wait(Duration, int) { assert(false); }
int waitUninterruptible() nothrow { assert(false); }
int waitUninterruptible(int) nothrow { assert(false); }
int waitUninterruptible(Duration, int) nothrow { assert(false); }
}
/+interface ManualEvent {
deprecated("ManualEvent is always non-null!")
bool opCast() const shared nothrow { return true; }
/// A counter that is increased with every emit() call
@property int emitCount() const nothrow;
int emitCount() const nothrow { return m_emitCount; }
/// ditto
int emitCount() const shared nothrow { return atomicLoad(m_emitCount); }
/// Emits the signal, waking up all owners of the signal.
void emit() nothrow;
int emit(EmitMode mode = EmitMode.all)
shared nothrow {
import core.atomic : atomicOp, cas;
auto ec = atomicOp!"+="(m_emitCount, 1);
auto thisthr = Thread.getThis();
final switch (mode) {
case EmitMode.all:
// FIXME: would be nice to have atomicSwap instead
auto w = cast(Waiter*)atomicLoad(m_waiters);
if (w !is null && !cas(&m_waiters, cast(shared(Waiter)*)w, cast(shared(Waiter)*)null))
return ec;
while (w !is null) {
if (w.thread is thisthr) {
// Note: emitForThisThread can result in w getting deallocated at any
// time, so we need to copy any fields first
auto tasks = w.tasks;
w = w.next;
emitForThisThread(w.tasks.m_first, mode);
} else {
auto evt = w.event;
w = w.next;
eventDriver.triggerEvent(evt, true);
}
}
break;
case EmitMode.single:
assert(false);
}
return ec;
}
/// ditto
int emit(EmitMode mode = EmitMode.all)
nothrow {
auto ec = m_emitCount++;
final switch (mode) {
case EmitMode.all:
auto w = m_waiters;
m_waiters = null;
if (w !is null) {
assert(w.thread is Thread.getThis(), "Unshared ManualEvent has waiters in foreign thread!");
assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!");
emitForThisThread(w.tasks.m_first, EmitMode.all);
}
break;
case EmitMode.single:
assert(false);
}
return ec;
}
/** Acquires ownership and waits until the signal is emitted.
@ -661,7 +759,9 @@ struct ManualEvent {
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
void wait();
int wait() { return wait(this.emitCount); }
/// ditto
int wait() shared { return wait(this.emitCount); }
/** Acquires ownership and waits until the emit count differs from the given one.
@ -669,7 +769,9 @@ struct ManualEvent {
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(int reference_emit_count);
int wait(int emit_count) { return wait(Duration.max, emit_count); }
/// ditto
int wait(int emit_count) shared { return wait(Duration.max, emit_count); }
/** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reaced.
@ -677,32 +779,218 @@ struct ManualEvent {
May throw an $(D InterruptException) if the task gets interrupted
using $(D Task.interrupt()).
*/
int wait(Duration timeout, int reference_emit_count);
int wait(Duration timeout, int emit_count)
{
Waiter w;
ThreadWaiter tw;
int ec = this.emitCount;
while (ec <= emit_count) {
// wait for getting resumed directly by emit/emitForThisThread
acquireWaiter(w, tw);
asyncAwait!(void delegate() @safe nothrow,
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
}
return ec;
}
/// ditto
int wait(Duration timeout, int emit_count)
shared {
shared(Waiter) w;
ThreadWaiter tw;
acquireWaiter(w, tw);
int ec = this.emitCount;
while (ec <= emit_count) {
if (tw.next) {
// if we are not the first waiter for this thread,
// wait for getting resumed by emitForThisThread
asyncAwait!(void delegate() @safe nothrow,
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
} else {
// if we are the first waiter for this thread,
// wait for the thread event to get emitted
/*asyncAwait!(EventCallback, void delegate() @safe nothrow,
cb => eventDriver.waitForEvent(ms_threadEvent, cb),
cb => tw.wait(cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent)
)(timeout);
emitForThisThread(w.waiters);
ec = this.emitCount;*/
assert(false);
}
}
return ec;
}
/** Same as $(D wait), but defers throwing any $(D InterruptException).
This method is annotated $(D nothrow) at the expense that it cannot be
interrupted.
*/
int waitUninterruptible(int reference_emit_count) nothrow;
int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
///
int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
/// ditto
int waitUninterruptible(Duration timeout, int reference_emit_count) nothrow;
}+/
int waitUninterruptible(int emit_count) nothrow { return waitUninterruptible(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(int emit_count) shared nothrow { return waitUninterruptible(Duration.max, emit_count); }
/// ditto
int waitUninterruptible(Duration timeout, int emit_count)
nothrow {
Waiter w;
ThreadWaiter tw;
acquireWaiter(w, tw);
int ec = this.emitCount;
while (ec <= emit_count) {
asyncAwaitUninterruptible!(void delegate(),
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
}
return ec;
}
/// ditto
int waitUninterruptible(Duration timeout, int emit_count)
shared nothrow {
/*Waiter w;
ThreadWaiter tw;
auto event = acquireWaiter(w, tw);
int ec = this.emitCount;
while (ec <= emit_count) {
asyncAwaitUninterruptible!(void delegate(),
cb => tw.wait(cb),
cb => tw.cancel()
)(timeout);
ec = this.emitCount;
}
return ec;*/
assert(false);
}
private static bool emitForThisThread(ThreadWaiter* waiters, EmitMode mode)
nothrow {
if (!waiters) return false;
final switch (mode) {
case EmitMode.all:
while (waiters) {
if (waiters.notifier !is null)
waiters.notifier();
waiters = waiters.next;
}
break;
case EmitMode.single:
assert(false, "TODO!");
}
return true;
}
private void acquireWaiter(ref Waiter w, ref ThreadWaiter tw)
nothrow {
// FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the Waiter will be dangling
tw.task = Task.getThis();
if (m_waiters) {
m_waiters.tasks.add(&tw);
} else {
m_waiters = &w;
}
}
private void acquireWaiter(ref shared(Waiter) w, ref ThreadWaiter tw)
nothrow shared {
tw.task = Task.getThis();
if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.createEvent();
if (m_waiters) {
//m_waiters.tasks.add(&tw);
assert(false);
} else {
m_waiters = &w;
}
}
}
private struct StackSList(T)
{
import core.atomic : cas;
private T* m_first;
@property T* first() { return m_first; }
@property shared(T)* first() shared { return atomicLoad(m_first); }
void add(shared(T)* elem)
shared {
do elem.next = atomicLoad(m_first);
while (cas(&m_first, elem.next, elem));
}
void remove(shared(T)* elem)
shared {
while (true) {
shared(T)* w = atomicLoad(m_first), wp;
while (w !is elem) {
wp = w;
w = atomicLoad(w.next);
}
if (wp !is null) {
if (cas(&wp.next, w, w.next))
break;
} else {
if (cas(&m_first, w, w.next))
break;
}
}
}
bool empty() const { return m_first is null; }
void add(T* elem)
{
elem.next = m_first;
m_first = elem;
}
void remove(T* elem)
{
T* w = m_first, wp;
while (w !is elem) {
assert(w !is null);
wp = w;
w = w.next;
}
if (wp) wp.next = w.next;
else m_first = w.next;
}
}
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
import std.stdio;
private {
shared(bool) m_locked = false;
shared(uint) m_waiters = 0;
ManualEvent m_signal;
shared(ManualEvent) m_signal;
debug Task m_owner;
}
void setup()
{
m_signal = createManualEvent();
m_signal = createSharedManualEvent();
}
@ -751,13 +1039,13 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
Task m_owner;
size_t m_recCount = 0;
shared(uint) m_waiters = 0;
ManualEvent m_signal;
shared(ManualEvent) m_signal;
@property bool m_locked() const { return m_recCount > 0; }
}
void setup()
{
m_signal = createManualEvent();
m_signal = createSharedManualEvent();
m_mutex = new core.sync.mutex.Mutex;
}
@ -812,7 +1100,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
private {
LOCKABLE m_mutex;
ManualEvent m_signal;
shared(ManualEvent) m_signal;
}
static if (is(LOCKABLE == Lockable)) {
@ -833,7 +1121,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
void setup(LOCKABLE mtx)
{
m_mutex = mtx;
m_signal = createManualEvent();
m_signal = createSharedManualEvent();
}
@property LOCKABLE mutex() { return m_mutex; }
@ -955,9 +1243,9 @@ private struct ReadWriteMutexState(bool INTERRUPTIBLE)
//Queue Events
/** The event used to wake reading tasks waiting for the lock while it is blocked. */
ManualEvent m_readyForReadLock;
shared(ManualEvent) m_readyForReadLock;
/** The event used to wake writing tasks waiting for the lock while it is blocked. */
ManualEvent m_readyForWriteLock;
shared(ManualEvent) m_readyForWriteLock;
/** The underlying mutex that gates the access to the shared state. */
Mutex m_counterMutex;
@ -967,8 +1255,8 @@ private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
m_policy = policy;
m_counterMutex = new Mutex();
m_readyForReadLock = createManualEvent();
m_readyForWriteLock = createManualEvent();
m_readyForReadLock = createSharedManualEvent();
m_readyForWriteLock = createSharedManualEvent();
}
@disable this(this);