diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 3dd22a1..5cf899c 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -73,21 +73,26 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil FileIOCallback callback; shared ThreadedFileStatus status; shared size_t bytesWritten; - IOStatus ioStatus; + shared IOStatus ioStatus; void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb) @safe nothrow { - if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) { - pre_cb(); + auto st = safeAtomicLoad(this.status); + if (st == ThreadedFileStatus.finished) { + auto ios = safeAtomicLoad(this.ioStatus); + auto btw = safeAtomicLoad(this.bytesWritten); auto cb = this.callback; this.callback = null; + safeAtomicStore(this.status, ThreadedFileStatus.idle); + pre_cb(); if (cb) { log("fire callback"); - cb(fd, ioStatus, safeAtomicLoad(this.bytesWritten)); + cb(fd, ios, btw); } - } else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) { - pre_cb(); + } else if (st == ThreadedFileStatus.cancelled) { this.callback = null; + safeAtomicStore(this.status, ThreadedFileStatus.idle); + pre_cb(); log("ignore callback due to cancellation"); } } @@ -286,15 +291,16 @@ log("start processing"); scope (exit) { log("trigger event"); - safeAtomicStore(f.bytesWritten, buffer.length - bytes.length); () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); } + if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); + while (bytes.length > 0) { auto sz = min(bytes.length, 4096); auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } (); if (ret != sz) { - f.ioStatus = IOStatus.error; + safeAtomicStore(f.ioStatus, IOStatus.error); log("error"); break; } @@ -303,8 +309,7 @@ log("check for cancel"); if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return; } - f.ioStatus = IOStatus.ok; - + safeAtomicStore(f.bytesWritten, buffer.length - bytes.length); log("wait for status set"); while (true) { if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;