diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index e511c8b..c4ffb62 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -13,8 +13,9 @@ import eventcore.core; import vibe.core.args; import vibe.core.concurrency; import vibe.core.log; -import vibe.core.sync : ManualEvent, createManualEvent; +import vibe.core.sync : ManualEvent, createSharedManualEvent; import vibe.internal.async; +import vibe.internal.array : FixedRingBuffer; //import vibe.utils.array; import std.algorithm; import std.conv; @@ -32,8 +33,6 @@ import core.sync.mutex; import core.stdc.stdlib; import core.thread; -alias TaskEventCb = void function(TaskEvent, Task) nothrow; - version(Posix) { import core.sys.posix.signal; @@ -76,7 +75,7 @@ version (Windows) Tasks will be started and handled from within the event loop. */ int runEventLoop() -{ +@safe nothrow { setupSignalHandlers(); logDebug("Starting event loop."); @@ -84,21 +83,26 @@ int runEventLoop() scope (exit) { s_eventLoopRunning = false; s_exitEventLoop = false; - st_threadShutdownCondition.notifyAll(); + () @trusted nothrow { + scope (failure) assert(false); // notifyAll is not marked nothrow + st_threadShutdownCondition.notifyAll(); + } (); } // runs any yield()ed tasks first - assert(!s_exitEventLoop); + assert(!s_exitEventLoop, "Exit flag set before event loop has started."); s_exitEventLoop = false; - notifyIdle(); + performIdleProcessing(); if (getExitFlag()) return 0; // handle exit flag in the main thread to exit when // exitEventLoop(true) is called from a thread) - if (Thread.getThis() is st_threads[0].thread) - runTask(toDelegate(&watchExitFlag)); + () @trusted nothrow { + if (Thread.getThis() is st_threads[0].thread) + runTask(toDelegate(&watchExitFlag)); + } (); - while (s_yieldedTasks.length || eventDriver.waiterCount) { + while (s_scheduler.scheduledTaskCount || eventDriver.waiterCount) { if (eventDriver.processEvents() == ExitReason.exited) break; } @@ -120,13 +124,15 @@ int runEventLoop() there is no need to set shutdown_all_threads to true. */ void exitEventLoop(bool shutdown_all_threads = false) -{ +@safe nothrow { logDebug("exitEventLoop called (%s)", shutdown_all_threads); - assert(s_eventLoopRunning || shutdown_all_threads); + assert(s_eventLoopRunning || shutdown_all_threads, "Exiting event loop when none is running."); if (shutdown_all_threads) { - atomicStore(st_term, true); - st_threadsSignal.emit(); + () @trusted nothrow { + atomicStore(st_term, true); + st_threadsSignal.emit(); + } (); } // shutdown the calling thread @@ -142,9 +148,9 @@ void exitEventLoop(bool shutdown_all_threads = false) Returns: Returns false $(I iff) exitEventLoop was called in the process. */ bool processEvents() -{ +@safe nothrow { if (!eventDriver.processEvents(Duration.max)) return false; - notifyIdle(); + performIdleProcessing(); return true; } @@ -156,13 +162,13 @@ bool processEvents() be triggered immediately after processing any events that have arrived in the meantime. Returning false will instead wait until another event has arrived first. */ -void setIdleHandler(void delegate() del) -{ - s_idleHandler = { del(); return false; }; +void setIdleHandler(void delegate() @safe nothrow del) +@safe nothrow { + s_idleHandler = () @safe nothrow { del(); return false; }; } /// ditto -void setIdleHandler(bool delegate() del) -{ +void setIdleHandler(bool delegate() @safe nothrow del) +@safe nothrow { s_idleHandler = del; } @@ -185,7 +191,7 @@ private Task runTask_internal(ref TaskFuncInfo tfi) @safe nothrow { import std.typecons : Tuple, tuple; - CoreTask f; + TaskFiber f; while (!f && !s_availableFibers.empty) { f = s_availableFibers.back; s_availableFibers.popBack(); @@ -196,8 +202,7 @@ private Task runTask_internal(ref TaskFuncInfo tfi) // if there is no fiber available, create one. if (s_availableFibers.capacity == 0) s_availableFibers.capacity = 1024; logDebugV("Creating new fiber..."); - s_fiberCount++; - f = new CoreTask; + f = new TaskFiber; } f.m_taskFunc = tfi; @@ -206,14 +211,14 @@ private Task runTask_internal(ref TaskFuncInfo tfi) auto handle = f.task(); debug Task self = Task.getThis(); - debug if (s_taskEventCallback) { - if (self != Task.init) () @trusted { s_taskEventCallback(TaskEvent.yield, self); } (); - () @trusted { s_taskEventCallback(TaskEvent.preStart, handle); } (); + debug if (TaskFiber.ms_taskEventCallback) { + () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); } - resumeTask(handle, null, true); - debug if (s_taskEventCallback) { - () @trusted { s_taskEventCallback(TaskEvent.postStart, handle); } (); - if (self != Task.init) () @trusted { s_taskEventCallback(TaskEvent.resume, self); } (); + + s_scheduler.switchTo(handle); + + debug if (TaskFiber.ms_taskEventCallback) { + () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); } return handle; @@ -475,7 +480,7 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, import std.algorithm : move; import std.traits : hasElaborateAssign; - struct TARGS { ARGS expand; } + static struct TARGS { ARGS expand; } static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length); static assert(TARGS.sizeof <= maxTaskParameterSize, @@ -483,7 +488,7 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, maxTaskParameterSize.to!string~" bytes in total size."); static void callDelegate(TaskFuncInfo* tfi) { - assert(tfi.func is &callDelegate); + assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); // copy original call data to stack CALLABLE c; @@ -513,7 +518,7 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, return tfi; } -import core.cpuid : threadsPerCPU; + /** Sets up num worker threads. @@ -597,43 +602,68 @@ version (linux) static if (__VERSION__ <= 2066) private extern(C) int get_nprocs the calling task. */ void yield() -@safe { - // throw any deferred exceptions - processDeferredExceptions(); - - auto t = CoreTask.getThis(); - if (t && t !is CoreTask.ms_coreTask) { - assert(!t.m_queue, "Calling yield() when already yielded!?"); - if (!t.m_queue) - s_yieldedTasks.insertBack(t); - scope (exit) assert(t.m_queue is null, "Task not removed from yielders queue after being resumed."); - rawYield(); +@safe nothrow { + auto t = Task.getThis(); + if (t != Task.init) { + s_scheduler.yield(); } else { // Let yielded tasks execute - () @trusted { notifyIdle(); } (); + () @safe nothrow { performIdleProcessing(); } (); } } /** - Yields execution of this task until an event wakes it up again. + Suspends the execution of the calling task until `switchToTask` is called + manually. - Beware that the task will starve if no event wakes it up. + This low-level scheduling function is usually only used internally. Failure + to call `switchToTask` will result in task starvation and resource leakage. + + Params: + on_interrupt = If specified, is required to + + See_Also: `switchToTask` */ -void rawYield() -@safe { - yieldForEvent(); +void hibernate(scope void delegate() @safe nothrow on_interrupt = null) +@safe nothrow { + auto t = Task.getThis(); + if (t == Task.init) { + processEvents(); + } else { + t.taskFiber.handleInterrupt(on_interrupt); + s_scheduler.hibernate(); + t.taskFiber.handleInterrupt(on_interrupt); + } } + +/** + Switches execution to the given task. + + This function can be used in conjunction with `hibernate` to wake up a + task. The task must live in the same thread as the caller. + + See_Also: `hibernate` +*/ +void switchToTask(Task t) +@safe nothrow { + s_scheduler.switchTo(t); +} + + /** Suspends the execution of the calling task for the specified amount of time. Note that other tasks of the same thread will continue to run during the wait time, in contrast to $(D core.thread.Thread.sleep), which shouldn't be used in vibe.d applications. + + Throws: May throw an `InterruptException` if the task gets interrupted using + `Task.interrupt()`. */ void sleep(Duration timeout) -{ +@safe { assert(timeout >= 0.seconds, "Argument to sleep must not be negative."); if (timeout <= 0.seconds) return; auto tm = setTimer(timeout, null); @@ -670,7 +700,7 @@ unittest { See_also: createTimer */ Timer setTimer(Duration timeout, void delegate() nothrow @safe callback, bool periodic = false) -{ +@safe nothrow { auto tm = createTimer(callback); tm.rearm(timeout, periodic); return tm; @@ -698,7 +728,7 @@ unittest { See_also: setTimer */ Timer createTimer(void delegate() nothrow @safe callback) -{ +@safe nothrow { void cb(TimerID tm) nothrow @safe { if (callback !is null) @@ -723,7 +753,7 @@ Timer createTimer(void delegate() nothrow @safe callback) file descriptor. */ FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger event_mask) -{ +@safe nothrow { return FileDescriptorEvent(file_descriptor, event_mask); } @@ -746,8 +776,8 @@ FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescripto without having to worry about memory usage. */ void setTaskStackSize(size_t sz) -{ - s_taskStackSize = sz; +nothrow { + TaskFiber.ms_taskStackSize = sz; } @@ -761,7 +791,7 @@ void setTaskStackSize(size_t sz) `setupWorkerThreads` */ @property size_t workerThreadCount() - out(count) { assert(count > 0); } + out(count) { assert(count > 0, "No worker threads started after setupWorkerThreads!?"); } body { setupWorkerThreads(); return st_threads.count!(c => c.isWorker); @@ -823,9 +853,9 @@ void lowerPrivileges() analyze the life time of tasks, including task switches. Note that the callback will only be called for debug builds. */ -void setTaskEventCallback(TaskEventCb func) +void setTaskEventCallback(TaskEventCallback func) { - debug s_taskEventCallback = func; + debug TaskFiber.ms_taskEventCallback = func; } @@ -835,14 +865,6 @@ void setTaskEventCallback(TaskEventCb func) enum vibeVersionString = "0.7.27"; -/** - The maximum combined size of all parameters passed to a task delegate - - See_Also: runTask -*/ -enum maxTaskParameterSize = 128; - - /** Generic file descriptor event. @@ -860,6 +882,8 @@ struct FileDescriptorEvent { any = read|write /// Match any kind of event } + @safe nothrow: + private this(int fd, Trigger event_mask) { assert(false); @@ -897,41 +921,43 @@ struct Timer { debug uint m_magicNumber = 0x4d34f916; } + @safe: + private this(TimerID id) - { + nothrow { m_driver = eventDriver; m_id = id; } this(this) - { - debug assert(m_magicNumber == 0x4d34f916); + nothrow { + debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); if (m_driver) m_driver.addRef(m_id); } ~this() - { - debug assert(m_magicNumber == 0x4d34f916); + nothrow { + debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); if (m_driver) m_driver.releaseRef(m_id); } /// True if the timer is yet to fire. - @property bool pending() { return m_driver.isTimerPending(m_id); } + @property bool pending() nothrow { return m_driver.isTimerPending(m_id); } /// The internal ID of the timer. - @property size_t id() const { return m_id; } + @property size_t id() const nothrow { return m_id; } - bool opCast() const { return m_driver !is null; } + bool opCast() const nothrow { return m_driver !is null; } /** Resets the timer to the specified timeout */ - void rearm(Duration dur, bool periodic = false) - in { assert(dur > 0.seconds); } + void rearm(Duration dur, bool periodic = false) nothrow + in { assert(dur > 0.seconds, "Negative timer duration specified."); } body { m_driver.setTimer(m_id, dur, periodic ? dur : 0.seconds); } /** Resets the timer and avoids any firing. */ - void stop() { m_driver.stopTimer(m_id); } + void stop() nothrow { m_driver.stopTimer(m_id); } /** Waits until the timer fires. */ @@ -939,307 +965,18 @@ struct Timer { { assert (!m_driver.isTimerPeriodic(m_id), "Cannot wait for a periodic timer."); if (!this.pending) return; - m_driver.asyncAwait!"waitTimer"(m_id); + asyncAwait!(TimerCallback, + cb => m_driver.waitTimer(m_id, cb), + cb => m_driver.cancelTimerWait(m_id, cb) + ); } } -/** - Implements a task local storage variable. - - Task local variables, similar to thread local variables, exist separately - in each task. Consequently, they do not need any form of synchronization - when accessing them. - - Note, however, that each TaskLocal variable will increase the memory footprint - of any task that uses task local storage. There is also an overhead to access - TaskLocal variables, higher than for thread local variables, but generelly - still O(1) (since actual storage acquisition is done lazily the first access - can require a memory allocation with unknown computational costs). - - Notice: - FiberLocal instances MUST be declared as static/global thread-local - variables. Defining them as a temporary/stack variable will cause - crashes or data corruption! - - Examples: - --- - TaskLocal!string s_myString = "world"; - - void taskFunc() - { - assert(s_myString == "world"); - s_myString = "hello"; - assert(s_myString == "hello"); - } - - shared static this() - { - // both tasks will get independent storage for s_myString - runTask(&taskFunc); - runTask(&taskFunc); - } - --- -*/ -struct TaskLocal(T) -{ - private { - size_t m_offset = size_t.max; - size_t m_id; - T m_initValue; - bool m_hasInitValue = false; - } - - this(T init_val) { m_initValue = init_val; m_hasInitValue = true; } - - @disable this(this); - - void opAssign(T value) { this.storage = value; } - - @property ref T storage() - { - auto fiber = CoreTask.getThis(); - - // lazily register in FLS storage - if (m_offset == size_t.max) { - static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof); - assert(CoreTask.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool."); - m_offset = CoreTask.ms_flsFill; - m_id = CoreTask.ms_flsCounter++; - - - CoreTask.ms_flsFill += T.sizeof; - while (CoreTask.ms_flsFill % 8 != 0) - CoreTask.ms_flsFill++; - } - - // make sure the current fiber has enough FLS storage - if (fiber.m_fls.length < CoreTask.ms_flsFill) { - fiber.m_fls.length = CoreTask.ms_flsFill + 128; - fiber.m_flsInit.length = CoreTask.ms_flsCounter + 64; - } - - // return (possibly default initialized) value - auto data = fiber.m_fls.ptr[m_offset .. m_offset+T.sizeof]; - if (!fiber.m_flsInit[m_id]) { - fiber.m_flsInit[m_id] = true; - import std.traits : hasElaborateDestructor, hasAliasing; - static if (hasElaborateDestructor!T || hasAliasing!T) { - void function(void[], size_t) destructor = (void[] fls, size_t offset){ - static if (hasElaborateDestructor!T) { - auto obj = cast(T*)&fls[offset]; - // call the destructor on the object if a custom one is known declared - obj.destroy(); - } - else static if (hasAliasing!T) { - // zero the memory to avoid false pointers - foreach (size_t i; offset .. offset + T.sizeof) { - ubyte* u = cast(ubyte*)&fls[i]; - *u = 0; - } - } - }; - FLSInfo fls_info; - fls_info.fct = destructor; - fls_info.offset = m_offset; - - // make sure flsInfo has enough space - if (fiber.ms_flsInfo.length <= m_id) - fiber.ms_flsInfo.length = m_id + 64; - - fiber.ms_flsInfo[m_id] = fls_info; - } - - if (m_hasInitValue) { - static if (__traits(compiles, emplace!T(data, m_initValue))) - emplace!T(data, m_initValue); - else assert(false, "Cannot emplace initialization value for type "~T.stringof); - } else emplace!T(data); - } - return (cast(T[])data)[0]; - } - - alias storage this; -} - -private struct FLSInfo { - void function(void[], size_t) fct; - size_t offset; - void destroy(void[] fls) { - fct(fls, offset); - } -} - -/** - High level state change events for a Task -*/ -enum TaskEvent { - preStart, /// Just about to invoke the fiber which starts execution - postStart, /// After the fiber has returned for the first time (by yield or exit) - start, /// Just about to start execution - yield, /// Temporarily paused - resume, /// Resumed from a prior yield - end, /// Ended normally - fail /// Ended with an exception -} - - /**************************************************************************************************/ /* private types */ /**************************************************************************************************/ -private class CoreTask : TaskFiber { - import std.bitmanip; - private { - static CoreTask ms_coreTask; - CoreTask m_nextInQueue; - CoreTaskQueue* m_queue; - TaskFuncInfo m_taskFunc; - Exception m_exception; - Task[] m_yielders; - - // task local storage - static FLSInfo[] ms_flsInfo; - static size_t ms_flsFill = 0; // thread-local - static size_t ms_flsCounter = 0; - BitArray m_flsInit; - void[] m_fls; - } - - static CoreTask getThis() - @safe nothrow { - auto f = () @trusted nothrow { - return Fiber.getThis(); - } (); - if (f) return cast(CoreTask)f; - if (!ms_coreTask) ms_coreTask = new CoreTask; - return ms_coreTask; - } - - this() - @trusted nothrow { - super(&run, s_taskStackSize); - } - - @property State state() - @trusted const nothrow { - return super.state; - } - - @property size_t taskCounter() const { return m_taskCounter; } - - private void run() - { - version (VibeDebugCatchAll) alias UncaughtException = Throwable; - else alias UncaughtException = Exception; - try { - while(true){ - while (!m_taskFunc.func) { - try { - Fiber.yield(); - } catch( Exception e ){ - logWarn("CoreTaskFiber was resumed with exception but without active task!"); - logDiagnostic("Full error: %s", e.toString().sanitize()); - } - } - - auto task = m_taskFunc; - m_taskFunc = TaskFuncInfo.init; - Task handle = this.task; - try { - m_running = true; - scope(exit) m_running = false; - - std.concurrency.thisTid; // force creation of a message box - - debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.start, handle); - if (!s_eventLoopRunning) { - logTrace("Event loop not running at task start - yielding."); - .yield(); - logTrace("Initial resume of task."); - } - task.func(&task); - debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.end, handle); - } catch( Exception e ){ - debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.fail, handle); - import std.encoding; - logCritical("Task terminated with uncaught exception: %s", e.msg); - logDebug("Full error: %s", e.toString().sanitize()); - } - - this.tidInfo.ident = Tid.init; // clear message box - - // check for any unhandled deferred exceptions - if (m_exception !is null) { - if (cast(InterruptException)m_exception) { - logDebug("InterruptException not handled by task before exit."); - } else { - logCritical("Deferred exception not handled by task before exit: %s", m_exception.msg); - logDebug("Full error: %s", m_exception.toString().sanitize()); - } - } - - foreach (t; m_yielders) s_yieldedTasks.insertBack(cast(CoreTask)t.fiber); - m_yielders.length = 0; - - // make sure that the task does not get left behind in the yielder queue if terminated during yield() - if (m_queue) { - resumeYieldedTasks(); - assert(m_queue is null, "Still in yielder queue at the end of task after resuming all yielders!?"); - } - - // zero the fls initialization ByteArray for memory safety - foreach (size_t i, ref bool b; m_flsInit) { - if (b) { - if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init) - ms_flsInfo[i].destroy(m_fls); - b = false; - } - } - - // make the fiber available for the next task - if (s_availableFibers.full) - s_availableFibers.capacity = 2 * s_availableFibers.capacity; - - s_availableFibers.put(this); - } - } catch(UncaughtException th) { - logCritical("CoreTaskFiber was terminated unexpectedly: %s", th.msg); - logDiagnostic("Full error: %s", th.toString().sanitize()); - s_fiberCount--; - } - } - - override void join() - { - auto caller = Task.getThis(); - if (!m_running) return; - if (caller != Task.init) { - assert(caller.fiber !is this, "A task cannot join itself."); - assert(caller.thread is this.thread, "Joining tasks in foreign threads is currently not supported."); - m_yielders ~= caller; - } else assert(Thread.getThis() is this.thread, "Joining tasks in different threads is not yet supported."); - auto run_count = m_taskCounter; - if (caller == Task.init) .yield(); // let the task continue (it must be yielded currently) - while (m_running && run_count == m_taskCounter) rawYield(); - } - - override void interrupt() - { - auto caller = Task.getThis(); - if (caller != Task.init) { - assert(caller != this.task, "A task cannot interrupt itself."); - assert(caller.thread is this.thread, "Interrupting tasks in different threads is not yet supported."); - } else assert(Thread.getThis() is this.thread, "Interrupting tasks in different threads is not yet supported."); - yieldAndResumeTask(this.task, new InterruptException); - } - - override void terminate() - { - assert(false, "Not implemented"); - } -} - private void setupGcTimer() { @@ -1253,95 +990,15 @@ private void setupGcTimer() s_gcCollectTimeout = dur!"seconds"(2); } -package(vibe) void yieldForEventDeferThrow() +package(vibe) void performIdleProcessing() @safe nothrow { - yieldForEventDeferThrow(Task.getThis()); -} - -package(vibe) void processDeferredExceptions() -@safe { - processDeferredExceptions(Task.getThis()); -} - -package(vibe) void yieldForEvent() -@safe { - auto task = Task.getThis(); - processDeferredExceptions(task); - yieldForEventDeferThrow(task); - processDeferredExceptions(task); -} - -package(vibe) void resumeTask(Task task, Exception event_exception = null) -@safe nothrow { - assert(Task.getThis() == Task.init, "Calling resumeTask from another task."); - resumeTask(task, event_exception, false); -} - -package(vibe) void yieldAndResumeTask(Task task, Exception event_exception = null) -@safe { - auto thisct = CoreTask.getThis(); - - if (thisct is null || thisct is CoreTask.ms_coreTask) { - resumeTask(task, event_exception); - return; - } - - auto otherct = cast(CoreTask)task.fiber; - assert(!thisct || otherct.thread is thisct.thread, "Resuming task in foreign thread."); - assert(() @trusted { return otherct.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD."); - - if (event_exception) otherct.m_exception = event_exception; - if (!otherct.m_queue) s_yieldedTasks.insertBack(otherct); - yield(); -} - -package(vibe) void resumeTask(Task task, Exception event_exception, bool initial_resume) -@safe nothrow { - assert(initial_resume || task.running, "Resuming terminated task."); - resumeCoreTask(cast(CoreTask)task.fiber, event_exception); -} - -package(vibe) void resumeCoreTask(CoreTask ctask, Exception event_exception = null) -nothrow @safe { - assert(ctask.thread is () @trusted { return Thread.getThis(); } (), "Resuming task in foreign thread."); - assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD"); - - if (event_exception) { - extrap(); - assert(!ctask.m_exception, "Resuming task with exception that is already scheduled to be resumed with exception."); - ctask.m_exception = event_exception; - } - - // do nothing if the task is aready scheduled to be resumed - if (ctask.m_queue) return; - - auto uncaught_exception = () @trusted nothrow { return ctask.call!(Fiber.Rethrow.no)(); } (); - - if (uncaught_exception) { - auto th = cast(Throwable)uncaught_exception; - assert(th, "Fiber returned exception object that is not a Throwable!?"); - extrap(); - - assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.TERM); - logError("Task terminated with unhandled exception: %s", th.msg); - logDebug("Full error: %s", () @trusted { return th.toString().sanitize; } ()); - - // always pass Errors on - if (auto err = cast(Error)th) throw err; - } -} - -package(vibe) void notifyIdle() -{ bool again = !getExitFlag(); while (again) { if (s_idleHandler) again = s_idleHandler(); else again = false; - resumeYieldedTasks(); - - again = (again || !s_yieldedTasks.empty) && !getExitFlag(); + again = (s_scheduler.schedule() || again) && !getExitFlag(); if (again) { auto er = eventDriver.processEvents(0.seconds); @@ -1352,52 +1009,14 @@ package(vibe) void notifyIdle() } } } - if (!s_yieldedTasks.empty) logDebug("Exiting from idle processing although there are still yielded tasks"); + + if (s_scheduler.scheduledTaskCount) logDebug("Exiting from idle processing although there are still yielded tasks"); if (!s_ignoreIdleForGC && s_gcTimer) { s_gcTimer.rearm(s_gcCollectTimeout); } else s_ignoreIdleForGC = false; } -private void resumeYieldedTasks() -{ - for (auto limit = s_yieldedTasks.length; limit > 0 && !s_yieldedTasks.empty; limit--) { - auto tf = s_yieldedTasks.front; - s_yieldedTasks.popFront(); - if (tf.state == Fiber.State.HOLD) resumeCoreTask(tf); - } -} - -private void yieldForEventDeferThrow(Task task) -@safe nothrow { - if (task != Task.init) { - debug if (s_taskEventCallback) () @trusted { s_taskEventCallback(TaskEvent.yield, task); } (); - () @trusted { task.fiber.yield(); } (); - debug if (s_taskEventCallback) () @trusted { s_taskEventCallback(TaskEvent.resume, task); } (); - // leave fiber.m_exception untouched, so that it gets thrown on the next yieldForEvent call - } else { - assert(!s_eventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?"); - s_eventException = null; - eventDriver.processEvents(); - // leave m_eventException untouched, so that it gets thrown on the next yieldForEvent call - } -} - -private void processDeferredExceptions(Task task) -@safe { - if (task != Task.init) { - auto fiber = cast(CoreTask)task.fiber; - if (auto e = fiber.m_exception) { - fiber.m_exception = null; - throw e; - } - } else { - if (auto e = s_eventException) { - s_eventException = null; - throw e; - } - } -} private struct ThreadContext { Thread thread; @@ -1407,67 +1026,28 @@ private struct ThreadContext { this(Thread thr, bool worker) { this.thread = thr; this.isWorker = worker; } } -private struct TaskFuncInfo { - void function(TaskFuncInfo*) func; - void[2*size_t.sizeof] callable = void; - void[maxTaskParameterSize] args = void; - - @property ref C typedCallable(C)() - { - static assert(C.sizeof <= callable.sizeof); - return *cast(C*)callable.ptr; - } - - @property ref A typedArgs(A)() - { - static assert(A.sizeof <= args.sizeof); - return *cast(A*)args.ptr; - } - - void initCallable(C)() - { - C cinit; - this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1]; - } - - void initArgs(A)() - { - A ainit; - this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1]; - } -} - -alias TaskArgsVariant = VariantN!maxTaskParameterSize; - /**************************************************************************************************/ /* private functions */ /**************************************************************************************************/ private { - static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024; - else enum defaultTaskStackSize = 512*1024; - - __gshared size_t s_taskStackSize = defaultTaskStackSize; Duration s_gcCollectTimeout; Timer s_gcTimer; bool s_ignoreIdleForGC = false; - Exception s_eventException; __gshared core.sync.mutex.Mutex st_threadsMutex; - __gshared ManualEvent st_threadsSignal; + shared ManualEvent st_threadsSignal; __gshared ThreadContext[] st_threads; __gshared TaskFuncInfo[] st_workerTasks; __gshared Condition st_threadShutdownCondition; - __gshared debug TaskEventCb s_taskEventCallback; shared bool st_term = false; bool s_exitEventLoop = false; - bool s_eventLoopRunning = false; - bool delegate() s_idleHandler; - CoreTaskQueue s_yieldedTasks; - Variant[string] s_taskLocalStorageGlobal; // for use outside of a task - FixedRingBuffer!CoreTask s_availableFibers; - size_t s_fiberCount; + package bool s_eventLoopRunning = false; + bool delegate() @safe nothrow s_idleHandler; + + TaskScheduler s_scheduler; + FixedRingBuffer!TaskFiber s_availableFibers; string s_privilegeLoweringUserName; string s_privilegeLoweringGroupName; @@ -1475,12 +1055,24 @@ private { } private bool getExitFlag() -{ +@trusted nothrow { return s_exitEventLoop || atomicLoad(st_term); } +package @property bool isEventLoopRunning() @safe nothrow @nogc { return s_eventLoopRunning; } +package @property ref TaskScheduler taskScheduler() @safe nothrow @nogc { return s_scheduler; } + +package void recycleFiber(TaskFiber fiber) +@safe nothrow { + if (s_availableFibers.full) + s_availableFibers.capacity = 2 * s_availableFibers.capacity; + + s_availableFibers.put(fiber); +} + private void setupSignalHandlers() -{ +@trusted nothrow { + scope (failure) assert(false); // _d_monitorexit is not nothrow __gshared bool s_setup = false; // only initialize in main thread @@ -1550,7 +1142,7 @@ shared static this() assert(st_threads.length == 0, "Main thread not the first thread!?"); st_threads ~= ThreadContext(thisthr, false); - st_threadsSignal = createManualEvent(); + st_threadsSignal = createSharedManualEvent(); version(VibeIdleCollect) { logTrace("setup gc"); @@ -1569,7 +1161,7 @@ shared static this() shared static ~this() { - eventDriver.dispose(); + shutdownDriver(); size_t tasks_left; @@ -1577,7 +1169,8 @@ shared static ~this() if( !st_workerTasks.empty ) tasks_left = st_workerTasks.length; } - if (!s_yieldedTasks.empty) tasks_left += s_yieldedTasks.length; + tasks_left += s_scheduler.scheduledTaskCount; + if (tasks_left > 0) { logWarn("There were still %d tasks running at exit.", tasks_left); } @@ -1591,14 +1184,10 @@ static this() // vibe.core.core -> vibe.core.drivers.native -> vibe.core.drivers.libasync -> vibe.core.core if (Thread.getThis().isDaemon && Thread.getThis().name == "CmdProcessor") return; - assert(st_threadsSignal); - auto thisthr = Thread.getThis(); synchronized (st_threadsMutex) if (!st_threads.any!(c => c.thread is thisthr)) st_threads ~= ThreadContext(thisthr, false); - - //CoreTask.ms_coreTask = new CoreTask; } static ~this() @@ -1637,15 +1226,24 @@ static ~this() } // delay deletion of the main event driver to "~shared static this()" - if (!is_main_thread) eventDriver.dispose(); + if (!is_main_thread) shutdownDriver(); st_threadShutdownCondition.notifyAll(); } +private void shutdownDriver() +{ + if (ManualEvent.ms_threadEvent != EventID.init) { + eventDriver.releaseRef(ManualEvent.ms_threadEvent); + ManualEvent.ms_threadEvent = EventID.init; + } + + eventDriver.dispose(); +} + private void workerThreadFunc() nothrow { try { - assert(st_threadsSignal); if (getExitFlag()) return; logDebug("entering worker thread"); runTask(toDelegate(&handleWorkerTasks)); @@ -1682,7 +1280,7 @@ private void handleWorkerTasks() synchronized (st_threadsMutex) { auto idx = st_threads.countUntil!(c => c.thread is thisthr); - assert(idx >= 0); + assert(idx >= 0, "Worker thread not in st_threads array!?"); logDebug("worker thread check"); if (getExitFlag()) { @@ -1797,42 +1395,6 @@ version(Posix) } } -private struct CoreTaskQueue { - @safe nothrow: - - CoreTask first, last; - size_t length; - - @disable this(this); - - @property bool empty() const { return first is null; } - - @property CoreTask front() { return first; } - - void insertBack(CoreTask task) - { - assert(task.m_queue == null, "Task is already scheduled to be resumed!"); - assert(task.m_nextInQueue is null, "Task has m_nextInQueue set without being in a queue!?"); - task.m_queue = &this; - if (empty) - first = task; - else - last.m_nextInQueue = task; - last = task; - length++; - } - - void popFront() - { - if (first is last) last = null; - assert(first && first.m_queue == &this); - auto next = first.m_nextInQueue; - first.m_nextInQueue = null; - first.m_queue = null; - first = next; - length--; - } -} // mixin string helper to call a function with arguments that potentially have // to be moved diff --git a/source/vibe/core/log.d b/source/vibe/core/log.d index 950ce22..04e1d0d 100644 --- a/source/vibe/core/log.d +++ b/source/vibe/core/log.d @@ -859,6 +859,7 @@ private struct LogOutputRange { void put(dchar ch) { + static import std.utf; if (ch < 128) put(cast(char)ch); else { char[4] buf; diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 041b05b..29c306a 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -14,7 +14,8 @@ import std.functional : toDelegate; import std.socket : AddressFamily, UnknownAddress; import vibe.core.log; import vibe.internal.async; - +import core.time : Duration; + /** Resolves the given host name/IP address string. @@ -29,13 +30,26 @@ NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFa /// ditto NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true) { - NetworkAddress ret; - ret.family = address_family; - if (host == "127.0.0.1") { - ret.family = AddressFamily.INET; - ret.sockAddrInet4.sin_addr.s_addr = 0x0100007F; - } else assert(false); - return ret; + import std.socket : parseAddress; + version (Windows) import std.c.windows.winsock : sockaddr_in, sockaddr_in6; + else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6; + + enforce(host.length > 0, "Host name must not be empty."); + if (host[0] == ':' || host[0] >= '0' && host[0] <= '9') { + auto addr = parseAddress(host); + enforce(address_family == AddressFamily.UNSPEC || addr.addressFamily == address_family); + NetworkAddress ret; + ret.family = addr.addressFamily; + switch (addr.addressFamily) with(AddressFamily) { + default: throw new Exception("Unsupported address family"); + case INET: *ret.sockAddrInet4 = *cast(sockaddr_in*)addr.name; break; + case INET6: *ret.sockAddrInet6 = *cast(sockaddr_in6*)addr.name; break; + } + return ret; + } else { + enforce(use_dns, "Malformed IP address string."); + assert(false, "DNS lookup not implemented."); // TODO + } } @@ -103,7 +117,11 @@ TCPConnection connectTCP(NetworkAddress addr) scope uaddr = new UnknownAddress; addr.toUnknownAddress(uaddr); - auto result = eventDriver.asyncAwait!"connectStream"(uaddr); + // FIXME: make this interruptible + auto result = asyncAwaitUninterruptible!(ConnectCallback, + cb => eventDriver.connectStream(uaddr, cb) + //cb => eventDriver.cancelConnect(cb) + ); enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); return TCPConnection(result[0]); } @@ -354,7 +372,10 @@ mixin(tracer); // TODO: timeout!! if (m_context.readBuffer.length > 0) return true; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; - auto res = eventDriver.asyncAwait!"readSocket"(m_socket, m_context.readBuffer.peekDst(), mode); + auto res = asyncAwait!(IOCallback, + cb => eventDriver.readSocket(m_socket, m_context.readBuffer.peekDst(), mode, cb), + cb => eventDriver.cancelRead(m_socket) + ); logTrace("Socket %s, read %s bytes: %s", res[0], res[2], res[1]); assert(m_context.readBuffer.length == 0); @@ -403,7 +424,9 @@ mixin(tracer); mixin(tracer); if (bytes.length == 0) return; - auto res = eventDriver.asyncAwait!"writeSocket"(m_socket, bytes, IOMode.all); + auto res = asyncAwait!(IOCallback, + cb => eventDriver.writeSocket(m_socket, bytes, IOMode.all, cb), + cb => eventDriver.cancelWrite(m_socket)); switch (res[1]) { default: diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 8914193..df3a878 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -126,7 +126,7 @@ class LocalTaskSemaphore //import vibe.utils.memory; private { - struct Waiter { + static struct Waiter { ManualEvent signal; ubyte priority; uint seq; @@ -632,28 +632,126 @@ ManualEvent createManualEvent() { return ManualEvent.init; } +/// ditto +shared(ManualEvent) createSharedManualEvent() +{ + return shared(ManualEvent).init; +} /** A manually triggered cross-task event. Note: the ownership can be shared between multiple fibers and threads. */ struct ManualEvent { + import core.thread : Thread; + import vibe.internal.async : asyncAwait, asyncAwaitUninterruptible; + + private { + static struct Waiter { + Waiter* next; + immutable EventID event; + immutable EventDriver driver; + immutable Thread thread; + StackSList!ThreadWaiter tasks; + } + static struct ThreadWaiter { + ThreadWaiter* next; + Task task; + void delegate() @safe nothrow notifier; + + void wait(void delegate() @safe nothrow del) @safe nothrow { assert(notifier is null); notifier = del; } + void cancel() @safe nothrow { notifier = null; } + + void wait(void delegate() @safe nothrow del) + shared @safe nothrow { + notifier = del; + if (!next) eventDriver.waitForEvent(ms_threadEvent, &onEvent); + } + + private void onEvent(EventID event) + @safe nothrow { + assert(event == ms_threadEvent); + notifier(); + } + } + int m_emitCount; + Waiter* m_waiters; + + } + + // thread destructor in vibe.core.core will decrement the ref. count + package static EventID ms_threadEvent; + + enum EmitMode { + single, + all + } + + //@disable this(this); + + deprecated("ManualEvent is always non-null!") bool opCast() const nothrow { return true; } - int emitCount() const nothrow { return 0; } - int emit() nothrow { return 0; } - int wait() { assert(false); } - int wait(int) { import vibe.core.core : sleep; sleep(30.seconds); assert(false); } - int wait(Duration, int) { assert(false); } - int waitUninterruptible() nothrow { assert(false); } - int waitUninterruptible(int) nothrow { assert(false); } - int waitUninterruptible(Duration, int) nothrow { assert(false); } -} -/+interface ManualEvent { + deprecated("ManualEvent is always non-null!") + bool opCast() const shared nothrow { return true; } + /// A counter that is increased with every emit() call - @property int emitCount() const nothrow; + int emitCount() const nothrow { return m_emitCount; } + /// ditto + int emitCount() const shared nothrow { return atomicLoad(m_emitCount); } /// Emits the signal, waking up all owners of the signal. - void emit() nothrow; + int emit(EmitMode mode = EmitMode.all) + shared nothrow { + import core.atomic : atomicOp, cas; + + auto ec = atomicOp!"+="(m_emitCount, 1); + auto thisthr = Thread.getThis(); + + final switch (mode) { + case EmitMode.all: + // FIXME: would be nice to have atomicSwap instead + auto w = cast(Waiter*)atomicLoad(m_waiters); + if (w !is null && !cas(&m_waiters, cast(shared(Waiter)*)w, cast(shared(Waiter)*)null)) + return ec; + while (w !is null) { + if (w.thread is thisthr) { + // Note: emitForThisThread can result in w getting deallocated at any + // time, so we need to copy any fields first + auto tasks = w.tasks; + w = w.next; + emitForThisThread(w.tasks.m_first, mode); + } else { + auto evt = w.event; + w = w.next; + eventDriver.triggerEvent(evt, true); + } + } + break; + case EmitMode.single: + assert(false); + } + return ec; + } + /// ditto + int emit(EmitMode mode = EmitMode.all) + nothrow { + auto ec = m_emitCount++; + + final switch (mode) { + case EmitMode.all: + auto w = m_waiters; + m_waiters = null; + if (w !is null) { + assert(w.thread is Thread.getThis(), "Unshared ManualEvent has waiters in foreign thread!"); + assert(w.next is null, "Unshared ManualEvent has waiters in multiple threads!"); + emitForThisThread(w.tasks.m_first, EmitMode.all); + } + break; + case EmitMode.single: + assert(false); + } + return ec; + } /** Acquires ownership and waits until the signal is emitted. @@ -661,7 +759,9 @@ struct ManualEvent { May throw an $(D InterruptException) if the task gets interrupted using $(D Task.interrupt()). */ - void wait(); + int wait() { return wait(this.emitCount); } + /// ditto + int wait() shared { return wait(this.emitCount); } /** Acquires ownership and waits until the emit count differs from the given one. @@ -669,7 +769,9 @@ struct ManualEvent { May throw an $(D InterruptException) if the task gets interrupted using $(D Task.interrupt()). */ - int wait(int reference_emit_count); + int wait(int emit_count) { return wait(Duration.max, emit_count); } + /// ditto + int wait(int emit_count) shared { return wait(Duration.max, emit_count); } /** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reaced. @@ -677,32 +779,218 @@ struct ManualEvent { May throw an $(D InterruptException) if the task gets interrupted using $(D Task.interrupt()). */ - int wait(Duration timeout, int reference_emit_count); + int wait(Duration timeout, int emit_count) + { + Waiter w; + ThreadWaiter tw; + int ec = this.emitCount; + while (ec <= emit_count) { + // wait for getting resumed directly by emit/emitForThisThread + acquireWaiter(w, tw); + asyncAwait!(void delegate() @safe nothrow, + cb => tw.wait(cb), + cb => tw.cancel() + )(timeout); + ec = this.emitCount; + } + return ec; + } + /// ditto + int wait(Duration timeout, int emit_count) + shared { + shared(Waiter) w; + ThreadWaiter tw; + acquireWaiter(w, tw); + + int ec = this.emitCount; + while (ec <= emit_count) { + if (tw.next) { + // if we are not the first waiter for this thread, + // wait for getting resumed by emitForThisThread + asyncAwait!(void delegate() @safe nothrow, + cb => tw.wait(cb), + cb => tw.cancel() + )(timeout); + ec = this.emitCount; + } else { + // if we are the first waiter for this thread, + // wait for the thread event to get emitted + /*asyncAwait!(EventCallback, void delegate() @safe nothrow, + cb => eventDriver.waitForEvent(ms_threadEvent, cb), + cb => tw.wait(cb), + cb => eventDriver.cancelWaitForEvent(ms_threadEvent) + )(timeout); + emitForThisThread(w.waiters); + ec = this.emitCount;*/ + assert(false); + } + } + return ec; + } + /** Same as $(D wait), but defers throwing any $(D InterruptException). This method is annotated $(D nothrow) at the expense that it cannot be interrupted. */ - int waitUninterruptible(int reference_emit_count) nothrow; - + int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); } + /// + int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } /// ditto - int waitUninterruptible(Duration timeout, int reference_emit_count) nothrow; -}+/ + int waitUninterruptible(int emit_count) nothrow { return waitUninterruptible(Duration.max, emit_count); } + /// ditto + int waitUninterruptible(int emit_count) shared nothrow { return waitUninterruptible(Duration.max, emit_count); } + /// ditto + int waitUninterruptible(Duration timeout, int emit_count) + nothrow { + Waiter w; + ThreadWaiter tw; + acquireWaiter(w, tw); + int ec = this.emitCount; + while (ec <= emit_count) { + asyncAwaitUninterruptible!(void delegate(), + cb => tw.wait(cb), + cb => tw.cancel() + )(timeout); + ec = this.emitCount; + } + return ec; + } + /// ditto + int waitUninterruptible(Duration timeout, int emit_count) + shared nothrow { + /*Waiter w; + ThreadWaiter tw; + auto event = acquireWaiter(w, tw); + + int ec = this.emitCount; + while (ec <= emit_count) { + asyncAwaitUninterruptible!(void delegate(), + cb => tw.wait(cb), + cb => tw.cancel() + )(timeout); + ec = this.emitCount; + } + return ec;*/ + assert(false); + } + + private static bool emitForThisThread(ThreadWaiter* waiters, EmitMode mode) + nothrow { + if (!waiters) return false; + + final switch (mode) { + case EmitMode.all: + while (waiters) { + if (waiters.notifier !is null) + waiters.notifier(); + waiters = waiters.next; + } + break; + case EmitMode.single: + assert(false, "TODO!"); + } + + return true; + } + + private void acquireWaiter(ref Waiter w, ref ThreadWaiter tw) + nothrow { + // FIXME: this doesn't work! if task a starts to wait, task b afterwards, and then a finishes its wait before b, the Waiter will be dangling + tw.task = Task.getThis(); + + if (m_waiters) { + m_waiters.tasks.add(&tw); + } else { + m_waiters = &w; + } + } + + private void acquireWaiter(ref shared(Waiter) w, ref ThreadWaiter tw) + nothrow shared { + tw.task = Task.getThis(); + + if (ms_threadEvent == EventID.init) + ms_threadEvent = eventDriver.createEvent(); + + if (m_waiters) { + //m_waiters.tasks.add(&tw); + assert(false); + } else { + m_waiters = &w; + } + } +} + + +private struct StackSList(T) +{ + import core.atomic : cas; + + private T* m_first; + + @property T* first() { return m_first; } + @property shared(T)* first() shared { return atomicLoad(m_first); } + + void add(shared(T)* elem) + shared { + do elem.next = atomicLoad(m_first); + while (cas(&m_first, elem.next, elem)); + } + + void remove(shared(T)* elem) + shared { + while (true) { + shared(T)* w = atomicLoad(m_first), wp; + while (w !is elem) { + wp = w; + w = atomicLoad(w.next); + } + if (wp !is null) { + if (cas(&wp.next, w, w.next)) + break; + } else { + if (cas(&m_first, w, w.next)) + break; + } + } + } + + bool empty() const { return m_first is null; } + + void add(T* elem) + { + elem.next = m_first; + m_first = elem; + } + + void remove(T* elem) + { + T* w = m_first, wp; + while (w !is elem) { + assert(w !is null); + wp = w; + w = w.next; + } + if (wp) wp.next = w.next; + else m_first = w.next; + } +} private struct TaskMutexImpl(bool INTERRUPTIBLE) { import std.stdio; private { shared(bool) m_locked = false; shared(uint) m_waiters = 0; - ManualEvent m_signal; + shared(ManualEvent) m_signal; debug Task m_owner; } void setup() { - m_signal = createManualEvent(); + m_signal = createSharedManualEvent(); } @@ -751,13 +1039,13 @@ private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { Task m_owner; size_t m_recCount = 0; shared(uint) m_waiters = 0; - ManualEvent m_signal; + shared(ManualEvent) m_signal; @property bool m_locked() const { return m_recCount > 0; } } void setup() { - m_signal = createManualEvent(); + m_signal = createSharedManualEvent(); m_mutex = new core.sync.mutex.Mutex; } @@ -812,7 +1100,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { private { LOCKABLE m_mutex; - ManualEvent m_signal; + shared(ManualEvent) m_signal; } static if (is(LOCKABLE == Lockable)) { @@ -833,7 +1121,7 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { void setup(LOCKABLE mtx) { m_mutex = mtx; - m_signal = createManualEvent(); + m_signal = createSharedManualEvent(); } @property LOCKABLE mutex() { return m_mutex; } @@ -955,9 +1243,9 @@ private struct ReadWriteMutexState(bool INTERRUPTIBLE) //Queue Events /** The event used to wake reading tasks waiting for the lock while it is blocked. */ - ManualEvent m_readyForReadLock; + shared(ManualEvent) m_readyForReadLock; /** The event used to wake writing tasks waiting for the lock while it is blocked. */ - ManualEvent m_readyForWriteLock; + shared(ManualEvent) m_readyForWriteLock; /** The underlying mutex that gates the access to the shared state. */ Mutex m_counterMutex; @@ -967,8 +1255,8 @@ private struct ReadWriteMutexState(bool INTERRUPTIBLE) { m_policy = policy; m_counterMutex = new Mutex(); - m_readyForReadLock = createManualEvent(); - m_readyForWriteLock = createManualEvent(); + m_readyForReadLock = createSharedManualEvent(); + m_readyForWriteLock = createSharedManualEvent(); } @disable this(this); diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index e746d6a..af7fd85 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -7,8 +7,8 @@ */ module vibe.core.task; +import vibe.core.log; import vibe.core.sync; -import vibe.internal.array : FixedRingBuffer; import core.thread; import std.exception; @@ -58,29 +58,29 @@ struct Task { } nothrow { - @property inout(TaskFiber) fiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; } + package @property inout(TaskFiber) taskFiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; } + @property inout(Fiber) fiber() inout @trusted { return this.taskFiber; } @property size_t taskCounter() const @safe { return m_taskCounter; } - @property inout(Thread) thread() inout @safe { if (m_fiber) return this.fiber.thread; return null; } + @property inout(Thread) thread() inout @safe { if (m_fiber) return this.taskFiber.thread; return null; } /** Determines if the task is still running. */ @property bool running() const @trusted { assert(m_fiber !is null, "Invalid task handle"); - try if (this.fiber.state == Fiber.State.TERM) return false; catch (Throwable) {} - return this.fiber.m_running && this.fiber.m_taskCounter == m_taskCounter; + try if (this.taskFiber.state == Fiber.State.TERM) return false; catch (Throwable) {} + return this.taskFiber.m_running && this.taskFiber.m_taskCounter == m_taskCounter; } // FIXME: this is not thread safe! - @property ref ThreadInfo tidInfo() { return m_fiber ? fiber.tidInfo : s_tidInfo; } + @property ref ThreadInfo tidInfo() { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } @property Tid tid() { return tidInfo.ident; } } T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; } - void join() { if (running) fiber.join(); } - void interrupt() { if (running) fiber.interrupt(); } - void terminate() { if (running) fiber.terminate(); } + void join() { if (running) taskFiber.join(); } + void interrupt() { if (running) taskFiber.interrupt(); } string toString() const { import std.string; return format("%s:%s", cast(void*)m_fiber, m_taskCounter); } @@ -88,6 +88,155 @@ struct Task { bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } } +/** + Implements a task local storage variable. + + Task local variables, similar to thread local variables, exist separately + in each task. Consequently, they do not need any form of synchronization + when accessing them. + + Note, however, that each TaskLocal variable will increase the memory footprint + of any task that uses task local storage. There is also an overhead to access + TaskLocal variables, higher than for thread local variables, but generelly + still O(1) (since actual storage acquisition is done lazily the first access + can require a memory allocation with unknown computational costs). + + Notice: + FiberLocal instances MUST be declared as static/global thread-local + variables. Defining them as a temporary/stack variable will cause + crashes or data corruption! + + Examples: + --- + TaskLocal!string s_myString = "world"; + + void taskFunc() + { + assert(s_myString == "world"); + s_myString = "hello"; + assert(s_myString == "hello"); + } + + shared static this() + { + // both tasks will get independent storage for s_myString + runTask(&taskFunc); + runTask(&taskFunc); + } + --- +*/ +struct TaskLocal(T) +{ + private { + size_t m_offset = size_t.max; + size_t m_id; + T m_initValue; + bool m_hasInitValue = false; + } + + this(T init_val) { m_initValue = init_val; m_hasInitValue = true; } + + @disable this(this); + + void opAssign(T value) { this.storage = value; } + + @property ref T storage() + { + auto fiber = TaskFiber.getThis(); + + // lazily register in FLS storage + if (m_offset == size_t.max) { + static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof); + assert(TaskFiber.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool."); + m_offset = TaskFiber.ms_flsFill; + m_id = TaskFiber.ms_flsCounter++; + + + TaskFiber.ms_flsFill += T.sizeof; + while (TaskFiber.ms_flsFill % 8 != 0) + TaskFiber.ms_flsFill++; + } + + // make sure the current fiber has enough FLS storage + if (fiber.m_fls.length < TaskFiber.ms_flsFill) { + fiber.m_fls.length = TaskFiber.ms_flsFill + 128; + fiber.m_flsInit.length = TaskFiber.ms_flsCounter + 64; + } + + // return (possibly default initialized) value + auto data = fiber.m_fls.ptr[m_offset .. m_offset+T.sizeof]; + if (!fiber.m_flsInit[m_id]) { + fiber.m_flsInit[m_id] = true; + import std.traits : hasElaborateDestructor, hasAliasing; + static if (hasElaborateDestructor!T || hasAliasing!T) { + void function(void[], size_t) destructor = (void[] fls, size_t offset){ + static if (hasElaborateDestructor!T) { + auto obj = cast(T*)&fls[offset]; + // call the destructor on the object if a custom one is known declared + obj.destroy(); + } + else static if (hasAliasing!T) { + // zero the memory to avoid false pointers + foreach (size_t i; offset .. offset + T.sizeof) { + ubyte* u = cast(ubyte*)&fls[i]; + *u = 0; + } + } + }; + FLSInfo fls_info; + fls_info.fct = destructor; + fls_info.offset = m_offset; + + // make sure flsInfo has enough space + if (ms_flsInfo.length <= m_id) + ms_flsInfo.length = m_id + 64; + + ms_flsInfo[m_id] = fls_info; + } + + if (m_hasInitValue) { + static if (__traits(compiles, emplace!T(data, m_initValue))) + emplace!T(data, m_initValue); + else assert(false, "Cannot emplace initialization value for type "~T.stringof); + } else emplace!T(data); + } + return (cast(T[])data)[0]; + } + + alias storage this; +} + + +/** Exception that is thrown by Task.interrupt. +*/ +class InterruptException : Exception { + this() + @safe nothrow { + super("Task interrupted."); + } +} + +/** + High level state change events for a Task +*/ +enum TaskEvent { + preStart, /// Just about to invoke the fiber which starts execution + postStart, /// After the fiber has returned for the first time (by yield or exit) + start, /// Just about to start execution + yield, /// Temporarily paused + resume, /// Resumed from a prior yield + end, /// Ended normally + fail /// Ended with an exception +} + +alias TaskEventCallback = void function(TaskEvent, Task) nothrow; + +/** + The maximum combined size of all parameters passed to a task delegate + + See_Also: runTask +*/ +enum maxTaskParameterSize = 128; /** The base class for a task aka Fiber. @@ -95,24 +244,135 @@ struct Task { This class represents a single task that is executed concurrently with other tasks. Each task is owned by a single thread. */ -class TaskFiber : Fiber { - private { - Thread m_thread; - import std.concurrency : ThreadInfo; - ThreadInfo m_tidInfo; - } +final package class TaskFiber : Fiber { + static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024; + else enum defaultTaskStackSize = 512*1024; - protected { + private { + import std.concurrency : ThreadInfo; + import std.bitmanip : BitArray; + + // task queue management (TaskScheduler.m_taskQueue) + TaskFiber m_prev, m_next; + TaskFiberQueue* m_queue; + + Thread m_thread; + ThreadInfo m_tidInfo; shared size_t m_taskCounter; shared bool m_running; + + Task[] m_joiners; + + // task local storage + BitArray m_flsInit; + void[] m_fls; + + bool m_interrupt; // Task.interrupt() is progress + + static TaskFiber ms_globalDummyFiber; + static FLSInfo[] ms_flsInfo; + static size_t ms_flsFill = 0; // thread-local + static size_t ms_flsCounter = 0; } - protected this(void delegate() fun, size_t stack_size) - nothrow { - super(fun, stack_size); + + package TaskFuncInfo m_taskFunc; + package __gshared size_t ms_taskStackSize = defaultTaskStackSize; + package __gshared debug TaskEventCallback ms_taskEventCallback; + + this() + @trusted nothrow { + super(&run, ms_taskStackSize); m_thread = Thread.getThis(); } + static TaskFiber getThis() + @safe nothrow { + auto f = () @trusted nothrow { + return Fiber.getThis(); + } (); + if (f) return cast(TaskFiber)f; + if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber; + return ms_globalDummyFiber; + } + + @property State state() + @trusted const nothrow { + return super.state; + } + + + private void run() + { + import std.encoding : sanitize; + import std.concurrency : Tid; + import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield; + + version (VibeDebugCatchAll) alias UncaughtException = Throwable; + else alias UncaughtException = Exception; + try { + while (true) { + while (!m_taskFunc.func) { + try { + Fiber.yield(); + } catch (Exception e) { + logWarn("CoreTaskFiber was resumed with exception but without active task!"); + logDiagnostic("Full error: %s", e.toString().sanitize()); + } + } + + auto task = m_taskFunc; + m_taskFunc = TaskFuncInfo.init; + Task handle = this.task; + try { + m_running = true; + scope(exit) m_running = false; + + std.concurrency.thisTid; // force creation of a message box + + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); + if (!isEventLoopRunning) { + logTrace("Event loop not running at task start - yielding."); + vibe.core.core.yield(); + logTrace("Initial resume of task."); + } + task.func(&task); + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); + } catch (Exception e) { + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); + import std.encoding; + logCritical("Task terminated with uncaught exception: %s", e.msg); + logDebug("Full error: %s", e.toString().sanitize()); + } + + this.tidInfo.ident = Tid.init; // clear message box + + foreach (t; m_joiners) taskScheduler.switchTo(t); + m_joiners.length = 0; + m_joiners.assumeSafeAppend(); + + // make sure that the task does not get left behind in the yielder queue if terminated during yield() + if (m_queue) m_queue.remove(this); + + // zero the fls initialization ByteArray for memory safety + foreach (size_t i, ref bool b; m_flsInit) { + if (b) { + if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init) + ms_flsInfo[i].destroy(m_fls); + b = false; + } + } + + // make the fiber available for the next task + recycleFiber(this); + } + } catch(UncaughtException th) { + logCritical("CoreTaskFiber was terminated unexpectedly: %s", th.msg); + logDiagnostic("Full error: %s", th.toString().sanitize()); + } + } + + /** Returns the thread that owns this task. */ @property inout(Thread) thread() inout @safe nothrow { return m_thread; } @@ -123,31 +383,289 @@ class TaskFiber : Fiber { @property ref inout(ThreadInfo) tidInfo() inout nothrow { return m_tidInfo; } + @property size_t taskCounter() const { return m_taskCounter; } + /** Blocks until the task has ended. */ - abstract void join(); + void join() + { + import vibe.core.core : hibernate, yield; - /** Throws an InterruptExeption within the task as soon as it calls a blocking function. - */ - abstract void interrupt(); + auto caller = Task.getThis(); + if (!m_running) return; + if (caller != Task.init) { + assert(caller.fiber !is this, "A task cannot join itself."); + assert(caller.thread is this.thread, "Joining tasks in foreign threads is currently not supported."); + m_joiners ~= caller; + } else assert(Thread.getThis() is this.thread, "Joining tasks in different threads is not yet supported."); + auto run_count = m_taskCounter; + if (caller == Task.init) vibe.core.core.yield(); // let the task continue (it must be yielded currently) + while (m_running && run_count == m_taskCounter) hibernate(); + } - /** Terminates the task without notice as soon as it calls a blocking function. + /** Throws an InterruptExeption within the task as soon as it calls an interruptible function. */ - abstract void terminate(); + void interrupt() + { + import vibe.core.core : taskScheduler; + + auto caller = Task.getThis(); + if (caller != Task.init) { + assert(caller != this.task, "A task cannot interrupt itself."); + assert(caller.thread is this.thread, "Interrupting tasks in different threads is not yet supported."); + } else assert(Thread.getThis() is this.thread, "Interrupting tasks in different threads is not yet supported."); + m_interrupt = true; + taskScheduler.switchTo(this.task); + } void bumpTaskCounter() @safe nothrow { import core.atomic : atomicOp; () @trusted { atomicOp!"+="(this.m_taskCounter, 1); } (); } -} - -/** Exception that is thrown by Task.interrupt. -*/ -class InterruptException : Exception { - this() - { - super("Task interrupted."); + package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt) + @safe nothrow { + assert(Task.getThis().fiber is this, "Handling interrupt outside of the corresponding fiber."); + if (m_interrupt && on_interrupt) { + m_interrupt = false; + on_interrupt(); + } } } + +package struct TaskFuncInfo { + void function(TaskFuncInfo*) func; + void[2*size_t.sizeof] callable = void; + void[maxTaskParameterSize] args = void; + + @property ref C typedCallable(C)() + { + static assert(C.sizeof <= callable.sizeof); + return *cast(C*)callable.ptr; + } + + @property ref A typedArgs(A)() + { + static assert(A.sizeof <= args.sizeof); + return *cast(A*)args.ptr; + } + + void initCallable(C)() + { + C cinit; + this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1]; + } + + void initArgs(A)() + { + A ainit; + this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1]; + } +} + +package struct TaskScheduler { + private { + TaskFiberQueue m_taskQueue; + TaskFiber m_markerTask; + } + + @safe nothrow: + + @disable this(this); + + @property size_t scheduledTaskCount() const { return m_taskQueue.length; } + + /** Lets other pending tasks execute before continuing execution. + + This will give other tasks or events a chance to be processed. If + multiple tasks call this function, they will be processed in a + fĂ­rst-in-first-out manner. + */ + void yield() + { + auto t = Task.getThis(); + if (t == Task.init) return; // not really a task -> no-op + if (t.taskFiber.m_queue !is null) return; // already scheduled to be resumed + m_taskQueue.insertBack(t.taskFiber); + doYield(t); + } + + /** Holds execution until the task gets explicitly resumed. + + + */ + void hibernate() + { + import vibe.core.core : isEventLoopRunning; + auto thist = Task.getThis(); + if (thist == Task.init) { + assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?"); + static import vibe.core.core; + vibe.core.core.processEvents(); + } else { + doYield(thist); + } + } + + /** Immediately switches execution to the specified task without giving up execution privilege. + + This forces immediate execution of the specified task. After the tasks finishes or yields, + the calling task will continue execution. + */ + void switchTo(Task t) + { + auto thist = Task.getThis(); + auto thisthr = thist ? thist.taskFiber.thread : () @trusted { return Thread.getThis(); } (); + assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); + if (thist == Task.init) { + resumeTask(t); + } else { + assert(!thist.taskFiber.m_queue, "Task already scheduled to be resumed... FIXME: should this really be an error?"); + m_taskQueue.insertFront(thist.taskFiber); + m_taskQueue.insertFront(t.taskFiber); + doYield(thist); + } + } + + /** Runs any pending tasks. + + A pending tasks is a task that is scheduled to be resumed by either `yield` or + `switchTo`. + + Returns: + Returns `true` $(I iff) there are more tasks left to process. + */ + bool schedule() + { + if (!m_markerTask) m_markerTask = new TaskFiber; // TODO: avoid allocating an actual task here! + + assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!"); + assert(!m_markerTask.m_queue, "TaskScheduler.schedule() was called recursively!"); + + // keep track of the end of the queue, so that we don't process tasks + // infinitely + m_taskQueue.insertBack(m_markerTask); + + while (m_taskQueue.front !is m_markerTask) { + auto t = m_taskQueue.front; + m_taskQueue.popFront(); + resumeTask(t.task); + + assert(!m_taskQueue.empty, "Marker task got removed from tasks queue!?"); + if (m_taskQueue.empty) return false; // handle gracefully in release mode + } + + // remove marker task + m_taskQueue.popFront(); + + return !m_taskQueue.empty; + } + + /// Resumes execution of a yielded task. + private void resumeTask(Task t) + { + import std.encoding : sanitize; + + auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } (); + + if (uncaught_exception) { + auto th = cast(Throwable)uncaught_exception; + assert(th, "Fiber returned exception object that is not a Throwable!?"); + + assert(() @trusted nothrow { return t.fiber.state; } () == Fiber.State.TERM); + logError("Task terminated with unhandled exception: %s", th.msg); + logDebug("Full error: %s", () @trusted { return th.toString().sanitize; } ()); + + // always pass Errors on + if (auto err = cast(Error)th) throw err; + } + } + + private void doYield(Task task) + { + debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } (); + () @trusted { Fiber.yield(); } (); + debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } (); + } +} + +private struct TaskFiberQueue { + @safe nothrow: + + TaskFiber first, last; + size_t length; + + @disable this(this); + + @property bool empty() const { return first is null; } + + @property TaskFiber front() { return first; } + + void insertFront(TaskFiber task) + { + assert(task.m_queue == null, "Task is already scheduled to be resumed!"); + assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); + assert(task.m_next is null, "Task has m_next set without being in a queue!?"); + task.m_queue = &this; + if (empty) { + first = task; + last = task; + } else { + first.m_prev = task; + task.m_next = first; + first = task; + } + length++; + } + + void insertBack(TaskFiber task) + { + assert(task.m_queue == null, "Task is already scheduled to be resumed!"); + assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); + assert(task.m_next is null, "Task has m_next set without being in a queue!?"); + task.m_queue = &this; + if (empty) { + first = task; + last = task; + } else { + last.m_next = task; + task.m_prev = last; + last = task; + } + length++; + } + + void popFront() + { + if (first is last) last = null; + assert(first && first.m_queue == &this, "Popping from empty or mismatching queue"); + auto next = first.m_next; + if (next) next.m_prev = null; + first.m_next = null; + first.m_queue = null; + first = next; + length--; + } + + void remove(TaskFiber task) + { + assert(task.m_queue is &this, "Task is not contained in task queue."); + if (task.m_prev) task.m_prev.m_next = task.m_next; + else first = task.m_next; + if (task.m_next) task.m_next.m_prev = task.m_prev; + else last = task.m_prev; + task.m_queue = null; + task.m_prev = null; + task.m_next = null; + } +} + +private struct FLSInfo { + void function(void[], size_t) fct; + size_t offset; + void destroy(void[] fls) { + fct(fls, offset); + } +} + diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index cb3cbcb..fc296c4 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -2,15 +2,39 @@ module vibe.internal.async; import std.traits : ParameterTypeTuple; import std.typecons : tuple; -import vibe.core.core; +import vibe.core.core : hibernate, switchToTask; +import vibe.core.task : InterruptException, Task; import vibe.core.log; import core.time : Duration, seconds; -auto asyncAwait(string method, Object, ARGS...)(Object object, ARGS args) +auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)() +if (!is(Object == Duration)) { + return asyncAwaitImpl!(true, Callback, action, cancel, func)(Duration.max); +} + +auto asyncAwait(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) { - alias CB = ParameterTypeTuple!(__traits(getMember, Object, method))[$-1]; - alias CBTypes = ParameterTypeTuple!CB; + return asyncAwaitImpl!(true, Callback, action, cancel, func)(timeout); +} + +auto asyncAwaitUninterruptible(Callback, alias action, string func = __FUNCTION__)() +nothrow { + return asyncAwaitImpl!(false, Callback, action, (cb) { assert(false); }, func)(Duration.max); +} + +auto asyncAwaitUninterruptible(Callback, alias action, alias cancel, string func = __FUNCTION__)(Duration timeout) +nothrow { + assert(timeout >= 0.seconds); + asyncAwaitImpl!(false, Callback, action, cancel, func)(timeout); +} + +private auto asyncAwaitImpl(bool interruptible, Callback, alias action, alias cancel, string func)(Duration timeout) +@safe if (!is(Object == Duration)) { + alias CBTypes = ParameterTypeTuple!Callback; + + assert(timeout >= 0.seconds); + assert(timeout == Duration.max, "TODO!"); bool fired = false; CBTypes ret; @@ -21,25 +45,28 @@ auto asyncAwait(string method, Object, ARGS...)(Object object, ARGS args) logTrace("Got result."); fired = true; ret = params; - if (t != Task.init) - resumeTask(t); + if (t != Task.init) switchToTask(t); } - logTrace("Calling %s...", method); - __traits(getMember, object, method)(args, &callback); + scope cbdel = &callback; + + logTrace("Calling async function in "~func); + action(cbdel); if (!fired) { logTrace("Need to wait..."); t = Task.getThis(); - do yieldForEvent(); - while (!fired); + do { + static if (interruptible) { + bool interrupted = false; + hibernate(() @safe nothrow { + cancel(cbdel); + interrupted = true; + }); + if (interrupted) + throw new InterruptException; // FIXME: the original operation needs to be stopped! or the callback will still be called" + } else hibernate(); + } while (!fired); } logTrace("Return result."); return tuple(ret); } - -auto asyncAwait(string method, Object, ARGS...)(Duration timeout, Object object, ARGS args) -{ - assert(timeout >= 0.seconds); - if (timeout == Duration.max) return asyncAwait(object, args); - else assert(false, "TODO!"); -}