Merge pull request #210 from vibe-d/async_improvements

Remove potentially blocking file I/O code (upgrade to eventcore 0.9.0).
This commit is contained in:
Sönke Ludwig 2020-05-22 18:14:22 +02:00 committed by GitHub
commit 872bff8472
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 615 additions and 550 deletions

View file

@ -4,7 +4,7 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016-2020, Sönke Ludwig" copyright "Copyright © 2016-2020, Sönke Ludwig"
license "MIT" license "MIT"
dependency "eventcore" version="~>0.8.43" dependency "eventcore" version="~>0.9.0"
dependency "stdx-allocator" version="~>2.77.0" dependency "stdx-allocator" version="~>2.77.0"
targetName "vibe_core" targetName "vibe_core"

View file

@ -190,16 +190,21 @@ void moveFile(NativePath from, NativePath to, bool copy_fallback = false)
/// ditto /// ditto
void moveFile(string from, string to, bool copy_fallback = false) void moveFile(string from, string to, bool copy_fallback = false)
{ {
if (!copy_fallback) { auto fail = performInWorker((string from, string to) {
std.file.rename(from, to);
} else {
try { try {
std.file.rename(from, to); std.file.rename(from, to);
} catch (FileException e) { } catch (Exception e) {
return e.msg.length ? e.msg : "Failed to move file.";
}
return null;
}, from, to);
if (!fail.length) return;
if (!copy_fallback) throw new Exception(fail);
copyFile(from, to); copyFile(from, to);
std.file.remove(from); removeFile(from);
}
}
} }
/** /**
@ -236,6 +241,7 @@ void copyFile(NativePath from, NativePath to, bool overwrite = false)
enforce(overwrite || !existsFile(to), "Destination file already exists."); enforce(overwrite || !existsFile(to), "Destination file already exists.");
auto dst = openFile(to, FileMode.createTrunc); auto dst = openFile(to, FileMode.createTrunc);
scope(exit) dst.close(); scope(exit) dst.close();
dst.truncate(src.size);
dst.write(src); dst.write(src);
} }
@ -267,7 +273,16 @@ void removeFile(NativePath path)
/// ditto /// ditto
void removeFile(string path) void removeFile(string path)
{ {
auto fail = performInWorker((string path) {
try {
std.file.remove(path); std.file.remove(path);
} catch (Exception e) {
return e.msg.length ? e.msg : "Failed to delete file.";
}
return null;
}, path);
if (fail.length) throw new Exception(fail);
} }
/** /**
@ -567,12 +582,18 @@ struct FileStream {
/// Closes the file handle. /// Closes the file handle.
void close() void close()
{ {
if (m_fd != FileFD.init) { if (m_fd == FileFD.invalid) return;
eventDriver.files.close(m_fd); // FIXME: may leave dangling references! if (!eventDriver.files.isValid(m_fd)) return;
auto res = asyncAwaitUninterruptible!(FileCloseCallback,
cb => eventDriver.files.close(m_fd, cb)
);
releaseHandle!"files"(m_fd, m_ctx.driver); releaseHandle!"files"(m_fd, m_ctx.driver);
m_fd = FileFD.init; m_fd = FileFD.invalid;
m_ctx = null; m_ctx = null;
}
if (res[1] != CloseStatus.ok)
throw new Exception("Failed to close file");
} }
@property bool empty() const { assert(this.readable); return ctx.ptr >= ctx.size; } @property bool empty() const { assert(this.readable); return ctx.ptr >= ctx.size; }

View file

@ -1,6 +1,7 @@
module vibe.core.internal.release; module vibe.core.internal.release;
import eventcore.core; import eventcore.core;
import std.stdint : intptr_t;
/// Release a handle in a thread-safe way /// Release a handle in a thread-safe way
void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv) void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv)
@ -19,8 +20,8 @@ void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv)
// in case the destructor was called from a foreign thread, // in case the destructor was called from a foreign thread,
// perform the release in the owner thread // perform the release in the owner thread
drv.core.runInOwnerThread((h) { drv.core.runInOwnerThread((H handle) {
__traits(getMember, eventDriver, subsys).releaseRef(cast(H)h); __traits(getMember, eventDriver, subsys).releaseRef(handle);
}, cast(size_t)handle); }, handle);
} }
} }

View file

@ -40,7 +40,10 @@ Process adoptProcessID(Pid pid)
/// ditto /// ditto
Process adoptProcessID(int pid) Process adoptProcessID(int pid)
{ {
return Process(eventDriver.processes.adopt(pid)); auto p = eventDriver.processes.adopt(pid);
if (p == ProcessID.invalid)
throw new Exception("Failed to adopt process ID");
return Process(p);
} }
/** /**
@ -81,15 +84,19 @@ Process spawnProcess(
Config config = Config.none, Config config = Config.none,
scope NativePath workDir = NativePath.init) scope NativePath workDir = NativePath.init)
@trusted { @trusted {
return Process(eventDriver.processes.spawn( auto process = eventDriver.processes.spawn(
args, args,
ProcessStdinFile(ProcessRedirect.inherit), ProcessStdinFile(ProcessRedirect.inherit),
ProcessStdoutFile(ProcessRedirect.inherit), ProcessStdoutFile(ProcessRedirect.inherit),
ProcessStderrFile(ProcessRedirect.inherit), ProcessStderrFile(ProcessRedirect.inherit),
env, env,
config, config,
workDir.toNativeString()).pid workDir.toNativeString());
);
if (process.pid == ProcessID.invalid)
throw new Exception("Failed to spawn process");
return Process(process.pid);
} }
/// ditto /// ditto
@ -453,7 +460,14 @@ struct PipeInputStream {
*/ */
void close() void close()
nothrow { nothrow {
eventDriver.pipes.close(m_pipe); if (m_pipe == PipeFD.invalid) return;
asyncAwaitUninterruptible!(PipeCloseCallback,
cb => eventDriver.pipes.close(m_pipe, cb)
);
eventDriver.pipes.releaseRef(m_pipe);
m_pipe = PipeFD.invalid;
} }
} }
@ -528,7 +542,14 @@ struct PipeOutputStream {
*/ */
void close() void close()
nothrow { nothrow {
eventDriver.pipes.close(m_pipe); if (m_pipe == PipeFD.invalid) return;
asyncAwaitUninterruptible!(PipeCloseCallback,
cb => eventDriver.pipes.close(m_pipe, cb)
);
eventDriver.pipes.releaseRef(m_pipe);
m_pipe = PipeFD.invalid;
} }
} }
@ -624,6 +645,9 @@ ProcessPipes pipeProcess(
config, config,
workDir.toNativeString()); workDir.toNativeString());
if (process.pid == ProcessID.invalid)
throw new Exception("Failed to spawn process");
return ProcessPipes( return ProcessPipes(
Process(process.pid), Process(process.pid),
PipeOutputStream(cast(PipeFD)process.stdin), PipeOutputStream(cast(PipeFD)process.stdin),

View file

@ -7,6 +7,7 @@ module test;
import vibe.core.core; import vibe.core.core;
import vibe.core.log; import vibe.core.log;
import std.algorithm;
import std.concurrency; import std.concurrency;
import core.atomic; import core.atomic;
import core.time; import core.time;
@ -18,9 +19,16 @@ shared watchdog_count = 0;
void main() void main()
{ {
t1 = spawn({ t1 = spawn({
// ensure that asynchronous operations run in parallel to receive() // ensure that asynchronous operations can run in parallel to receive()
int wc = 0; int wc = 0;
runTask({ while (true) { sleep(250.msecs); wc++; logInfo("Watchdog receiver %s", wc); } }); MonoTime stime = MonoTime.currTime;
runTask({
while (true) {
sleepUntil((wc + 1) * 250.msecs, stime, 200.msecs);
wc++;
logInfo("Watchdog receiver %s", wc);
}
});
bool finished = false; bool finished = false;
try while (!finished) { try while (!finished) {
@ -57,28 +65,30 @@ void main()
}); });
t2 = spawn({ t2 = spawn({
MonoTime stime = MonoTime.currTime;
scope (failure) assert(false); scope (failure) assert(false);
sleep(1.seconds()); sleepUntil(1.seconds, stime, 900.msecs);
logInfo("send Hello World"); logInfo("send Hello World");
t1.send("Hello, World!"); t1.send("Hello, World!");
sleep(1.seconds()); sleepUntil(2.seconds, stime, 900.msecs);
logInfo("send int 1"); logInfo("send int 1");
t1.send(1); t1.send(1);
sleep(1.seconds()); sleepUntil(3.seconds, stime, 900.msecs);
logInfo("send double 1.2"); logInfo("send double 1.2");
t1.send(1.2); t1.send(1.2);
sleep(1.seconds()); sleepUntil(4.seconds, stime, 900.msecs);
logInfo("send int 2"); logInfo("send int 2");
t1.send(2); t1.send(2);
sleep(1.seconds()); sleepUntil(5.seconds, stime, 900.msecs);
logInfo("send 3xint 1 2 3"); logInfo("send 3xint 1 2 3");
t1.send(1, 2, 3); t1.send(1, 2, 3);
sleep(1.seconds()); sleepUntil(6.seconds, stime, 900.msecs);
logInfo("send string Bye bye"); logInfo("send string Bye bye");
t1.send("Bye bye"); t1.send("Bye bye");
@ -89,3 +99,12 @@ void main()
runApplication(); runApplication();
} }
// corrects for small timing inaccuracies to avoid the counter
// getting systematically out of sync when sleep timing is inaccurate
void sleepUntil(Duration until, MonoTime start_time, Duration min_sleep)
{
auto tm = MonoTime.currTime;
auto timeout = max(start_time - tm + until, min_sleep);
sleep(timeout);
}