Merge pull request #172 from vibe-d/async_file_operations

Avoid all blocking file operations outside of worker threads
This commit is contained in:
Sönke Ludwig 2019-08-21 20:59:45 +02:00 committed by GitHub
commit ce1adf8c3b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 144 additions and 51 deletions

View file

@ -1083,12 +1083,14 @@ struct Future(T) {
import vibe.internal.freelistref : FreeListRef; import vibe.internal.freelistref : FreeListRef;
private { private {
FreeListRef!(shared(T)) m_result; alias ResultRef = FreeListRef!(shared(Tuple!(T, string)));
ResultRef m_result;
Task m_task; Task m_task;
} }
/// Checks if the values was fully computed. /// Checks if the values was fully computed.
@property bool ready() const { return !m_task.running; } @property bool ready() const @safe { return !m_task.running; }
/** Returns the computed value. /** Returns the computed value.
@ -1098,17 +1100,22 @@ struct Future(T) {
instead. instead.
*/ */
ref T getResult() ref T getResult()
{ @safe {
if (!ready) m_task.join(); if (!ready) m_task.join();
assert(ready, "Task still running after join()!?"); assert(ready, "Task still running after join()!?");
return *cast(T*)&m_result.get(); // casting away shared is safe, because this is a unique reference
if (m_result.get[1].length)
throw new Exception(m_result.get[1]);
// casting away shared is safe, because this is a unique reference
return *() @trusted { return cast(T*)&m_result.get()[0]; } ();
} }
alias getResult this; alias getResult this;
private void init() private void init()
{ @safe {
m_result = FreeListRef!(shared(T))(); m_result = ResultRef();
} }
} }
@ -1141,8 +1148,9 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar
alias RET = ReturnType!CALLABLE; alias RET = ReturnType!CALLABLE;
Future!RET ret; Future!RET ret;
ret.init(); ret.init();
static void compute(FreeListRef!(shared(RET)) dst, CALLABLE callable, ARGS args) { static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) {
dst.get = cast(shared(RET))callable(args); try dst.get[0] = cast(shared(RET))callable(args);
catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed";
} }
static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) {
ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args);
@ -1153,7 +1161,7 @@ Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS ar
} }
/// ///
unittest { @safe unittest {
import vibe.core.core; import vibe.core.core;
import vibe.core.log; import vibe.core.log;
@ -1198,6 +1206,26 @@ unittest {
} }
} }
Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) @safe
if (is(typeof(callable(args)) == ReturnType!CALLABLE) &&
isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
import vibe.core.core;
import vibe.internal.freelistref : FreeListRef;
import std.functional : toDelegate;
alias RET = ReturnType!CALLABLE;
Future!RET ret;
ret.init();
static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) {
try *cast(RET*)&dst.get[0] = callable(args);
catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed";
}
ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args);
return ret;
}
/******************************************************************************/ /******************************************************************************/
/* std.concurrency compatible interface for message passing */ /* std.concurrency compatible interface for message passing */
/******************************************************************************/ /******************************************************************************/

View file

@ -614,15 +614,17 @@ void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref
See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist` See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`
*/ */
public void setupWorkerThreads(uint num = logicalProcessorCount()) public void setupWorkerThreads(uint num = logicalProcessorCount())
{ @safe {
static bool s_workerThreadsStarted = false; static bool s_workerThreadsStarted = false;
if (s_workerThreadsStarted) return; if (s_workerThreadsStarted) return;
s_workerThreadsStarted = true; s_workerThreadsStarted = true;
() @trusted {
synchronized (st_threadsMutex) { synchronized (st_threadsMutex) {
if (!st_workerPool) if (!st_workerPool)
st_workerPool = new shared TaskPool(num); st_workerPool = new shared TaskPool(num);
} }
} ();
} }

View file

@ -1,12 +1,13 @@
/** /**
File handling functions and types. File handling functions and types.
Copyright: © 2012-2018 RejectedSoftware e.K. Copyright: © 2012-2019 RejectedSoftware e.K.
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig Authors: Sönke Ludwig
*/ */
module vibe.core.file; module vibe.core.file;
import vibe.core.concurrency : asyncWork;
import eventcore.core : NativeEventDriver, eventDriver; import eventcore.core : NativeEventDriver, eventDriver;
import eventcore.driver; import eventcore.driver;
import vibe.core.internal.release; import vibe.core.internal.release;
@ -26,6 +27,8 @@ import std.exception;
import std.file; import std.file;
import std.path; import std.path;
import std.string; import std.string;
import std.typecons : Flag, No;
version(Posix){ version(Posix){
private extern(C) int mkstemps(char* templ, int suffixlen); private extern(C) int mkstemps(char* templ, int suffixlen);
@ -278,7 +281,12 @@ bool existsFile(string path) nothrow
// This was *annotated* nothrow in 2.067. // This was *annotated* nothrow in 2.067.
static if (__VERSION__ < 2067) static if (__VERSION__ < 2067)
scope(failure) assert(0, "Error: existsFile should never throw"); scope(failure) assert(0, "Error: existsFile should never throw");
return std.file.exists(path);
try return asyncWork((string p) => std.file.exists(p), path).getResult();
catch (Exception e) {
logDebug("Failed to determine file existence for '%s': %s", path, e.msg);
return false;
}
} }
/** Stores information about the specified file/directory into 'info' /** Stores information about the specified file/directory into 'info'
@ -287,13 +295,15 @@ bool existsFile(string path) nothrow
*/ */
FileInfo getFileInfo(NativePath path) FileInfo getFileInfo(NativePath path)
@trusted { @trusted {
auto ent = DirEntry(path.toNativeString()); return getFileInfo(path.toNativeString);
return makeFileInfo(ent);
} }
/// ditto /// ditto
FileInfo getFileInfo(string path) FileInfo getFileInfo(string path)
{ {
return getFileInfo(NativePath(path)); return asyncWork((string p) {
auto ent = DirEntry(p);
return makeFileInfo(ent);
}, path).getResult();
} }
/** /**
@ -301,27 +311,77 @@ FileInfo getFileInfo(string path)
*/ */
void createDirectory(NativePath path) void createDirectory(NativePath path)
{ {
() @trusted { mkdir(path.toNativeString()); } (); createDirectory(path.toNativeString);
} }
/// ditto /// ditto
void createDirectory(string path) void createDirectory(string path, Flag!"recursive" recursive = No.recursive)
{ {
createDirectory(NativePath(path)); auto fail = asyncWork((string p, bool rec) {
try {
if (rec) mkdirRecurse(p);
else mkdir(p);
} catch (Exception e) {
return e.msg.length ? e.msg : "Failed to create directory.";
}
return null;
}, path, !!recursive);
if (fail) throw new Exception(fail);
} }
/** /**
Enumerates all files in the specified directory. Enumerates all files in the specified directory.
*/ */
void listDirectory(NativePath path, scope bool delegate(FileInfo info) del) void listDirectory(NativePath path, scope bool delegate(FileInfo info) @safe del)
@trusted { {
foreach( DirEntry ent; dirEntries(path.toNativeString(), SpanMode.shallow) ) listDirectory(path.toNativeString, del);
if( !del(makeFileInfo(ent)) )
break;
} }
/// ditto /// ditto
void listDirectory(string path, scope bool delegate(FileInfo info) del) void listDirectory(string path, scope bool delegate(FileInfo info) @safe del)
{ {
listDirectory(NativePath(path), del); import vibe.core.core : runWorkerTaskH;
import vibe.core.channel : Channel, createChannel;
struct S {
FileInfo info;
string error;
}
auto ch = createChannel!S();
runWorkerTaskH((string path, Channel!S ch) nothrow {
scope (exit) ch.close();
try {
foreach (DirEntry ent; dirEntries(path, SpanMode.shallow)) {
auto nfo = makeFileInfo(ent);
try ch.put(S(nfo, null));
catch (Exception e) break; // channel got closed
}
} catch (Exception e) {
try ch.put(S(FileInfo.init, e.msg.length ? e.msg : "Failed to iterate directory"));
catch (Exception e) {} // channel got closed
}
}, path, ch);
S itm;
while (ch.tryConsumeOne(itm)) {
if (itm.error.length)
throw new Exception(itm.error);
if (!del(itm.info)) {
ch.close();
break;
}
}
}
/// ditto
void listDirectory(NativePath path, scope bool delegate(FileInfo info) @system del)
@system {
listDirectory(path, (nfo) @trusted => del(nfo));
}
/// ditto
void listDirectory(string path, scope bool delegate(FileInfo info) @system del)
@system {
listDirectory(path, (nfo) @trusted => del(nfo));
} }
/// ditto /// ditto
int delegate(scope int delegate(ref FileInfo)) iterateDirectory(NativePath path) int delegate(scope int delegate(ref FileInfo)) iterateDirectory(NativePath path)

View file

@ -135,28 +135,14 @@ shared final class TaskPool {
Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args) Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
if (isFunctionPointer!FT) if (isFunctionPointer!FT)
{ {
import std.typecons : Typedef;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
// workaround for runWorkerTaskH to work when called outside of a task // workaround for runWorkerTaskH to work when called outside of a task
if (caller == Task.init) { if (Task.getThis() == Task.init) {
Task ret; Task ret;
.runTask(&runTaskHWrapper!(FT, ARGS), () @trusted { return &ret; } (), func, args).join(); .runTask({ ret = doRunTaskH(func, args); }).join();
return ret; return ret;
} } else return doRunTaskH(func, args);
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.tid.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
return cast(Task)receiveOnly!PrivateTask();
} }
/// ditto /// ditto
Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
@ -168,6 +154,28 @@ shared final class TaskPool {
return runTaskH(&wrapper!(), object, args); return runTaskH(&wrapper!(), object, args);
} }
// NOTE: needs to be a separate function to avoid recursion for the
// workaround above, which breaks @safe inference
private Task doRunTaskH(FT, ARGS...)(FT func, ref ARGS args)
if (isFunctionPointer!FT)
{
import std.typecons : Typedef;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
Task caller = Task.getThis();
assert(caller != Task.init, "runWorkderTaskH can currently only be called from within a task.");
static void taskFun(Task caller, FT func, ARGS args) {
PrivateTask callee = Task.getThis();
caller.tid.prioritySend(callee);
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(&taskFun, caller, func, args);
return cast(Task)() @trusted { return receiveOnly!PrivateTask(); } ();
}
/** Runs a new asynchronous task in all worker threads concurrently. /** Runs a new asynchronous task in all worker threads concurrently.
@ -223,11 +231,6 @@ shared final class TaskPool {
on_handle(receiveOnly!Task); on_handle(receiveOnly!Task);
} }
private void runTaskHWrapper(FT, ARGS...)(Task* ret, FT func, ARGS args)
{
*ret = runTaskH!(FT, ARGS)(func, args);
}
private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args) private void runTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
{ {
import std.traits : ParameterTypeTuple; import std.traits : ParameterTypeTuple;