From 314bd2bb486adcf05ca9fe917fedcabff4b54492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 15:57:31 +0100 Subject: [PATCH 1/9] Fix WinAPI condition for outOfWaiters event loop exit reason. Calling processEvents could previously block indefinitely even if there were no waiters left. --- source/eventcore/drivers/winapi/core.d | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index a0b4179..ed89d7e 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -48,6 +48,8 @@ final class WinAPIEventDriverCore : EventDriverCore { return ExitReason.exited; } + if (!waiterCount) return ExitReason.outOfWaiters; + bool got_event; if (timeout <= 0.seconds) { @@ -109,7 +111,7 @@ final class WinAPIEventDriverCore : EventDriverCore { DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max_wait.total!"msecs", DWORD.max); auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); - + if (ret == WAIT_IO_COMPLETION) got_event = true; else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) { From fdeef38ef498bd8f4405d98b514c50abf334d7fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 15:58:24 +0100 Subject: [PATCH 2/9] Fix waiter count tracking for WinAPI directory watchers and avoid empty callbacks. --- source/eventcore/drivers/winapi/watchers.d | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index dcf60e0..1911cd7 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -31,7 +31,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, null); } (); - + if (handle == INVALID_HANDLE_VALUE) return WatcherID.invalid; @@ -45,12 +45,13 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { try return theAllocator.makeArray!ubyte(16384); catch (Exception e) assert(false, "Failed to allocate directory watcher buffer."); } (); - if (!triggerRead(handle, *slot)) { releaseRef(id); return WatcherID.invalid; } + m_core.addWaiter(); + return id; } @@ -63,6 +64,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { { auto handle = idToHandle(descriptor); return m_core.m_handles[handle].releaseRef(()nothrow{ + m_core.removeWaiter(); CloseHandle(handle); () @trusted { try theAllocator.dispose(m_core.m_handles[handle].watcher.buffer); @@ -93,11 +95,15 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { if (handle !in WinAPIEventDriver.threadInstance.core.m_handles) return; + // NOTE: can be 0 if the buffer overflowed + if (!cbTransferred) + return; + auto slot = () @trusted { return &WinAPIEventDriver.threadInstance.core.m_handles[handle].watcher(); } (); ubyte[] result = slot.buffer[0 .. cbTransferred]; do { - assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); + assert(result.length >= FILE_NOTIFY_INFORMATION._FileName.offsetof); auto fni = () @trusted { return cast(FILE_NOTIFY_INFORMATION*)result.ptr; } (); FileChange ch; switch (fni.Action) { From 33524234142ac605517ccc70296b4a26689b896a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 22:19:00 +0100 Subject: [PATCH 3/9] Fix decrementing the waiter count when destroying a WinAPI event with active waiters. --- source/eventcore/drivers/winapi/events.d | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/eventcore/drivers/winapi/events.d b/source/eventcore/drivers/winapi/events.d index 4500e29..23884e6 100644 --- a/source/eventcore/drivers/winapi/events.d +++ b/source/eventcore/drivers/winapi/events.d @@ -110,6 +110,10 @@ final class WinAPIEventDriverEvents : EventDriverEvents { auto pe = descriptor in m_events; assert(pe.refCount > 0); if (--pe.refCount == 0) { + // make sure to not leak any waiter references for pending waits + foreach (i; 0 .. pe.waiters.length) + m_core.removeWaiter(); + () @trusted nothrow { scope (failure) assert(false); destroy(pe.waiters); From 5bba45485cc6e4dab5de3a6024bfb7987760e151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 12:31:41 +0100 Subject: [PATCH 4/9] Add recursive directory watcher test. --- tests/0-dirwatcher-rec.d | 164 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 tests/0-dirwatcher-rec.d diff --git a/tests/0-dirwatcher-rec.d b/tests/0-dirwatcher-rec.d new file mode 100644 index 0000000..fda5698 --- /dev/null +++ b/tests/0-dirwatcher-rec.d @@ -0,0 +1,164 @@ +/++ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ +module test; + +import eventcore.core; +import eventcore.internal.utils : print; +import core.thread : Thread; +import core.time : Duration, msecs; +import std.file : exists, remove, rename, rmdirRecurse, mkdir; +import std.format : format; +import std.functional : toDelegate; +import std.path : baseName, buildPath, dirName; +import std.stdio : File, writefln; +import std.array : replace; + +bool s_done; +int s_cnt = 0; + +enum testDir = "watcher_test"; + +WatcherID watcher; +FileChange[] pendingChanges; + + +void main() +{ + version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test."); + else { + + if (exists(testDir)) + rmdirRecurse(testDir); + + mkdir(testDir); + mkdir(testDir~"/dira"); + + // test non-recursive watcher + watcher = eventDriver.watchers.watchDirectory(testDir, false, toDelegate(&testCallback)); + assert(watcher != WatcherID.invalid); + Thread.sleep(1000.msecs); // some watcher implementations need time to initialize + testFile( "file1.dat"); + testFile( "file2.dat"); + testFile( "dira/file1.dat", false); + testCreateDir("dirb"); + testFile( "dirb/file1.dat", false); + testRemoveDir("dirb"); + eventDriver.watchers.releaseRef(watcher); + testFile( "file1.dat", false); + testRemoveDir("dira", false); + testCreateDir("dira", false); + + // test recursive watcher + watcher = eventDriver.watchers.watchDirectory(testDir, true, toDelegate(&testCallback)); + assert(watcher != WatcherID.invalid); + Thread.sleep(100.msecs); // some watcher implementations need time to initialize + testFile( "file1.dat"); + testFile( "file2.dat"); + testFile( "dira/file1.dat"); + testCreateDir("dirb"); + testFile( "dirb/file1.dat"); + testRename( "dirb", "dirc"); + testFile( "dirc/file2.dat"); + eventDriver.watchers.releaseRef(watcher); + testFile( "file1.dat", false); + testFile( "dira/file1.dat", false); + testFile( "dirc/file1.dat", false); + testRemoveDir("dirc", false); + testRemoveDir("dira", false); + + rmdirRecurse(testDir); + + // make sure that no watchers are registered anymore + auto er = eventDriver.core.processEvents(10.msecs); + assert(er == ExitReason.outOfWaiters); + + } +} + +void testCallback(WatcherID w, in ref FileChange ch) +@safe nothrow { + assert(w == watcher, "Wrong watcher generated a change"); + pendingChanges ~= ch; +} + +void expectChange(FileChange ch, bool expect_change) +{ + import std.datetime : Clock, UTC; + + auto starttime = Clock.currTime(UTC()); + again: while (!pendingChanges.length) { + auto er = eventDriver.core.processEvents(10.msecs); + switch (er) { + default: assert(false, format("Unexpected event loop exit code: %s", er)); + case ExitReason.idle: break; + case ExitReason.timeout: + assert(!pendingChanges.length); + break; + case ExitReason.outOfWaiters: + assert(!expect_change, "No watcher left, but expected change."); + return; + } + if (!pendingChanges.length && Clock.currTime(UTC()) - starttime >= 2000.msecs) { + assert(!expect_change, format("Got no change, expected %s.", ch)); + return; + } + } + auto pch = pendingChanges[0]; + + // adjust for Windows behavior + pch.directory = pch.directory.replace("\\", "/"); + pch.name = pch.name.replace("\\", "/"); + pendingChanges = pendingChanges[1 .. $]; + if (pch.kind == FileChangeKind.modified && (pch.name == "dira" || pch.name == "dirb")) + goto again; + + // test all field excep the isDir one, which does not work on all systems + assert(pch.kind == ch.kind && pch.baseDirectory == ch.baseDirectory && + pch.directory == ch.directory && pch.name == ch.name, + format("Unexpected change: %s vs %s", pch, ch)); +} + +void testFile(string name, bool expect_change = true) +{ +print("test %s CREATE %s", name, expect_change); + auto fil = File(buildPath(testDir, name), "wt"); + expectChange(fchange(FileChangeKind.added, name, false), expect_change); + +print("test %s MODIFY %s", name, expect_change); + fil.write("test"); + fil.close(); + expectChange(fchange(FileChangeKind.modified, name, false), expect_change); + +print("test %s DELETE %s", name, expect_change); + remove(buildPath(testDir, name)); + expectChange(fchange(FileChangeKind.removed, name, false), expect_change); +} + +void testCreateDir(string name, bool expect_change = true) +{ +print("test %s CREATEDIR %s", name, expect_change); + mkdir(buildPath(testDir, name)); + expectChange(fchange(FileChangeKind.added, name, true), expect_change); +} + +void testRemoveDir(string name, bool expect_change = true) +{ +print("test %s DELETEDIR %s", name, expect_change); + rmdirRecurse(buildPath(testDir, name)); + expectChange(fchange(FileChangeKind.removed, name, true), expect_change); +} + +void testRename(string from, string to, bool expect_change = true) +{ +print("test %s RENAME %s %s", from, to, expect_change); + rename(buildPath(testDir, from), buildPath(testDir, to)); + expectChange(fchange(FileChangeKind.removed, from, true), expect_change); + expectChange(fchange(FileChangeKind.added, to, true), expect_change); +} + +FileChange fchange(FileChangeKind kind, string name, bool is_dir) +{ + return FileChange(kind, testDir, dirName(name), baseName(name), is_dir); +} From 815db0727c6374a2fcfb6e2fb6795b1e4ed953a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 22:23:30 +0100 Subject: [PATCH 5/9] Change all invalid handle values to ~0 and improve FileChange. FileChange now has the full path of a file split into the base path (as specified when creating the watcher), the sub directory, and the file name. This allows to work with less dynamic memory allocations internally. --- source/eventcore/driver.d | 25 ++++++++++++++-------- source/eventcore/drivers/winapi/watchers.d | 8 ++++--- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 8b4f07c..a411bd1 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -652,14 +652,21 @@ struct FileChange { /// The type of change FileChangeKind kind; - /// Directory containing the changed file - string directory; + /// The root directory of the watcher + string baseDirectory; - /// Determines if the changed entity is a file or a directory. - bool isDirectory; + /// Subdirectory containing the changed file + string directory; /// Name of the changed file const(char)[] name; + + /** Determines if the changed entity is a file or a directory. + + Note that depending on the platform this may not be accurate for + `FileChangeKind.removed`. + */ + bool isDirectory; } struct Handle(string NAME, T, T invalid_value = T.init) { @@ -695,8 +702,8 @@ alias StreamListenSocketFD = Handle!("streamListen", SocketFD); alias DatagramSocketFD = Handle!("datagramSocket", SocketFD); alias FileFD = Handle!("file", FD); alias EventID = Handle!("event", FD); -alias TimerID = Handle!("timer", size_t); -alias WatcherID = Handle!("watcher", size_t); -alias EventWaitID = Handle!("eventWait", size_t); -alias SignalListenID = Handle!("signal", size_t); -alias DNSLookupID = Handle!("dns", size_t); +alias TimerID = Handle!("timer", size_t, size_t.max); +alias WatcherID = Handle!("watcher", size_t, size_t.max); +alias EventWaitID = Handle!("eventWait", size_t, size_t.max); +alias SignalListenID = Handle!("signal", size_t, size_t.max); +alias DNSLookupID = Handle!("dns", size_t, size_t.max); diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index 1911cd7..f7fc0a4 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -78,6 +78,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) { import std.conv : to; + import std.path : dirName, baseName; if (dwError != 0) { // FIXME: this must be propagated to the caller @@ -114,9 +115,10 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { case 0x4: ch.kind = FileChangeKind.removed; break; case 0x5: ch.kind = FileChangeKind.added; break; } - ch.directory = slot.directory; - ch.isDirectory = false; // FIXME: is this right? - ch.name = () @trusted { scope (failure) assert(false); return to!string(fni.FileName[0 .. fni.FileNameLength/2]); } (); + ch.baseDirectory = slot.directory; + auto path = () @trusted { scope (failure) assert(false); return to!string(fni.FileName[0 .. fni.FileNameLength/2]); } (); + ch.directory = dirName(path); + ch.name = baseName(path); slot.callback(id, ch); if (fni.NextEntryOffset == 0) break; result = result[fni.NextEntryOffset .. $]; From 99088e1202d18130220d8337a1c3e11f8d7e17aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 22:25:19 +0100 Subject: [PATCH 6/9] Implement a generic polling based directory watcher. This is used on platforms that don't have a specialized watcher implementation. --- source/eventcore/drivers/posix/driver.d | 8 +- source/eventcore/drivers/posix/watchers.d | 246 ++++++++++++++++++++-- 2 files changed, 238 insertions(+), 16 deletions(-) diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index b523d67..2e8f58f 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -48,9 +48,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { //version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); alias FileDriver = ThreadedFileEventDriver!EventsDriver; - version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop; - else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop; - else alias WatcherDriver = PosixEventDriverWatchers!Loop; + version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; + //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; + else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; Loop m_loop; CoreDriver m_core; @@ -73,7 +73,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_core = new CoreDriver(m_loop, m_timers, m_events); m_dns = new DNSDriver(m_events, m_signals); m_files = new FileDriver(m_events); - m_watchers = new WatcherDriver(m_loop); + m_watchers = new WatcherDriver(m_events); } // force overriding these in the (final) sub classes to avoid virtual calls diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 385294c..6dddaf1 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -5,7 +5,7 @@ import eventcore.driver; import eventcore.drivers.posix.driver; -final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers +final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; @@ -13,11 +13,12 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch import std.file; private { + alias Loop = typeof(Events.init.loop); Loop m_loop; string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map } - this(Loop loop) { m_loop = loop; } + this(Events events) { m_loop = events.loop; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback) { @@ -126,11 +127,11 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch } version (OSX) -final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { +final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { @safe: /*@nogc:*/ nothrow: - private Loop m_loop; + private Events m_events; - this(Loop loop) { m_loop = loop; } + this(Events events) { m_events = events; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { @@ -155,25 +156,246 @@ final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatc } } -final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { -@safe: /*@nogc:*/ nothrow: - private Loop m_loop; - this(Loop loop) { m_loop = loop; } +/** Generic directory watcher implementation based on periodic directory + scanning. + + Note that this implementation, although it works on all operating systems, + is not efficient for directories with many files, since it has to keep a + representation of the whole directory in memory and needs to list all files + for each polling period, which can result in excessive hard disk activity. +*/ +final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { +@safe: /*@nogc:*/ nothrow: + import core.thread : Thread; + import core.sync.mutex : Mutex; + + private { + Events m_events; + PollingThread[EventID] m_pollers; + } + + this(Events events) { m_events = events; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { - assert(false, "TODO!"); + import std.file : exists, isDir; + + // validate base directory + try if (!isDir(path)) return WatcherID.invalid; + catch (Exception e) return WatcherID.invalid; + + // create event to wait on for new changes + auto evt = m_events.create(); + assert(evt !is EventID.invalid, "Failed to create event."); + auto pt = new PollingThread(() @trusted { return cast(shared)m_events; } (), evt, path, recursive, on_change); + m_pollers[evt] = pt; + try () @trusted { pt.isDaemon = true; } (); + catch (Exception e) assert(false, e.msg); + () @trusted { pt.start(); } (); + + m_events.wait(evt, &onEvent); + + return cast(WatcherID)evt; } final override void addRef(WatcherID descriptor) { - assert(false, "TODO!"); + assert(descriptor != WatcherID.invalid); + auto evt = cast(EventID)descriptor; + auto pt = evt in m_pollers; + assert(pt !is null); + m_events.addRef(evt); } final override bool releaseRef(WatcherID descriptor) { - assert(false, "TODO!"); + assert(descriptor != WatcherID.invalid); + auto evt = cast(EventID)descriptor; + auto pt = evt in m_pollers; + assert(pt !is null); + if (!m_events.releaseRef(evt)) { + pt.dispose(); + return false; + } + return true; + } + + private void onEvent(EventID evt) + { + import std.algorithm.mutation : swap; + + auto pt = evt in m_pollers; + if (!pt) return; + + m_events.wait(evt, &onEvent); + + FileChange[] changes; + try synchronized (pt.m_changesMutex) + swap(changes, pt.m_changes); + catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); + + foreach (ref ch; changes) + pt.m_callback(cast(WatcherID)evt, ch); + } + + private final class PollingThread : Thread { + int refCount = 1; + EventID changesEvent; + + private { + shared(Events) m_eventsDriver; + Mutex m_changesMutex; + /*shared*/ FileChange[] m_changes; + immutable string m_basePath; + immutable bool m_recursive; + immutable FileChangesCallback m_callback; + shared bool m_shutdown = false; + size_t m_entryCount; + + struct Entry { + Entry* parent; + string name; + ulong size; + long lastChange; + + string path() + { + import std.path : buildPath; + if (parent) + return buildPath(parent.path, name); + else return name; + } + + bool isDir() const { return size == ulong.max; } + } + + struct Key { + Entry* parent; + string name; + } + + Entry*[Key] m_entries; + } + + this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) + @trusted nothrow { + import core.time : seconds; + + m_changesMutex = new Mutex; + m_eventsDriver = event_driver; + changesEvent = event; + m_basePath = path; + m_recursive = recursive; + m_callback = callback; + scan(false); + + try super(&run); + catch (Exception e) assert(false, e.msg); + } + + void dispose() + nothrow { + import core.atomic : atomicStore; + + try synchronized (m_changesMutex) { + changesEvent = EventID.invalid; + } catch (Exception e) assert(false, e.msg); + } + + private void run() + nothrow @trusted { + import core.atomic : atomicLoad; + import core.time : msecs; + import std.algorithm.comparison : min; + + try while (true) { + () @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } (); + + try synchronized (m_changesMutex) { + if (changesEvent == EventID.invalid) break; + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + + scan(true); + + try synchronized (m_changesMutex) { + if (changesEvent == EventID.invalid) break; + if (m_changes.length) + m_eventsDriver.trigger(changesEvent, false); + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + } catch (Throwable th) { + import core.stdc.stdio : fprintf, stderr; + import core.stdc.stdlib : abort; + + fprintf(stderr, "Fatal error: %.*s\n", th.msg.length, th.msg.ptr); + abort(); + } + } + + private void addChange(FileChangeKind kind, Key key, bool is_dir) + { + try synchronized (m_changesMutex) { + m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir); + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + } + + private void scan(bool generate_changes) + @trusted nothrow { + import std.algorithm.mutation : swap; + + Entry*[Key] new_entries; + size_t ec = 0; + + scan(null, generate_changes, new_entries, ec); + + foreach (e; m_entries.byKeyValue) { + if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { + if (generate_changes) + addChange(FileChangeKind.removed, e.key, e.value.isDir); + } + delete e.value; + } + + swap(m_entries, new_entries); + m_entryCount = ec; + } + + private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec) + @trusted nothrow { + import std.file : SpanMode, dirEntries; + import std.path : buildPath, baseName; + + auto ppath = parent ? buildPath(m_basePath, parent.path) : m_basePath; + try foreach (de; dirEntries(ppath, SpanMode.shallow)) { + auto key = Key(parent, de.name.baseName); + auto modified_time = de.timeLastModified.stdTime; + if (auto pe = key in m_entries) { + if ((*pe).isDir) { + if (m_recursive) + scan(*pe, generate_changes, new_entries, ec); + } else { + if ((*pe).size != de.size || (*pe).lastChange != modified_time) { + if (generate_changes) + addChange(FileChangeKind.modified, key, (*pe).isDir); + (*pe).size = de.size; + (*pe).lastChange = modified_time; + } + } + + new_entries[key] = *pe; + ec++; + m_entries.remove(key); + } else { + auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); + new_entries[key] = e; + ec++; + if (generate_changes) + addChange(FileChangeKind.added, key, e.isDir); + + if (de.isDir && m_recursive) scan(e, false, new_entries, ec); + } + } catch (Exception e) {} // will result in all children being flagged as removed + } } } From 0ec498207d28b283d516ccd34495cd71b16cae1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 21 Nov 2017 10:38:11 +0100 Subject: [PATCH 7/9] Gag directory modified events in win32. --- source/eventcore/drivers/winapi/watchers.d | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/source/eventcore/drivers/winapi/watchers.d b/source/eventcore/drivers/winapi/watchers.d index f7fc0a4..2eff613 100644 --- a/source/eventcore/drivers/winapi/watchers.d +++ b/source/eventcore/drivers/winapi/watchers.d @@ -78,7 +78,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) { import std.conv : to; - import std.path : dirName, baseName; + import std.file : isDir; + import std.path : dirName, baseName, buildPath; if (dwError != 0) { // FIXME: this must be propagated to the caller @@ -106,6 +107,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { do { assert(result.length >= FILE_NOTIFY_INFORMATION._FileName.offsetof); auto fni = () @trusted { return cast(FILE_NOTIFY_INFORMATION*)result.ptr; } (); + result = result[fni.NextEntryOffset .. $]; + FileChange ch; switch (fni.Action) { default: ch.kind = FileChangeKind.modified; break; @@ -115,13 +118,17 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers { case 0x4: ch.kind = FileChangeKind.removed; break; case 0x5: ch.kind = FileChangeKind.added; break; } + ch.baseDirectory = slot.directory; auto path = () @trusted { scope (failure) assert(false); return to!string(fni.FileName[0 .. fni.FileNameLength/2]); } (); + auto fullpath = buildPath(slot.directory, path); ch.directory = dirName(path); ch.name = baseName(path); - slot.callback(id, ch); + try ch.isDirectory = isDir(fullpath); + catch (Exception e) {} // FIXME: can happen if the base path is relative and the CWD has changed + if (ch.kind != FileChangeKind.modified || !ch.isDirectory) + slot.callback(id, ch); if (fni.NextEntryOffset == 0) break; - result = result[fni.NextEntryOffset .. $]; } while (result.length > 0); triggerRead(handle, *slot); From 5246593432fa559a1bd5526a26e477e400d63dcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 20 Nov 2017 23:59:52 +0100 Subject: [PATCH 8/9] Fix handling recursive inotify based directory watchers. --- source/eventcore/drivers/posix/watchers.d | 61 +++++++++++++++++++---- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 6dddaf1..ad6c777 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -15,25 +15,42 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver private { alias Loop = typeof(Events.init.loop); Loop m_loop; - string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map + + struct WatchState { + string[int] watcherPaths; + string basePath; + bool recursive; + } + + WatchState[WatcherID] m_watches; // TODO: use a @nogc (allocator based) map } this(Events events) { m_loop = events.loop; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback) { + import std.path : buildPath, pathSplitter; + import std.range : drop; + import std.range.primitives : walkLength; + enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect auto handle = () @trusted { return inotify_init1(IN_NONBLOCK); } (); if (handle == -1) return WatcherID.invalid; auto ret = WatcherID(handle); - addWatch(ret, path); + m_watches[ret] = WatchState(null, path, recursive); + + addWatch(ret, path, "."); if (recursive) { try { + auto base_segements = path.pathSplitter.walkLength; if (path.isDir) () @trusted { - foreach (de; path.dirEntries(SpanMode.shallow)) - if (de.isDir) addWatch(ret, de.name); + foreach (de; path.dirEntries(SpanMode.depth)) + if (de.isDir) { + auto subdir = de.name.pathSplitter.drop(base_segements).buildPath; + addWatch(ret, path, subdir); + } } (); } catch (Exception e) { // TODO: decide if this should be ignored or if the error should be forwarded @@ -78,6 +95,7 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver private void processEvents(WatcherID id) { + import std.path : buildPath, dirName; import core.stdc.stdio : FILENAME_MAX; import core.stdc.string : strlen; @@ -89,9 +107,16 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver break; assert(ret <= buf.length); + auto w = m_watches[id]; + auto rem = buf[0 .. ret]; while (rem.length > 0) { auto ev = () @trusted { return cast(inotify_event*)rem.ptr; } (); + rem = rem[inotify_event.sizeof + ev.len .. $]; + + // is the watch already deleted? + if (ev.mask & IN_IGNORED) continue; + FileChange ch; if (ev.mask & (IN_CREATE|IN_MOVED_TO)) ch.kind = FileChangeKind.added; @@ -100,28 +125,44 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver else if (ev.mask & IN_MODIFY) ch.kind = FileChangeKind.modified; + if (ev.mask & IN_DELETE_SELF) { + () @trusted { inotify_rm_watch(cast(int)id, ev.wd); } (); + w.watcherPaths.remove(ev.wd); + continue; + } else if (ev.mask & IN_MOVE_SELF) { + // NOTE: the should have been updated by a previous IN_MOVED_TO + continue; + } + auto name = () @trusted { return ev.name.ptr[0 .. strlen(ev.name.ptr)]; } (); - ch.directory = m_watches[id][ev.wd]; + auto subdir = w.watcherPaths[ev.wd]; + + if (w.recursive && ev.mask & (IN_CREATE|IN_MOVED_TO) && ev.mask & IN_ISDIR) { + addWatch(id, w.basePath, subdir == "." ? name.idup : buildPath(subdir, name)); + } + + ch.baseDirectory = m_watches[id].basePath; + ch.directory = subdir; ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.name = name; addRef(id); // assure that the id doesn't get invalidated until after the callback auto cb = m_loop.m_fds[id].watcher.callback; cb(id, ch); if (!releaseRef(id)) return; - - rem = rem[inotify_event.sizeof + ev.len .. $]; } } } - private bool addWatch(WatcherID handle, string path) + private bool addWatch(WatcherID handle, string base_path, string path) { + import std.path : buildPath; import std.string : toStringz; + enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO; - immutable wd = () @trusted { return inotify_add_watch(cast(int)handle, path.toStringz, EVENTS); } (); + immutable wd = () @trusted { return inotify_add_watch(cast(int)handle, buildPath(base_path, path).toStringz, EVENTS); } (); if (wd == -1) return false; - m_watches[handle][wd] = path; + m_watches[handle].watcherPaths[wd] = path; return true; } } From c3d49db8c5880cc31e0f4aa5bcf0ef0d7da66708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 21 Nov 2017 00:02:33 +0100 Subject: [PATCH 9/9] Update README. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 58e1cdf..ad307e6 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ Events | yes | yes | yes | ye Unix Signals | yes² | yes | — | — Files | yes | yes | yes | yes UI Integration | yes¹ | yes¹ | yes | yes¹ -File watcher | yes² | yes | yes | — +File watcher | yes² | yes | yes | yes² Feature | LibasyncEventDriver -----------------|--------------------- @@ -69,7 +69,7 @@ File watcher | — ¹ Manually, by adopting the X11 display connection socket -² Currently only supported on Linux +² Systems other than Linux use a polling implementation ### Open questions