diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 6bd00e2..dd90f71 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -16,12 +16,12 @@ cancellation function returns, or afterwards. */ module eventcore.driver; -@safe: /*@nogc:*/ nothrow: import core.time : Duration; import std.process : StdProcessConfig = Config; import std.socket : Address; import std.stdint : intptr_t; +import std.typecons : Tuple; import std.variant : Algebraic; @@ -71,14 +71,13 @@ interface EventDriver { /** Provides generic event loop control. */ interface EventDriverCore { -@safe: /*@nogc:*/ nothrow: /** The number of pending callbacks. When this number drops to zero, the event loop can safely be quit. It is guaranteed that no callbacks will be made anymore, unless new callbacks get registered. */ - size_t waiterCount(); + size_t waiterCount() @safe nothrow; /** Runs the event loop to process a chunk of events. @@ -92,7 +91,7 @@ interface EventDriverCore { duration of `Duration.max`, if necessary, will wait indefinitely until an event arrives. */ - ExitReason processEvents(Duration timeout); + ExitReason processEvents(Duration timeout) @safe nothrow; /** Causes `processEvents` to return with `ExitReason.exited` as soon as possible. @@ -101,7 +100,7 @@ interface EventDriverCore { so that it returns immediately. If no call is in progress, the next call to `processEvents` will immediately return with `ExitReason.exited`. */ - void exit(); + void exit() @safe nothrow; /** Resets the exit flag. @@ -110,26 +109,28 @@ interface EventDriverCore { `processEvents`, the next call to `processEvents` will return with `ExitCode.exited` immediately. This function can be used to avoid this. */ - void clearExitFlag(); + void clearExitFlag() @safe nothrow; /** Executes a callback in the thread owning the driver. */ - void runInOwnerThread(ThreadCallback2 fun, intptr_t param1, intptr_t param2) shared; + void runInOwnerThread(ThreadCallbackGen fun, ref ThreadCallbackGenParams params) shared @safe nothrow; /// ditto - final void runInOwnerThread(ThreadCallback1 fun, intptr_t param1) - shared { - runInOwnerThread((p1, p2) { - auto f = () @trusted { return cast(ThreadCallback1)p2; } (); - f(p1); - }, param1, cast(intptr_t)fun); - } - /// ditto - final void runInOwnerThread(ThreadCallback0 fun) - shared { - runInOwnerThread((p1, p2) { - auto f = () @trusted { return cast(ThreadCallback0)p2; } (); - f(); - }, 0, cast(intptr_t)fun); + final void runInOwnerThread(ARGS...)(void function(ARGS) @safe nothrow fun, ARGS args) shared + if (ARGS.length != 1 || !is(ARGS[0] == ThreadCallbackGenParams)) + { + alias F = void function(ARGS) @safe nothrow; + alias T = Tuple!ARGS; + static assert(F.sizeof + T.sizeof <= ThreadCallbackGenParams.length, + "Parameters take up too much space."); + + ThreadCallbackGenParams params; + () @trusted { (cast(F[])params[0 .. F.sizeof])[0] = fun; } (); + (cast(T[])params[F.sizeof .. F.sizeof + T.sizeof])[0] = T(args); + runInOwnerThread((ref ThreadCallbackGenParams p) { + auto f = () @trusted { return cast(F[])p[0 .. F.sizeof]; } ()[0]; + auto pt = () @trusted { return cast(T[])p[F.sizeof .. F.sizeof + T.sizeof]; } (); + f(pt[0].expand); + }, params); } /// Low-level user data access. Use `getUserData` instead. @@ -141,7 +142,7 @@ interface EventDriverCore { */ deprecated("Use `EventDriverSockets.userData` instead.") @property final ref T userData(T, FD)(FD descriptor) - @trusted { + @trusted nothrow { import std.conv : emplace; static void init(void* ptr) { emplace(cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); } @@ -460,12 +461,11 @@ interface EventDriverFiles { /** Disallows any reads/writes and removes any exclusive locks. - Note that this function may not actually close the file handle. The - handle is only guaranteed to be closed one the reference count drops - to zero. However, the remaining effects of calling this function will - be similar to actually closing the file. + Note that the file handle may become invalid at any point after the + call to `close`, regardless of its current reference count. Any + operations on the handle will not have an effect. */ - void close(FileFD file); + void close(FileFD file, FileCloseCallback on_closed); ulong getSize(FileFD file); @@ -882,8 +882,12 @@ interface EventDriverPipes { void waitForData(PipeFD pipe, PipeIOCallback on_data_available); /** Immediately close the pipe. Future read or write operations may fail. + + Note that the file handle may become invalid at any point after the + call to `close`, regardless of its current reference count. Any + operations on the handle will not have an effect. */ - void close(PipeFD pipe); + void close(PipeFD file, PipeCloseCallback on_closed); /** Determines whether the given pipe handle is valid. @@ -954,6 +958,7 @@ final class RefAddress : Address { } } +@safe: /*@nogc:*/ nothrow: alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address); @@ -961,7 +966,9 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); +alias FileCloseCallback = void delegate(FileFD, CloseStatus); alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t); +alias PipeCloseCallback = void delegate(PipeFD, CloseStatus); alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias TimerCallback = void delegate(TimerID); @@ -984,6 +991,12 @@ enum ExitReason { exited } +enum CloseStatus { + ok, + ioError, + invalidHandle, +} + enum ConnectStatus { connected, refused, @@ -1093,13 +1106,6 @@ struct FileChange { /// Name of the changed file const(char)[] name; - - /** Determines if the changed entity is a file or a directory. - - Note that depending on the platform this may not be accurate for - `FileChangeKind.removed`. - */ - bool isDirectory; } /** Describes a spawned process @@ -1157,10 +1163,9 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) { alias value this; } -alias ThreadCallback0 = void function() @safe nothrow; -alias ThreadCallback1 = void function(intptr_t param1) @safe nothrow; -alias ThreadCallback2 = void function(intptr_t param1, intptr_t param2) @safe nothrow; -alias ThreadCallback = ThreadCallback1; +alias ThreadCallbackGenParams = ubyte[8 * intptr_t.sizeof]; +alias ThreadCallbackGen = void function(ref ThreadCallbackGenParams param3) @safe nothrow; +deprecated alias ThreadCallback = void function(intptr_t param1) @safe nothrow; struct FD { mixin Handle!("fd", size_t, size_t.max); } struct SocketFD { mixin Handle!("socket", FD); } diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index c5e542e..20c04e1 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -160,7 +160,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime protected alias ExtraEventsCallback = bool delegate(long); - private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams); private { Loop m_loop; @@ -262,7 +262,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_exit = false; } - final override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) + final override void runInOwnerThread(ThreadCallbackGen del, + ref ThreadCallbackGenParams params) shared { auto m = atomicLoad(m_threadCallbackMutex); auto evt = atomicLoad(m_wakeupEvent); @@ -275,7 +276,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(ThreadCallbackEntry(del, param1, param2)); + .put(ThreadCallbackEntry(del, params)); } catch (Exception e) assert(false, e.msg); m_events.trigger(m_wakeupEvent, false); @@ -311,7 +312,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1], del[2]); + del[0](del[1]); } } } diff --git a/source/eventcore/drivers/posix/pipes.d b/source/eventcore/drivers/posix/pipes.d index 36a9e4c..af6710c 100644 --- a/source/eventcore/drivers/posix/pipes.d +++ b/source/eventcore/drivers/posix/pipes.d @@ -10,7 +10,7 @@ import std.algorithm : min, max; final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN; + import core.stdc.errno : errno, EAGAIN, EINTR; import core.sys.posix.unistd : close, read, write; import core.sys.posix.fcntl; import core.sys.posix.poll; @@ -304,12 +304,21 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { return false; } - final override void close(PipeFD pipe) + final override void close(PipeFD pipe, PipeCloseCallback on_closed) { - if (!isValid(pipe)) return; + if (!isValid(pipe)) { + on_closed(pipe, CloseStatus.invalidHandle); + return; + } - // TODO: Maybe actually close here instead of waiting for releaseRef? - close(cast(int)pipe); + int res; + do res = close(cast(int)pipe); + while (res != 0 && errno == EINTR); + m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!PipeSlot(pipe); + + if (on_closed) + on_closed(pipe, res == 0 ? CloseStatus.ok : CloseStatus.ioError); } override bool isValid(PipeFD handle) @@ -337,10 +346,7 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); if (--slot.common.refCount == 0) { - m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!PipeSlot(pipe); - - close(cast(int)pipe); + close(pipe, null); return false; } return true; @@ -350,7 +356,6 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @system { return m_loop.rawUserDataImpl(fd, size, initialize, destroy); } ->>>>>>> 568465d... Make the API robust against using invalid handles. Fixes #105. } final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @@ -387,8 +392,13 @@ final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { assert(false, "TODO!"); } - override void close(PipeFD pipe) + override void close(PipeFD pipe, PipeCloseCallback on_closed) { + if (!isValid(pipe)) { + on_closed(pipe, CloseStatus.invalidHandle); + return; + } + assert(false, "TODO!"); } diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 93f3593..0456939 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -206,7 +206,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); } - private static void onLocalProcessExit(intptr_t system_pid) + private static void onLocalProcessExit(int system_pid) { int exitCode; ProcessWaitCallback[] callbacks; @@ -214,12 +214,12 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces ProcessID pid; PosixEventDriverProcesses driver; - lockedProcessInfoPlain(cast(int)system_pid, (info) { + lockedProcessInfoPlain(system_pid, (info) { assert(info !is null); exitCode = info.exitCode; callbacks = info.callbacks; - pid = ProcessID(cast(int)system_pid, info.validationCounter); + pid = ProcessID(system_pid, info.validationCounter); info.callbacks = null; driver = info.driver; diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index ac2b753..30d7233 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -150,7 +150,6 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver ch.baseDirectory = m_watches[id].basePath; ch.directory = subdir; - ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.name = name; addRef(id); // assure that the id doesn't get invalidated until after the callback auto cb = m_loop.m_fds[id].watcher.callback; @@ -487,9 +486,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat @property size_t entryCount() const { return m_entryCount; } - private void addChange(FileChangeKind kind, Key key, bool is_dir) + private void addChange(FileChangeKind kind, Key key) { - m_onChange(FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name, is_dir)); + m_onChange(FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name)); } private void scan(bool generate_changes) @@ -506,12 +505,12 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat foreach (e; m_entries.byKeyValue) { if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (generate_changes) - addChange(FileChangeKind.removed, e.key, e.value.isDir); + addChange(FileChangeKind.removed, e.key); } } foreach (e; added) - addChange(FileChangeKind.added, Key(e.parent, e.name), e.isDir); + addChange(FileChangeKind.added, Key(e.parent, e.name)); swap(m_entries, new_entries); m_entryCount = ec; @@ -539,7 +538,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat } else { if ((*pe).size != de.size || (*pe).lastChange != modified_time) { if (generate_changes) - addChange(FileChangeKind.modified, key, (*pe).isDir); + addChange(FileChangeKind.modified, key); (*pe).size = de.size; (*pe).lastChange = modified_time; } diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 185c693..9ea9857 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -1,6 +1,7 @@ module eventcore.drivers.threadedfile; import eventcore.driver; +import eventcore.internal.ioworker; import eventcore.internal.utils; import core.atomic; import core.stdc.errno; @@ -101,7 +102,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil static struct FileInfo { IOInfo read; IOInfo write; - bool open = true; uint validationCounter; @@ -110,7 +110,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil ubyte[16*size_t.sizeof] userData; } - TaskPool m_fileThreadPool; + IOWorkerPool m_fileThreadPool; ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop SmallIntegerSet!size_t m_activeReads; SmallIntegerSet!size_t m_activeWrites; @@ -128,10 +128,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil void dispose() { - if (m_fileThreadPool) { - StaticTaskPool.releaseRef(); - m_fileThreadPool = null; - } + m_fileThreadPool = IOWorkerPool.init; if (m_readyEvent != EventID.invalid) { log("finishing file events"); @@ -179,15 +176,22 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil return FileFD(system_file_handle, vc + 1); } - void close(FileFD file) + void close(FileFD file, FileCloseCallback on_closed) { - if (!isValid(file)) return; + if (!isValid(file)) { + on_closed(file, CloseStatus.invalidHandle); + return; + } - // NOTE: The file descriptor itself must stay open until the reference - // count drops to zero, or this would result in dangling handles. - // In case of an exclusive file lock, the lock should be lifted - // here. - m_files[file].open = false; + // TODO: close may block and should be executed in a worker thread + int res; + do res = .close(cast(int)file.value); + while (res != 0 && errno == EINTR); + + m_files[file.value] = FileInfo.init; + + if (on_closed) + on_closed(file, res == 0 ? CloseStatus.ok : CloseStatus.ioError); } ulong getSize(FileFD file) @@ -232,11 +236,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil //assert(this.writable); auto f = () @trusted { return &m_files[file]; } (); - if (!f.open) { - on_write_finish(file, IOStatus.disconnected, 0); - return; - } - if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) assert(false, "Concurrent file writes are not allowed."); assert(f.write.callback is null, "Concurrent file writes are not allowed."); @@ -278,11 +277,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil auto f = () @trusted { return &m_files[file]; } (); - if (!f.open) { - on_read_finish(file, IOStatus.disconnected, 0); - return; - } - if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) assert(false, "Concurrent file reads are not allowed."); assert(f.read.callback is null, "Concurrent file reads are not allowed."); @@ -334,8 +328,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil auto f = () @trusted { return &m_files[descriptor]; } (); if (!--f.refCount) { - .close(cast(int)descriptor); - *f = FileInfo.init; + close(descriptor, null); assert(!m_activeReads.contains(descriptor.value)); assert(!m_activeWrites.contains(descriptor.value)); return false; @@ -435,9 +428,9 @@ log("ready event"); log("create file event"); m_readyEvent = m_events.create(); } - if (m_fileThreadPool is null) { + if (!m_fileThreadPool) { log("aquire thread pool"); - m_fileThreadPool = StaticTaskPool.addRef(); + m_fileThreadPool = acquireIOWorkerPool(); } } } @@ -456,63 +449,3 @@ private void log(ARGS...)(string fmt, ARGS args) writefln(fmt, args); } } - - -// Maintains a single thread pool shared by all driver instances (threads) -private struct StaticTaskPool { - import core.sync.mutex : Mutex; - import std.parallelism : TaskPool; - - private { - static shared Mutex m_mutex; - static __gshared TaskPool m_pool; - static __gshared int m_refCount = 0; - } - - shared static this() - { - m_mutex = new shared Mutex; - } - - static TaskPool addRef() - @trusted nothrow { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!m_refCount++) { - try { - m_pool = mallocT!TaskPool(4); - m_pool.isDaemon = true; - } catch (Exception e) { - assert(false, e.msg); - } - } - - return m_pool; - } - - static void releaseRef() - @trusted nothrow { - TaskPool fin_pool; - - { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - - if (!--m_refCount) { - fin_pool = m_pool; - m_pool = null; - } - } - - if (fin_pool) { - log("finishing thread pool"); - try { - fin_pool.finish(true); - freeT(fin_pool); - } catch (Exception e) { - //log("Failed to shut down file I/O thread pool."); - } - } - } -} diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index 4227082..26c3f45 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -16,7 +16,7 @@ import std.typecons : Tuple, tuple; final class WinAPIEventDriverCore : EventDriverCore { @safe: /*@nogc:*/ nothrow: - private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams); private { bool m_exit; @@ -141,7 +141,8 @@ final class WinAPIEventDriverCore : EventDriverCore { m_exit = false; } - override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) + override void runInOwnerThread(ThreadCallbackGen del, + ref ThreadCallbackGenParams params) shared { import core.atomic : atomicLoad; @@ -155,7 +156,7 @@ final class WinAPIEventDriverCore : EventDriverCore { try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(ThreadCallbackEntry(del, param1, param2)); + .put(ThreadCallbackEntry(del, params)); } catch (Exception e) assert(false, e.msg); () @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } (); @@ -279,8 +280,6 @@ final class WinAPIEventDriverCore : EventDriverCore { private void executeThreadCallbacks() { - import std.stdint : intptr_t; - while (true) { ThreadCallbackEntry del; try { @@ -289,7 +288,7 @@ final class WinAPIEventDriverCore : EventDriverCore { del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1], del[2]); + del[0](del[1]); } } } @@ -360,6 +359,7 @@ package struct FileSlot { } Direction!false read; Direction!true write; + FileCloseCallback closeCallback; } package struct WatcherSlot { diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index ab8c21c..0f35380 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -4,6 +4,7 @@ version (Windows): import eventcore.driver; import eventcore.drivers.winapi.core; +import eventcore.internal.ioworker; import eventcore.internal.win32; private extern(Windows) @trusted nothrow @nogc { @@ -14,6 +15,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { @safe /*@nogc*/ nothrow: private { WinAPIEventDriverCore m_core; + IOWorkerPool m_workerPool; } this(WinAPIEventDriverCore core) @@ -71,15 +73,41 @@ final class WinAPIEventDriverFiles : EventDriverFiles { return FileFD(cast(size_t)handle, m_core.m_handles[handle].validationCounter); } - override void close(FileFD file) + override void close(FileFD file, FileCloseCallback on_closed) { - if (!isValid(file)) return; + static void doCloseCleanup(CloseParams p) + { + p.core.removeWaiter(); + if (p.callback) p.callback(p.file, p.status); + } + + static void doClose(FileFD file, FileCloseCallback on_closed, + shared(WinAPIEventDriverCore) core) + { + CloseParams p; + CloseStatus st; + p.file = file; + p.callback = on_closed; + p.core = () @trusted { return cast(WinAPIEventDriverCore)core; } (); + if (CloseHandle(idToHandle(file))) p.status = CloseStatus.ok; + else p.status = CloseStatus.ioError; + + () @trusted { core.runInOwnerThread(&doCloseCleanup, p); } (); + } + + if (!isValid(file)) { + on_closed(file, CloseStatus.invalidHandle); + return; + } auto h = idToHandle(file); auto slot = () @trusted { return &m_core.m_handles[h]; } (); - if (slot.validationCounter != file.validationCounter) return; if (slot.file.read.overlapped.hEvent != INVALID_HANDLE_VALUE) slot.file.read.overlapped.hEvent = slot.file.write.overlapped.hEvent = INVALID_HANDLE_VALUE; + m_core.addWaiter(); + m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); + m_core.freeSlot(h); + workerPool.run!doClose(file, on_closed, () @trusted { return cast(shared)m_core; } ()); } override ulong getSize(FileFD file) @@ -213,11 +241,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { auto h = idToHandle(descriptor); auto slot = &m_core.m_handles[h]; - return slot.releaseRef({ - CloseHandle(h); - m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); - m_core.freeSlot(h); - }); + return slot.releaseRef({ close(descriptor, null); }); } protected override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @@ -291,8 +315,22 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } } + private @property ref IOWorkerPool workerPool() + { + if (!m_workerPool) + m_workerPool = acquireIOWorkerPool(); + return m_workerPool; + } + private static HANDLE idToHandle(FileFD id) @trusted @nogc { return cast(HANDLE)cast(size_t)id; } } + +private static struct CloseParams { + FileFD file; + FileCloseCallback callback; + CloseStatus status; + WinAPIEventDriverCore core; +} diff --git a/source/eventcore/drivers/winapi/pipes.d b/source/eventcore/drivers/winapi/pipes.d index 07fc43b..74bb6c5 100644 --- a/source/eventcore/drivers/winapi/pipes.d +++ b/source/eventcore/drivers/winapi/pipes.d @@ -53,9 +53,13 @@ final class WinAPIEventDriverPipes : EventDriverPipes { assert(false, "TODO!"); } - override void close(PipeFD pipe) + override void close(PipeFD pipe, PipeCloseCallback on_closed) { - if (!isValid(pipe)) return; + if (!isValid(pipe)) { + if (on_closed) + on_closed(pipe, CloseStatus.invalidHandle); + return; + } assert(false, "TODO!"); } diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index 679d15f..ec58b00 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -193,10 +193,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { ch.directory = dirName(path); if (ch.directory == ".") ch.directory = ""; ch.name = baseName(path); - try ch.isDirectory = isDir(fullpath); - catch (Exception e) {} // FIXME: can happen if the base path is relative and the CWD has changed - if (ch.kind != FileChangeKind.modified || !ch.isDirectory) - slot.callback(id, ch); + slot.callback(id, ch); if (fni.NextEntryOffset == 0 || !slot.callback) break; } diff --git a/source/eventcore/internal/ioworker.d b/source/eventcore/internal/ioworker.d new file mode 100644 index 0000000..48a4946 --- /dev/null +++ b/source/eventcore/internal/ioworker.d @@ -0,0 +1,97 @@ +/** Provides a shared task pool for distributing tasks to worker threads. +*/ +module eventcore.internal.ioworker; + +import eventcore.internal.utils; + +import std.parallelism : TaskPool, Task, task; + + +IOWorkerPool acquireIOWorkerPool() +@safe nothrow { + return IOWorkerPool(true); +} + +struct IOWorkerPool { + private { + TaskPool m_pool; + } + + @safe nothrow: + + private this(bool) { m_pool = StaticTaskPool.addRef(); } + ~this() { if (m_pool) StaticTaskPool.releaseRef(); } + this(this) { if (m_pool) StaticTaskPool.addRef(); } + + bool opCast(T)() const if (is(T == bool)) { return !!m_pool; } + + @property TaskPool pool() { return m_pool; } + + alias pool this; + + auto run(alias fun, ARGS...)(ARGS args) + { + auto t = task!(fun, ARGS)(args); + try m_pool.put(t); + catch (Exception e) assert(false, e.msg); + return t; + } +} + +// Maintains a single thread pool shared by all driver instances (threads) +private struct StaticTaskPool { + import core.sync.mutex : Mutex; + + private { + static shared Mutex m_mutex; + static __gshared TaskPool m_pool; + static __gshared int m_refCount = 0; + } + + shared static this() + { + m_mutex = new shared Mutex; + } + + static TaskPool addRef() + @trusted nothrow { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + if (!m_refCount++) { + try { + m_pool = mallocT!TaskPool(4); + m_pool.isDaemon = true; + } catch (Exception e) { + assert(false, e.msg); + } + } + + return m_pool; + } + + static void releaseRef() + @trusted nothrow { + TaskPool fin_pool; + + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + + if (!--m_refCount) { + fin_pool = m_pool; + m_pool = null; + } + } + + if (fin_pool) { + //log("finishing thread pool"); + try { + fin_pool.finish(true); + freeT(fin_pool); + } catch (Exception e) { + //log("Failed to shut down file I/O thread pool."); + } + } + } +} diff --git a/tests/0-dirwatcher-rec.d b/tests/0-dirwatcher-rec.d index 880a69c..a52eab2 100644 --- a/tests/0-dirwatcher-rec.d +++ b/tests/0-dirwatcher-rec.d @@ -8,7 +8,7 @@ import eventcore.core; import eventcore.internal.utils : print; import core.thread : Thread; import core.time : Duration, MonoTime, msecs; -import std.file : exists, remove, rename, rmdirRecurse, mkdir; +import std.file : exists, remove, rename, rmdirRecurse, mkdir, isDir; import std.format : format; import std.functional : toDelegate; import std.path : baseName, buildPath, dirName; @@ -97,6 +97,20 @@ void expectChange(FileChange ch, bool expect_change) assert(!expect_change, format("Got no change, expected %s.", ch)); return; } + + // ignore different directory modification notifications on Windows as + // opposed to the other systems + while (pendingChanges.length) { + auto pch = pendingChanges[0]; + if (pch.kind == FileChangeKind.modified) { + auto p = buildPath(pch.baseDirectory, pch.directory, pch.name); + if (!exists(p) || isDir(p)) { + pendingChanges = pendingChanges[1 .. $]; + continue; + } + } + break; + } } assert(expect_change, "Got change although none was expected."); @@ -119,43 +133,43 @@ void testFile(string name, bool expect_change = true) { print("test %s CREATE %s", name, expect_change); auto fil = File(buildPath(testDir, name), "wt"); - expectChange(fchange(FileChangeKind.added, name, false), expect_change); + expectChange(fchange(FileChangeKind.added, name), expect_change); print("test %s MODIFY %s", name, expect_change); fil.write("test"); fil.close(); - expectChange(fchange(FileChangeKind.modified, name, false), expect_change); + expectChange(fchange(FileChangeKind.modified, name), expect_change); print("test %s DELETE %s", name, expect_change); remove(buildPath(testDir, name)); - expectChange(fchange(FileChangeKind.removed, name, false), expect_change); + expectChange(fchange(FileChangeKind.removed, name), expect_change); } void testCreateDir(string name, bool expect_change = true) { print("test %s CREATEDIR %s", name, expect_change); mkdir(buildPath(testDir, name)); - expectChange(fchange(FileChangeKind.added, name, true), expect_change); + expectChange(fchange(FileChangeKind.added, name), expect_change); } void testRemoveDir(string name, bool expect_change = true) { print("test %s DELETEDIR %s", name, expect_change); rmdirRecurse(buildPath(testDir, name)); - expectChange(fchange(FileChangeKind.removed, name, true), expect_change); + expectChange(fchange(FileChangeKind.removed, name), expect_change); } void testRename(string from, string to, bool expect_change = true) { print("test %s RENAME %s %s", from, to, expect_change); rename(buildPath(testDir, from), buildPath(testDir, to)); - expectChange(fchange(FileChangeKind.removed, from, true), expect_change); - expectChange(fchange(FileChangeKind.added, to, true), expect_change); + expectChange(fchange(FileChangeKind.removed, from), expect_change); + expectChange(fchange(FileChangeKind.added, to), expect_change); } -FileChange fchange(FileChangeKind kind, string name, bool is_dir) +FileChange fchange(FileChangeKind kind, string name) { auto dn = dirName(name); if (dn == ".") dn = ""; - return FileChange(kind, testDir, dn, baseName(name), is_dir); + return FileChange(kind, testDir, dn, baseName(name)); } diff --git a/tests/0-dirwatcher.d b/tests/0-dirwatcher.d index a7ec09a..5cc21d3 100644 --- a/tests/0-dirwatcher.d +++ b/tests/0-dirwatcher.d @@ -5,8 +5,9 @@ module test; import eventcore.core; +import std.file : exists, isDir, mkdir, remove, rmdirRecurse; +import std.path : buildPath; import std.stdio : File, writefln; -import std.file : exists, mkdir, remove, rmdirRecurse; import core.time : Duration, msecs; bool s_done; @@ -23,6 +24,11 @@ void main() scope (exit) rmdirRecurse(testDir); auto id = eventDriver.watchers.watchDirectory(testDir, false, (id, ref change) { + try { + if (change.kind == FileChangeKind.modified && isDir(buildPath(change.baseDirectory, change.directory, change.name))) + return; + } catch (Exception e) assert(false, e.msg); + switch (s_cnt++) { default: import std.conv : to; diff --git a/tests/0-file.d b/tests/0-file.d index 58b33f5..3a20391 100644 --- a/tests/0-file.d +++ b/tests/0-file.d @@ -36,13 +36,15 @@ void main() assert(status == IOStatus.ok); assert(nbytes == data.length - 5); assert(dst == data); - eventDriver.files.close(f); - () @trusted { - scope (failure) assert(false); - remove("test.txt"); - } (); - eventDriver.files.releaseRef(f); - s_done = true; + eventDriver.files.close(f, (f, s) { + assert(s == CloseStatus.ok); + () @trusted { + scope (failure) assert(false); + remove("test.txt"); + } (); + eventDriver.files.releaseRef(f); + s_done = true; + }); }); }); }); diff --git a/tests/0-runinownerthread.d b/tests/0-runinownerthread.d index bb9208e..157a732 100644 --- a/tests/0-runinownerthread.d +++ b/tests/0-runinownerthread.d @@ -31,7 +31,7 @@ void main() void threadFunc(shared(NativeEventDriver) drv) { - drv.core.runInOwnerThread((id) { + drv.core.runInOwnerThread((int id) { s_id = id; }, 42); }