From 2a8c52f34732b87479cb50ed1cf05d12b6bdfb9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 6 Oct 2016 22:04:33 +0200 Subject: [PATCH] Add a thread pool based async file implementation. --- README.md | 2 +- source/eventcore/driver.d | 9 +- source/eventcore/drivers/epoll.d | 2 +- source/eventcore/drivers/posix.d | 54 +--- source/eventcore/drivers/select.d | 5 +- source/eventcore/drivers/threadedfile.d | 329 ++++++++++++++++++++++++ 6 files changed, 350 insertions(+), 51 deletions(-) create mode 100644 source/eventcore/drivers/threadedfile.d diff --git a/README.md b/README.md index 3968be0..7325e13 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,6 @@ DNS | no | no | no | no Timers | yes | yes | no | no Events | yes | yes | no | no Signals | no | no | no | no -Files | no | no | no | no +Files | yes | yes | no | no UI Integration | no | no | no | no File watcher | no | no | no | no diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index a18d8ca..a79723a 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -104,8 +104,12 @@ interface EventDriverFiles { @safe: /*@nogc:*/ nothrow: FileFD open(string path, FileOpenMode mode); FileFD createTemp(); - void write(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_write_finish); - void read(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_read_finish); + void close(FileFD file); + + ulong getSize(FileFD file); + + void write(FileFD file, ulong offset, const(ubyte)[] buffer, FileIOCallback on_write_finish); + void read(FileFD file, ulong offset, ubyte[] buffer, FileIOCallback on_read_finish); void cancelWrite(FileFD file); void cancelRead(FileFD file); @@ -191,6 +195,7 @@ interface EventDriverWatchers { alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); +alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(int); alias TimerCallback = void delegate(TimerID); diff --git a/source/eventcore/drivers/epoll.d b/source/eventcore/drivers/epoll.d index 3499b4b..1f24acd 100644 --- a/source/eventcore/drivers/epoll.d +++ b/source/eventcore/drivers/epoll.d @@ -31,7 +31,6 @@ final class EpollEventDriver : PosixEventDriver { nothrow @safe { override @property EpollEventDriver core() { return this; } - override @property EpollEventDriver files() { return this; } override @property EpollEventDriver sockets() { return this; } override @property EpollEventDriver timers() { return this; } override @property EpollEventDriver events() { return this; } @@ -67,6 +66,7 @@ final class EpollEventDriver : PosixEventDriver { override void dispose() { import core.sys.posix.unistd : close; + super.dispose(); close(m_epoll); } diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index f3c75f9..568196c 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -8,6 +8,7 @@ module eventcore.drivers.posix; public import eventcore.driver; import eventcore.drivers.timer; +import eventcore.drivers.threadedfile; import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.utils; @@ -37,7 +38,7 @@ private long currStdTime() } abstract class PosixEventDriver : EventDriver, - EventDriverCore, EventDriverFiles, EventDriverSockets, EventDriverTimers, + EventDriverCore, EventDriverSockets, EventDriverTimers, EventDriverEvents, EventDriverSignals, EventDriverWatchers { @safe: /*@nogc:*/ nothrow: @@ -47,6 +48,7 @@ abstract class PosixEventDriver : EventDriver, size_t m_waiterCount = 0; bool m_exit = false; FD m_wakeupEvent; + ThreadedFileEventDriver!PosixEventDriver m_files; } protected this() @@ -55,11 +57,12 @@ abstract class PosixEventDriver : EventDriver, initFD(m_wakeupEvent); registerFD(m_wakeupEvent, EventMask.read); //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD + m_files = new ThreadedFileEventDriver!PosixEventDriver(this); } // force overriding these in the (final) sub classes to avoid virtual calls abstract override @property PosixEventDriver core(); - abstract override @property PosixEventDriver files(); + final override @property ThreadedFileEventDriver!PosixEventDriver files() { return m_files; } abstract override @property PosixEventDriver sockets(); abstract override @property PosixEventDriver timers(); abstract override @property PosixEventDriver events(); @@ -124,7 +127,11 @@ abstract class PosixEventDriver : EventDriver, } protected abstract bool doProcessEvents(Duration dur); - abstract void dispose(); + + abstract void dispose() + { + m_files.dispose(); + } final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) { @@ -512,47 +519,6 @@ abstract class PosixEventDriver : EventDriver, } } - final override FileFD open(string path, FileOpenMode mode) - { - assert(false, "TODO!"); - } - - final override FileFD createTemp() - { - assert(false, "TODO!"); - } - - final override void write(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_write_finish) - { - assert(false, "TODO!"); - } - - final override void read(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_read_finish) - { - assert(false, "TODO!"); - } - - final override void cancelWrite(FileFD file) - { - assert(false, "TODO!"); - } - - final override void cancelRead(FileFD file) - { - assert(false, "TODO!"); - } - - final override void addRef(FileFD descriptor) - { - assert(false); - } - - final override void releaseRef(FileFD descriptor) - { - assert(false); - } - - final override EventID create() { auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); diff --git a/source/eventcore/drivers/select.d b/source/eventcore/drivers/select.d index 1c53ecc..5c6e7b1 100644 --- a/source/eventcore/drivers/select.d +++ b/source/eventcore/drivers/select.d @@ -25,7 +25,6 @@ version (Windows) { final class SelectEventDriver : PosixEventDriver { override @property SelectEventDriver core() { return this; } - override @property SelectEventDriver files() { return this; } override @property SelectEventDriver sockets() { return this; } override @property SelectEventDriver timers() { return this; } override @property SelectEventDriver events() { return this; } @@ -51,7 +50,7 @@ final class SelectEventDriver : PosixEventDriver { enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); }); enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); }); -//print("Wait for event..."); +//print("Wait for event... %s", timeout); //writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3); auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } (); //writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3); @@ -75,7 +74,7 @@ final class SelectEventDriver : PosixEventDriver { override void dispose() { - + super.dispose(); } override void registerFD(FD fd, EventMask mask) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d new file mode 100644 index 0000000..95d03e1 --- /dev/null +++ b/source/eventcore/drivers/threadedfile.d @@ -0,0 +1,329 @@ +module eventcore.drivers.threadedfile; + +import eventcore.driver; +import eventcore.internal.utils; +import core.atomic; +import core.stdc.errno; +import std.algorithm.comparison : among, min; + +version(Posix){ + import core.sys.posix.fcntl; + import core.sys.posix.sys.stat; + import core.sys.posix.unistd; +} +version(Windows){ + static if (__VERSION__ >= 2070) + import core.sys.windows.stat; + else + import std.c.windows.stat; + + private { + // TODO: use CreateFile/HANDLE instead of the Posix API on Windows + + extern(C) { + alias off_t = sizediff_t; + int open(in char* name, int mode, ...); + int chmod(in char* name, int mode); + int close(int fd); + int read(int fd, void *buffer, uint count); + int write(int fd, in void *buffer, uint count); + off_t lseek(int fd, off_t offset, int whence); + } + + enum O_RDONLY = 0; + enum O_WRONLY = 1; + enum O_RDWR = 2; + enum O_APPEND = 8; + enum O_CREAT = 0x0100; + enum O_TRUNC = 0x0200; + enum O_BINARY = 0x8000; + + enum _S_IREAD = 0x0100; /* read permission, owner */ + enum _S_IWRITE = 0x0080; /* write permission, owner */ + alias stat_t = struct_stat; + } +} +else +{ + enum O_BINARY = 0; +} + +private { + enum SEEK_SET = 0; + enum SEEK_CUR = 1; + enum SEEK_END = 2; +} + + +final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFiles +{ + import std.parallelism; + + private { + enum ThreadedFileStatus { + idle, // -> processing + processing, // -> cancelling, finished + cancelling, // -> cancelled + cancelled, // -> idle + finished // -> idle + } + + static struct IOInfo { + FileIOCallback callback; + shared ThreadedFileStatus status; + shared size_t bytesWritten; + IOStatus ioStatus; + + void flush(FileFD fd) + @safe nothrow { + if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) { + auto cb = this.callback; + this.callback = null; + log("fire callback"); + cb(fd, IOStatus.ok, safeAtomicLoad(this.bytesWritten)); + } else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) { + this.callback = null; + log("ignore callback due to cancellation"); + } + } + } + + static struct FileInfo { + IOInfo read; + IOInfo write; + int refCount; + } + + TaskPool m_fileThreadPool; + ChoppedVector!FileInfo m_files; + SmallIntegerSet!FileFD m_activeReads; + SmallIntegerSet!FileFD m_activeWrites; + EventID m_readyEvent; + Events m_events; + } + + @safe: nothrow: + + this(Events events) + { + m_events = events; + m_readyEvent = events.create(); + m_events.wait(m_readyEvent, &onReady); + } + + void dispose() + { + if (m_fileThreadPool) { + log("finishing thread pool"); + try m_fileThreadPool.finish(); + catch (Exception e) { + //logError("Failed to shut down file I/O thread pool."); + } + } + log("finishing file events"); + m_events.cancelWait(m_readyEvent, &onReady); + onReady(m_readyEvent); + m_events.releaseRef(m_readyEvent); + log("finished file events"); + } + + final override FileFD open(string path, FileOpenMode mode) + { + import std.string : toStringz; + + import std.conv : octal; + int flags; + int amode; + final switch (mode) { + case FileOpenMode.read: flags = O_RDONLY|O_BINARY; break; + case FileOpenMode.readWrite: flags = O_RDWR|O_BINARY; break; + case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC|O_BINARY; amode = octal!644; break; + case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND|O_BINARY; amode = octal!644; break; + } + auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } (); + if (fd < 0) return FileFD.init; + return FileFD(fd); + } + + final override FileFD createTemp() + { + assert(false, "TODO!"); + } + + void close(FileFD file) + { + .close(file); + } + + ulong getSize(FileFD file) + { + version (linux) { + // stat_t seems to be defined wrong on linux/64 + return .lseek(file, 0, SEEK_END); + } else { + stat_t st; + fstat(file, &st); + return st.st_size; + } + } + + final override void write(FileFD file, ulong offset, const(ubyte)[] buffer, FileIOCallback on_write_finish) + { + //assert(this.writable); + auto f = &m_files[file].write; + assert(f.callback is null, "Concurrent file writes are not allowed."); + f.callback = on_write_finish; + m_activeWrites.insert(file); +log("start task"); + try { + if (m_fileThreadPool is null) { + m_fileThreadPool = new TaskPool(4); + m_fileThreadPool.isDaemon = true; + } + m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + } catch (Exception e) { + on_write_finish(file, IOStatus.error, 0); + return; + } + } + + final override void cancelWrite(FileFD file) + { + auto f = &m_files[file].write; + m_activeWrites.remove(file); + auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); + assert(res, "Cancelling write when no write is in progress."); + } + + final override void read(FileFD file, ulong offset, ubyte[] buffer, FileIOCallback on_read_finish) + { + auto f = &m_files[file].read; + assert(f.callback is null, "Concurrent file reads are not allowed."); + f.callback = on_read_finish; + m_activeReads.insert(file); + try { + if (m_fileThreadPool is null) { + m_fileThreadPool = new TaskPool(4); + m_fileThreadPool.isDaemon = true; + } + m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + } catch (Exception e) { + on_read_finish(file, IOStatus.error, 0); + return; + } + } + + final override void cancelRead(FileFD file) + { + auto f = &m_files[file].read; + m_activeReads.remove(file); + auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } (); + assert(res, "Cancelling read when no write is in progress."); + } + + final override void addRef(FileFD descriptor) + { + auto f = &m_files[descriptor]; + f.refCount++; + } + + final override void releaseRef(FileFD descriptor) + { + auto f = &m_files[descriptor]; + if (!--f.refCount) { + .close(descriptor); + m_files[descriptor] = FileInfo.init; + assert(!m_activeReads.contains(descriptor)); + assert(!m_activeWrites.contains(descriptor)); + } + } + + /// private + static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) + { +log("ready event"); + IOInfo* f = mixin("&fd.m_files[file]."~op); +log("wait for cancel"); + + // wait for previous cancel requests to finish + while (safeAtomicLoad(f.status) == ThreadedFileStatus.cancelling) + safeYield(); + +log("wait for callback"); + // wait for previous callbacks to be fired + while (safeAtomicLoad(f.status).among(ThreadedFileStatus.finished, ThreadedFileStatus.cancelled)) + safeYield(); + + assert(safeAtomicLoad(f.status) == ThreadedFileStatus.idle); + +log("start processing"); + auto res = safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.processing); + assert(res, "Concurrent file "~op~"s are disallowed."); + + auto bytes = buffer; + .lseek(file, offset, SEEK_SET); + + scope (exit) { +log("trigger event"); + safeAtomicStore(f.bytesWritten, buffer.length - bytes.length); + () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent); + } + + while (bytes.length > 0) { + auto sz = min(bytes.length, 4096); + auto ret = () @trusted { return mixin("."~op)(file, bytes.ptr, cast(int)sz); } (); + if (ret != sz) { + f.ioStatus = IOStatus.error; +log("error"); + break; + } + bytes = bytes[sz .. $]; +log("check for cancel"); + if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return; + } + + f.ioStatus = IOStatus.ok; + +log("wait for status set"); + while (true) { + if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break; + if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) break; + } + } + + private void onReady(EventID) + { +log("ready event"); + foreach (f; m_activeReads) { + m_activeReads.remove(f); + m_files[f].read.flush(f); + } + + foreach (f; m_activeWrites) { + m_activeWrites.remove(f); + m_files[f].write.flush(f); + } + + m_events.wait(m_readyEvent, &onReady); + } +} + +private auto safeAtomicLoad(T)(ref shared(T) v) @trusted { return atomicLoad(v); } +private auto safeAtomicStore(T)(ref shared(T) v, T a) @trusted { return atomicStore(v, a); } +private auto safeCAS(T, U, V)(ref shared(T) v, U a, V b) @trusted { return cas(&v, a, b); } +private void safeYield() @trusted nothrow { + import core.thread : Thread; + import core.time : seconds; + Thread.sleep(0.seconds); +} + +private void log(ARGS...)(string fmt, ARGS args) +@trusted nothrow { + debug (EventCoreLogFiles) { + scope (failure) assert(false); + import core.thread : Thread; + import std.stdio : writef, writefln; + writef("[%s] ", Thread.getThis().name); + writefln(fmt, args); + } +}