Fix multiple bugs in ThreadedFileEventDriver.

- Errors were reported as "ok"
- The cancel state logic was flawed and could lose cancel requests
- Operations could be removed from the active sets before they actually finished, causing the callback to never get called
This commit is contained in:
Sönke Ludwig 2017-08-14 01:58:42 +02:00
parent dd873ec6cf
commit a63907b9f7
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -61,11 +61,12 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
private { private {
enum ThreadedFileStatus { enum ThreadedFileStatus {
idle, // -> processing idle, // -> initiated (by caller)
processing, // -> cancelling, finished initiated, // -> processing (by worker)
cancelling, // -> cancelled processing, // -> cancelling, finished (by caller, worker)
cancelled, // -> idle cancelling, // -> cancelled (by worker)
finished // -> idle cancelled, // -> idle (by event receiver)
finished // -> idle (by event receiver)
} }
static struct IOInfo { static struct IOInfo {
@ -74,14 +75,18 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
shared size_t bytesWritten; shared size_t bytesWritten;
IOStatus ioStatus; IOStatus ioStatus;
void flush(FileFD fd) void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb)
@safe nothrow { @safe nothrow {
if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) { if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) {
pre_cb();
auto cb = this.callback; auto cb = this.callback;
this.callback = null; this.callback = null;
log("fire callback"); if (cb) {
cb(fd, IOStatus.ok, safeAtomicLoad(this.bytesWritten)); log("fire callback");
cb(fd, ioStatus, safeAtomicLoad(this.bytesWritten));
}
} else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) { } else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) {
pre_cb();
this.callback = null; this.callback = null;
log("ignore callback due to cancellation"); log("ignore callback due to cancellation");
} }
@ -153,7 +158,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } (); auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } ();
if (flags == -1) return FileFD.invalid; if (flags == -1) return FileFD.invalid;
} }
if (m_files[system_file_handle].refCount > 0) return FileFD.invalid; if (m_files[system_file_handle].refCount > 0) return FileFD.invalid;
m_files[system_file_handle] = FileInfo.init; m_files[system_file_handle] = FileInfo.init;
m_files[system_file_handle].refCount = 1; m_files[system_file_handle].refCount = 1;
@ -181,6 +186,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
{ {
//assert(this.writable); //assert(this.writable);
auto f = &m_files[file].write; auto f = &m_files[file].write;
if (!safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
assert(false, "Concurrent file writes are not allowed.");
assert(f.callback is null, "Concurrent file writes are not allowed."); assert(f.callback is null, "Concurrent file writes are not allowed.");
f.callback = on_write_finish; f.callback = on_write_finish;
m_activeWrites.insert(file); m_activeWrites.insert(file);
@ -201,16 +208,20 @@ log("start task");
final override void cancelWrite(FileFD file) final override void cancelWrite(FileFD file)
{ {
assert(m_activeWrites.contains(file), "Cancelling write when no write is in progress.");
auto f = &m_files[file].write; auto f = &m_files[file].write;
f.callback = null;
m_activeWrites.remove(file); m_activeWrites.remove(file);
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling);
assert(res, "Cancelling write when no write is in progress.");
} }
final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish) final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish)
{ {
auto f = &m_files[file].read; auto f = &m_files[file].read;
if (!safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
assert(false, "Concurrent file writes are not allowed.");
assert(f.callback is null, "Concurrent file reads are not allowed."); assert(f.callback is null, "Concurrent file reads are not allowed.");
f.callback = on_read_finish; f.callback = on_read_finish;
m_activeReads.insert(file); m_activeReads.insert(file);
@ -230,11 +241,13 @@ log("start task");
final override void cancelRead(FileFD file) final override void cancelRead(FileFD file)
{ {
assert(m_activeReads.contains(file), "Cancelling read when no read is in progress.");
auto f = &m_files[file].read; auto f = &m_files[file].read;
f.callback = null;
m_activeReads.remove(file); m_activeReads.remove(file);
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling);
assert(res, "Cancelling read when no write is in progress.");
} }
final override void addRef(FileFD descriptor) final override void addRef(FileFD descriptor)
@ -260,22 +273,10 @@ log("start task");
{ {
log("task fun"); log("task fun");
IOInfo* f = mixin("&fd.m_files[file]."~op); IOInfo* f = mixin("&fd.m_files[file]."~op);
log("wait for cancel");
// wait for previous cancel requests to finish
while (safeAtomicLoad(f.status) == ThreadedFileStatus.cancelling)
safeYield();
log("wait for callback");
// wait for previous callbacks to be fired
while (safeAtomicLoad(f.status).among(ThreadedFileStatus.finished, ThreadedFileStatus.cancelled))
safeYield();
assert(safeAtomicLoad(f.status) == ThreadedFileStatus.idle);
log("start processing"); log("start processing");
auto res = safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.processing);
assert(res, "Concurrent file "~op~"s are disallowed."); if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing))
assert(false, "File slot not in initiated state when processor task is started.");
auto bytes = buffer; auto bytes = buffer;
version (Windows) { version (Windows) {
@ -303,7 +304,7 @@ log("check for cancel");
} }
f.ioStatus = IOStatus.ok; f.ioStatus = IOStatus.ok;
log("wait for status set"); log("wait for status set");
while (true) { while (true) {
if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break; if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;
@ -314,15 +315,11 @@ log("wait for status set");
private void onReady(EventID) private void onReady(EventID)
{ {
log("ready event"); log("ready event");
foreach (f; m_activeReads) { foreach (f; m_activeReads)
m_activeReads.remove(f); m_files[f].read.finalize(f, { m_activeReads.remove(f); });
m_files[f].read.flush(f);
}
foreach (f; m_activeWrites) { foreach (f; m_activeWrites)
m_activeWrites.remove(f); m_files[f].write.finalize(f, { m_activeWrites.remove(f); });
m_files[f].write.flush(f);
}
m_waiting = false; m_waiting = false;
startWaiting(); startWaiting();