vibe-core/source/vibe/core/task.d

154 lines
4.5 KiB
D
Raw Normal View History

/**
Contains interfaces and enums for evented I/O drivers.
Copyright: © 2012-2016 RejectedSoftware e.K.
Authors: Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.task;
import vibe.core.sync;
import vibe.internal.array : FixedRingBuffer;
import core.thread;
import std.exception;
import std.traits;
import std.typecons;
import std.variant;
/** Represents a single task as started using vibe.core.runTask.
Note that the Task type is considered weakly isolated and thus can be
passed between threads using vibe.core.concurrency.send or by passing
it as a parameter to vibe.core.core.runWorkerTask.
*/
struct Task {
private {
shared(TaskFiber) m_fiber;
size_t m_taskCounter;
import std.concurrency : ThreadInfo, Tid;
static ThreadInfo s_tidInfo;
}
private this(TaskFiber fiber, size_t task_counter)
@safe nothrow {
() @trusted { m_fiber = cast(shared)fiber; } ();
m_taskCounter = task_counter;
}
this(in Task other) nothrow { m_fiber = cast(shared(TaskFiber))other.m_fiber; m_taskCounter = other.m_taskCounter; }
/** Returns the Task instance belonging to the calling task.
*/
static Task getThis() nothrow @safe
{
// In 2067, synchronized statements where annotated nothrow.
// DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704
// However, they were "logically" nothrow before.
static if (__VERSION__ <= 2066)
scope (failure) assert(0, "Internal error: function should be nothrow");
auto fiber = () @trusted { return Fiber.getThis(); } ();
if (!fiber) return Task.init;
auto tfiber = cast(TaskFiber)fiber;
assert(tfiber !is null, "Invalid or null fiber used to construct Task handle.");
if (!tfiber.m_running) return Task.init;
return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
}
nothrow {
@property inout(TaskFiber) fiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; }
@property size_t taskCounter() const @safe { return m_taskCounter; }
@property inout(Thread) thread() inout @safe { if (m_fiber) return this.fiber.thread; return null; }
/** Determines if the task is still running.
*/
@property bool running()
const @trusted {
assert(m_fiber !is null, "Invalid task handle");
try if (this.fiber.state == Fiber.State.TERM) return false; catch (Throwable) {}
return this.fiber.m_running && this.fiber.m_taskCounter == m_taskCounter;
}
// FIXME: this is not thread safe!
@property ref ThreadInfo tidInfo() { return m_fiber ? fiber.tidInfo : s_tidInfo; }
@property Tid tid() { return tidInfo.ident; }
}
T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }
void join() { if (running) fiber.join(); }
void interrupt() { if (running) fiber.interrupt(); }
void terminate() { if (running) fiber.terminate(); }
string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); }
bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
}
/** The base class for a task aka Fiber.
This class represents a single task that is executed concurrently
with other tasks. Each task is owned by a single thread.
*/
class TaskFiber : Fiber {
private {
Thread m_thread;
import std.concurrency : ThreadInfo;
ThreadInfo m_tidInfo;
}
protected {
shared size_t m_taskCounter;
shared bool m_running;
}
protected this(void delegate() fun, size_t stack_size)
nothrow {
super(fun, stack_size);
m_thread = Thread.getThis();
}
/** Returns the thread that owns this task.
*/
@property inout(Thread) thread() inout @safe nothrow { return m_thread; }
/** Returns the handle of the current Task running on this fiber.
*/
@property Task task() @safe nothrow { return Task(this, m_taskCounter); }
@property ref inout(ThreadInfo) tidInfo() inout nothrow { return m_tidInfo; }
/** Blocks until the task has ended.
*/
abstract void join();
/** Throws an InterruptExeption within the task as soon as it calls a blocking function.
*/
abstract void interrupt();
/** Terminates the task without notice as soon as it calls a blocking function.
*/
abstract void terminate();
void bumpTaskCounter()
@safe nothrow {
import core.atomic : atomicOp;
() @trusted { atomicOp!"+="(this.m_taskCounter, 1); } ();
}
}
/** Exception that is thrown by Task.interrupt.
*/
class InterruptException : Exception {
this()
{
super("Task interrupted.");
}
}