Merge pull request #27 from vibe-d/fix_threaded_file

Fix ThreadedFileEventDriver read/write issues.
merged-on-behalf-of: Sönke Ludwig <s-ludwig@users.noreply.github.com>
This commit is contained in:
The Dlang Bot 2017-10-30 23:51:40 +01:00 committed by GitHub
commit 8d7c1e3c96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -73,21 +73,26 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
FileIOCallback callback;
shared ThreadedFileStatus status;
shared size_t bytesWritten;
IOStatus ioStatus;
shared IOStatus ioStatus;
void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb)
@safe nothrow {
if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) {
pre_cb();
auto st = safeAtomicLoad(this.status);
if (st == ThreadedFileStatus.finished) {
auto ios = safeAtomicLoad(this.ioStatus);
auto btw = safeAtomicLoad(this.bytesWritten);
auto cb = this.callback;
this.callback = null;
safeAtomicStore(this.status, ThreadedFileStatus.idle);
pre_cb();
if (cb) {
log("fire callback");
cb(fd, ioStatus, safeAtomicLoad(this.bytesWritten));
cb(fd, ios, btw);
}
} else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) {
pre_cb();
} else if (st == ThreadedFileStatus.cancelled) {
this.callback = null;
safeAtomicStore(this.status, ThreadedFileStatus.idle);
pre_cb();
log("ignore callback due to cancellation");
}
}
@ -286,15 +291,16 @@ log("start processing");
scope (exit) {
log("trigger event");
safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
() @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true);
}
if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok);
while (bytes.length > 0) {
auto sz = min(bytes.length, 4096);
auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } ();
if (ret != sz) {
f.ioStatus = IOStatus.error;
safeAtomicStore(f.ioStatus, IOStatus.error);
log("error");
break;
}
@ -303,8 +309,7 @@ log("check for cancel");
if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return;
}
f.ioStatus = IOStatus.ok;
safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
log("wait for status set");
while (true) {
if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;