7e2d1dd038
The library is able to support simple TCP servers in the current state. The API is still mostly compatible with mainline vibe.d, but the driver system has been replaced by the eventcore library and sockets/files/timers/... are now structs with automatic reference counting instead of GC collected classes. The stream interfaces have been removed for now.
154 lines
4.5 KiB
D
154 lines
4.5 KiB
D
/**
|
|
Contains the Task handle type and the TaskFiber base class used to represent tasks.
|
|
|
|
Copyright: © 2012-2016 RejectedSoftware e.K.
|
|
Authors: Sönke Ludwig
|
|
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
|
|
*/
|
|
module vibe.core.task;
|
|
|
|
import vibe.core.sync;
|
|
import vibe.internal.array : FixedRingBuffer;
|
|
|
|
import core.thread;
|
|
import std.exception;
|
|
import std.traits;
|
|
import std.typecons;
|
|
import std.variant;
|
|
|
|
|
|
/** Represents a single task as started using vibe.core.runTask.

	Note that the Task type is considered weakly isolated and thus can be
	passed between threads using vibe.core.concurrency.send or by passing
	it as a parameter to vibe.core.core.runWorkerTask.

	A handle stores a reference to the fiber together with the value the
	fiber's task counter had when the handle was created. `running` only
	reports `true` while both counters still match.
*/
struct Task {
	private {
		// Fiber this handle refers to; null for a default-initialized handle.
		shared(TaskFiber) m_fiber;
		// Snapshot of the fiber's task counter taken at construction time.
		size_t m_taskCounter;
		import std.concurrency : ThreadInfo, Tid;
		// Returned by tidInfo when the handle has no fiber.
		static ThreadInfo s_tidInfo;
	}

	/** Creates a handle for `fiber` with the given task counter snapshot.
	*/
	private this(TaskFiber fiber, size_t task_counter)
	@safe nothrow {
		// Casting to shared is not allowed in @safe code, hence the @trusted lambda.
		() @trusted { m_fiber = cast(shared)fiber; } ();
		m_taskCounter = task_counter;
	}

	/** Creates a handle referring to the same fiber and task counter as `other`.
	*/
	this(in Task other) nothrow { m_fiber = cast(shared(TaskFiber))other.m_fiber; m_taskCounter = other.m_taskCounter; }

	/** Returns the Task instance belonging to the calling task.

		Returns `Task.init` when not called from within a fiber, or when the
		current fiber is not marked as running.
	*/
	static Task getThis() nothrow @safe
	{
		// In 2067, synchronized statements were annotated nothrow.
		// DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704
		// However, they were "logically" nothrow before.
		static if (__VERSION__ <= 2066)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		auto fiber = () @trusted { return Fiber.getThis(); } ();
		if (!fiber) return Task.init;
		auto tfiber = cast(TaskFiber)fiber;
		assert(tfiber !is null, "Invalid or null fiber used to construct Task handle.");
		if (!tfiber.m_running) return Task.init;
		return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
	}

	nothrow {
		/// The fiber this handle refers to (null for a default-initialized handle).
		@property inout(TaskFiber) fiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; }
		/// The task counter value stored in this handle.
		@property size_t taskCounter() const @safe { return m_taskCounter; }
		/// The thread reported by the fiber, or null if the handle has no fiber.
		@property inout(Thread) thread() inout @safe { if (m_fiber) return this.fiber.thread; return null; }

		/** Determines if the task is still running.

			The handle must refer to a fiber (checked with an assertion).
			Returns `false` once the fiber has reached the TERM state, and
			otherwise `true` only if the fiber is marked as running and its
			task counter equals the one stored in this handle.
		*/
		@property bool running()
		const @trusted {
			assert(m_fiber !is null, "Invalid task handle");
			// Anything thrown while querying the fiber state is deliberately ignored.
			try if (this.fiber.state == Fiber.State.TERM) return false; catch (Throwable) {}
			return this.fiber.m_running && this.fiber.m_taskCounter == m_taskCounter;
		}

		// FIXME: this is not thread safe!
		/// The fiber's ThreadInfo, or the static fallback if the handle has no fiber.
		@property ref ThreadInfo tidInfo() { return m_fiber ? fiber.tidInfo : s_tidInfo; }
		/// The `ident` of `tidInfo`.
		@property Tid tid() { return tidInfo.ident; }
	}

	/// Converts to `true` if the handle refers to a fiber, whether or not the task is still running.
	T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }

	// The null checks below make these calls no-ops on a default-initialized
	// handle. Without them, `running` would trip its assertion, or dereference
	// a null fiber once assertions are compiled out.

	/// Forwards to the fiber's `join` if the task is still running; otherwise does nothing.
	void join() { if (m_fiber !is null && running) fiber.join(); }
	/// Forwards to the fiber's `interrupt` if the task is still running; otherwise does nothing.
	void interrupt() { if (m_fiber !is null && running) fiber.interrupt(); }
	/// Forwards to the fiber's `terminate` if the task is still running; otherwise does nothing.
	void terminate() { if (m_fiber !is null && running) fiber.terminate(); }

	/// Formats the handle as "<fiber address>:<task counter>".
	string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); }

	/// Two handles are equal if they refer to the same fiber and carry the same task counter.
	bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
	/// ditto
	bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
}
|
|
|
|
|
|
|
|
/** The base class for a task aka Fiber.

	This class represents a single task that is executed concurrently
	with other tasks. Each task is owned by a single thread.
*/
class TaskFiber : Fiber {
	private {
		import std.concurrency : ThreadInfo;

		// Thread that was current when the fiber was constructed.
		Thread m_thread;
		// Exposed by reference through tidInfo.
		ThreadInfo m_tidInfo;
	}

	protected shared {
		// Incremented by bumpTaskCounter; Task handles store a snapshot of it.
		size_t m_taskCounter;
		// Checked by Task.getThis and Task.running; not written in this class.
		bool m_running;
	}

	/** Sets up the underlying Fiber and records the constructing thread as owner.
	*/
	protected this(void delegate() fun, size_t stack_size)
	nothrow {
		super(fun, stack_size);
		this.m_thread = Thread.getThis();
	}

	/** Returns the thread that owns this task.
	*/
	@property inout(Thread) thread()
	inout nothrow @safe {
		return m_thread;
	}

	/** Returns the handle of the current Task running on this fiber.
	*/
	@property Task task()
	nothrow @safe {
		return Task(this, m_taskCounter);
	}

	/** Returns a reference to the ThreadInfo stored in this fiber.
	*/
	@property ref inout(ThreadInfo) tidInfo()
	inout nothrow {
		return m_tidInfo;
	}

	/** Blocks until the task has ended.
	*/
	abstract void join();

	/** Throws an InterruptException within the task as soon as it calls a blocking function.
	*/
	abstract void interrupt();

	/** Terminates the task without notice as soon as it calls a blocking function.
	*/
	abstract void terminate();

	/** Atomically increments the task counter by one.
	*/
	void bumpTaskCounter()
	nothrow @safe {
		import core.atomic : atomicOp;

		// The atomic update of the shared counter is wrapped in a @trusted lambda.
		() @trusted { atomicOp!"+="(m_taskCounter, 1); } ();
	}
}
|
|
|
|
|
|
/** Exception that is thrown by Task.interrupt.

	The message is always "Task interrupted.".

	Params:
		file = Source file recorded in the exception; defaults to the file of
			the code that constructs the exception.
		line = Source line recorded in the exception; defaults to the line of
			the code that constructs the exception.
		next = Optional exception to chain to this one.
*/
class InterruptException : Exception {
	this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		// Forward the location explicitly. With a bare super(msg) call, the
		// defaults would always resolve to this constructor's own position.
		super("Task interrupted.", file, line, next);
	}
}
|