Fix ThreadedFileEventDriver read/write issues.

I/O errors were being reported as IOStatus.ok instead of IOStatus.error, and the I/O operation finalization protocol had a potential race-condition when storing the number of bytes processed.
This commit is contained in:
Sönke Ludwig 2017-10-30 22:54:14 +01:00
parent 5203766044
commit c4550f0e59
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -73,21 +73,26 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
FileIOCallback callback; FileIOCallback callback;
shared ThreadedFileStatus status; shared ThreadedFileStatus status;
shared size_t bytesWritten; shared size_t bytesWritten;
IOStatus ioStatus; shared IOStatus ioStatus;
void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb) void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb)
@safe nothrow { @safe nothrow {
if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) { auto st = safeAtomicLoad(this.status);
pre_cb(); if (st == ThreadedFileStatus.finished) {
auto ios = safeAtomicLoad(this.ioStatus);
auto btw = safeAtomicLoad(this.bytesWritten);
auto cb = this.callback; auto cb = this.callback;
this.callback = null; this.callback = null;
safeAtomicStore(this.status, ThreadedFileStatus.idle);
pre_cb();
if (cb) { if (cb) {
log("fire callback"); log("fire callback");
cb(fd, ioStatus, safeAtomicLoad(this.bytesWritten)); cb(fd, ios, btw);
} }
} else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) { } else if (st == ThreadedFileStatus.cancelled) {
pre_cb();
this.callback = null; this.callback = null;
safeAtomicStore(this.status, ThreadedFileStatus.idle);
pre_cb();
log("ignore callback due to cancellation"); log("ignore callback due to cancellation");
} }
} }
@ -286,15 +291,16 @@ log("start processing");
scope (exit) { scope (exit) {
log("trigger event"); log("trigger event");
safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
() @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true);
} }
if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok);
while (bytes.length > 0) { while (bytes.length > 0) {
auto sz = min(bytes.length, 4096); auto sz = min(bytes.length, 4096);
auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } (); auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } ();
if (ret != sz) { if (ret != sz) {
f.ioStatus = IOStatus.error; safeAtomicStore(f.ioStatus, IOStatus.error);
log("error"); log("error");
break; break;
} }
@ -303,8 +309,7 @@ log("check for cancel");
if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return; if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return;
} }
f.ioStatus = IOStatus.ok; safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
log("wait for status set"); log("wait for status set");
while (true) { while (true) {
if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break; if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;