From a63907b9f7965000a4767c119601630a816f1823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 14 Aug 2017 01:58:42 +0200 Subject: [PATCH] Fix multiple bugs in ThreadedFileEventDriver. - Errors were reported as "ok" - The cancel state logic was flawed and could lose cancel requests - Operations could be removed from the active sets before they actually finished, causing the callback to never get called --- source/eventcore/drivers/threadedfile.d | 71 ++++++++++++------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 926aceb..165cb67 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -61,11 +61,12 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil private { enum ThreadedFileStatus { - idle, // -> processing - processing, // -> cancelling, finished - cancelling, // -> cancelled - cancelled, // -> idle - finished // -> idle + idle, // -> initiated (by caller) + initiated, // -> processing (by worker) + processing, // -> cancelling, finished (by caller, worker) + cancelling, // -> cancelled (by worker) + cancelled, // -> idle (by event receiver) + finished // -> idle (by event receiver) } static struct IOInfo { @@ -74,14 +75,18 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil shared size_t bytesWritten; IOStatus ioStatus; - void flush(FileFD fd) + void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb) @safe nothrow { if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) { + pre_cb(); auto cb = this.callback; this.callback = null; - log("fire callback"); - cb(fd, IOStatus.ok, safeAtomicLoad(this.bytesWritten)); + if (cb) { + log("fire callback"); + cb(fd, ioStatus, safeAtomicLoad(this.bytesWritten)); + } } else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) { + pre_cb(); this.callback = null; log("ignore callback due to cancellation"); } @@ -153,7 +158,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } (); if (flags == -1) return FileFD.invalid; } - + if (m_files[system_file_handle].refCount > 0) return FileFD.invalid; m_files[system_file_handle] = FileInfo.init; m_files[system_file_handle].refCount = 1; @@ -181,6 +186,8 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil { //assert(this.writable); auto f = &m_files[file].write; + if (!safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) + assert(false, "Concurrent file writes are not allowed."); assert(f.callback is null, "Concurrent file writes are not allowed."); f.callback = on_write_finish; m_activeWrites.insert(file); @@ -201,16 +208,20 @@ log("start task"); final override void cancelWrite(FileFD file) { + assert(m_activeWrites.contains(file), "Cancelling write when no write is in progress."); + auto f = &m_files[file].write; + f.callback = null; m_activeWrites.remove(file); m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind - auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); - assert(res, "Cancelling write when no write is in progress."); + safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish) { auto f = &m_files[file].read; + if (!safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) + assert(false, "Concurrent file writes are not allowed."); assert(f.callback is null, "Concurrent file reads are not allowed."); f.callback = on_read_finish; m_activeReads.insert(file); @@ -230,11 +241,13 @@ log("start task"); final override void cancelRead(FileFD file) { + assert(m_activeReads.contains(file), "Cancelling read when no read is in progress."); + auto f = &m_files[file].read; + f.callback = null; m_activeReads.remove(file); m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind - auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); - assert(res, "Cancelling read when no write is in progress."); + safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } final override void addRef(FileFD descriptor) @@ -260,22 +273,10 @@ log("start task"); { log("task fun"); IOInfo* f = mixin("&fd.m_files[file]."~op); -log("wait for cancel"); - - // wait for previous cancel requests to finish - while (safeAtomicLoad(f.status) == ThreadedFileStatus.cancelling) - safeYield(); - -log("wait for callback"); - // wait for previous callbacks to be fired - while (safeAtomicLoad(f.status).among(ThreadedFileStatus.finished, ThreadedFileStatus.cancelled)) - safeYield(); - - assert(safeAtomicLoad(f.status) == ThreadedFileStatus.idle); - log("start processing"); - auto res = safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.processing); - assert(res, "Concurrent file "~op~"s are disallowed."); + + if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) + assert(false, "File slot not in initiated state when processor task is started."); auto bytes = buffer; version (Windows) { @@ -303,7 +304,7 @@ log("check for cancel"); } f.ioStatus = IOStatus.ok; - + log("wait for status set"); while (true) { if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break; @@ -314,15 +315,11 @@ log("wait for status set"); private void onReady(EventID) { log("ready event"); - foreach (f; m_activeReads) { - m_activeReads.remove(f); - m_files[f].read.flush(f); - } + foreach (f; m_activeReads) + m_files[f].read.finalize(f, { m_activeReads.remove(f); }); - foreach (f; m_activeWrites) { - m_activeWrites.remove(f); - m_files[f].write.flush(f); - } + foreach (f; m_activeWrites) + m_files[f].write.finalize(f, { m_activeWrites.remove(f); }); m_waiting = false; startWaiting();