diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index b523d67..2e8f58f 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -48,9 +48,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { //version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); alias FileDriver = ThreadedFileEventDriver!EventsDriver; - version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop; - else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop; - else alias WatcherDriver = PosixEventDriverWatchers!Loop; + version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; + //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; + else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; Loop m_loop; CoreDriver m_core; @@ -73,7 +73,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_core = new CoreDriver(m_loop, m_timers, m_events); m_dns = new DNSDriver(m_events, m_signals); m_files = new FileDriver(m_events); - m_watchers = new WatcherDriver(m_loop); + m_watchers = new WatcherDriver(m_events); } // force overriding these in the (final) sub classes to avoid virtual calls diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 385294c..6dddaf1 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -5,7 +5,7 @@ import eventcore.driver; import eventcore.drivers.posix.driver; -final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers +final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; @@ -13,11 +13,12 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch import std.file; private { + alias Loop = typeof(Events.init.loop); Loop m_loop; string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map } - this(Loop loop) { m_loop = loop; } + this(Events events) { m_loop = events.loop; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback) { @@ -126,11 +127,11 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch } version (OSX) -final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { +final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { @safe: /*@nogc:*/ nothrow: - private Loop m_loop; + private Events m_events; - this(Loop loop) { m_loop = loop; } + this(Events events) { m_events = events; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { @@ -155,25 +156,246 @@ final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatc } } -final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { -@safe: /*@nogc:*/ nothrow: - private Loop m_loop; - this(Loop loop) { m_loop = loop; } +/** Generic directory watcher implementation based on periodic directory + scanning. + + Note that this implementation, although it works on all operating systems, + is not efficient for directories with many files, since it has to keep a + representation of the whole directory in memory and needs to list all files + for each polling period, which can result in excessive hard disk activity. +*/ +final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers { +@safe: /*@nogc:*/ nothrow: + import core.thread : Thread; + import core.sync.mutex : Mutex; + + private { + Events m_events; + PollingThread[EventID] m_pollers; + } + + this(Events events) { m_events = events; } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { - assert(false, "TODO!"); + import std.file : exists, isDir; + + // validate base directory + try if (!isDir(path)) return WatcherID.invalid; + catch (Exception e) return WatcherID.invalid; + + // create event to wait on for new changes + auto evt = m_events.create(); + assert(evt !is EventID.invalid, "Failed to create event."); + auto pt = new PollingThread(() @trusted { return cast(shared)m_events; } (), evt, path, recursive, on_change); + m_pollers[evt] = pt; + try () @trusted { pt.isDaemon = true; } (); + catch (Exception e) assert(false, e.msg); + () @trusted { pt.start(); } (); + + m_events.wait(evt, &onEvent); + + return cast(WatcherID)evt; } final override void addRef(WatcherID descriptor) { - assert(false, "TODO!"); + assert(descriptor != WatcherID.invalid); + auto evt = cast(EventID)descriptor; + auto pt = evt in m_pollers; + assert(pt !is null); + m_events.addRef(evt); } final override bool releaseRef(WatcherID descriptor) { - assert(false, "TODO!"); + assert(descriptor != WatcherID.invalid); + auto evt = cast(EventID)descriptor; + auto pt = evt in m_pollers; + assert(pt !is null); + if (!m_events.releaseRef(evt)) { + pt.dispose(); + return false; + } + return true; + } + + private void onEvent(EventID evt) + { + import std.algorithm.mutation : swap; + + auto pt = evt in m_pollers; + if (!pt) return; + + m_events.wait(evt, &onEvent); + + FileChange[] changes; + try synchronized (pt.m_changesMutex) + swap(changes, pt.m_changes); + catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); + + foreach (ref ch; changes) + pt.m_callback(cast(WatcherID)evt, ch); + } + + private final class PollingThread : Thread { + int refCount = 1; + EventID changesEvent; + + private { + shared(Events) m_eventsDriver; + Mutex m_changesMutex; + /*shared*/ FileChange[] m_changes; + immutable string m_basePath; + immutable bool m_recursive; + immutable FileChangesCallback m_callback; + shared bool m_shutdown = false; + size_t m_entryCount; + + struct Entry { + Entry* parent; + string name; + ulong size; + long lastChange; + + string path() + { + import std.path : buildPath; + if (parent) + return buildPath(parent.path, name); + else return name; + } + + bool isDir() const { return size == ulong.max; } + } + + struct Key { + Entry* parent; + string name; + } + + Entry*[Key] m_entries; + } + + this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) + @trusted nothrow { + import core.time : seconds; + + m_changesMutex = new Mutex; + m_eventsDriver = event_driver; + changesEvent = event; + m_basePath = path; + m_recursive = recursive; + m_callback = callback; + scan(false); + + try super(&run); + catch (Exception e) assert(false, e.msg); + } + + void dispose() + nothrow { + import core.atomic : atomicStore; + + try synchronized (m_changesMutex) { + changesEvent = EventID.invalid; + } catch (Exception e) assert(false, e.msg); + } + + private void run() + nothrow @trusted { + import core.atomic : atomicLoad; + import core.time : msecs; + import std.algorithm.comparison : min; + + try while (true) { + () @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } (); + + try synchronized (m_changesMutex) { + if (changesEvent == EventID.invalid) break; + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + + scan(true); + + try synchronized (m_changesMutex) { + if (changesEvent == EventID.invalid) break; + if (m_changes.length) + m_eventsDriver.trigger(changesEvent, false); + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + } catch (Throwable th) { + import core.stdc.stdio : fprintf, stderr; + import core.stdc.stdlib : abort; + + fprintf(stderr, "Fatal error: %.*s\n", th.msg.length, th.msg.ptr); + abort(); + } + } + + private void addChange(FileChangeKind kind, Key key, bool is_dir) + { + try synchronized (m_changesMutex) { + m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir); + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + } + + private void scan(bool generate_changes) + @trusted nothrow { + import std.algorithm.mutation : swap; + + Entry*[Key] new_entries; + size_t ec = 0; + + scan(null, generate_changes, new_entries, ec); + + foreach (e; m_entries.byKeyValue) { + if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { + if (generate_changes) + addChange(FileChangeKind.removed, e.key, e.value.isDir); + } + delete e.value; + } + + swap(m_entries, new_entries); + m_entryCount = ec; + } + + private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec) + @trusted nothrow { + import std.file : SpanMode, dirEntries; + import std.path : buildPath, baseName; + + auto ppath = parent ? buildPath(m_basePath, parent.path) : m_basePath; + try foreach (de; dirEntries(ppath, SpanMode.shallow)) { + auto key = Key(parent, de.name.baseName); + auto modified_time = de.timeLastModified.stdTime; + if (auto pe = key in m_entries) { + if ((*pe).isDir) { + if (m_recursive) + scan(*pe, generate_changes, new_entries, ec); + } else { + if ((*pe).size != de.size || (*pe).lastChange != modified_time) { + if (generate_changes) + addChange(FileChangeKind.modified, key, (*pe).isDir); + (*pe).size = de.size; + (*pe).lastChange = modified_time; + } + } + + new_entries[key] = *pe; + ec++; + m_entries.remove(key); + } else { + auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); + new_entries[key] = e; + ec++; + if (generate_changes) + addChange(FileChangeKind.added, key, e.isDir); + + if (de.isDir && m_recursive) scan(e, false, new_entries, ec); + } + } catch (Exception e) {} // will result in all children being flagged as removed + } } }