From 99088e1202d18130220d8337a1c3e11f8d7e17aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= <sludwig@rejectedsoftware.com>
Date: Mon, 20 Nov 2017 22:25:19 +0100
Subject: [PATCH] Implement a generic polling based directory watcher.

This is used on platforms that don't have a specialized watcher implementation.
---
 source/eventcore/drivers/posix/driver.d   |   8 +-
 source/eventcore/drivers/posix/watchers.d | 246 ++++++++++++++++++++--
 2 files changed, 238 insertions(+), 16 deletions(-)

diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d
index b523d67..2e8f58f 100644
--- a/source/eventcore/drivers/posix/driver.d
+++ b/source/eventcore/drivers/posix/driver.d
@@ -48,9 +48,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
 		//version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
 		else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
 		alias FileDriver = ThreadedFileEventDriver!EventsDriver;
-		version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop;
-		else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop;
-		else alias WatcherDriver = PosixEventDriverWatchers!Loop;
+		version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
+		//else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
+		else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
 
 		Loop m_loop;
 		CoreDriver m_core;
@@ -73,7 +73,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
 		m_core = new CoreDriver(m_loop, m_timers, m_events);
 		m_dns = new DNSDriver(m_events, m_signals);
 		m_files = new FileDriver(m_events);
-		m_watchers = new WatcherDriver(m_loop);
+		m_watchers = new WatcherDriver(m_events);
 	}
 
 	// force overriding these in the (final) sub classes to avoid virtual calls
diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d
index 385294c..6dddaf1 100644
--- a/source/eventcore/drivers/posix/watchers.d
+++ b/source/eventcore/drivers/posix/watchers.d
@@ -5,7 +5,7 @@ import eventcore.driver;
 import eventcore.drivers.posix.driver;
 
 
-final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers
+final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers
 {
 	import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
 	import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
@@ -13,11 +13,12 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
 	import std.file;
 
 	private {
+		alias Loop = typeof(Events.init.loop);
 		Loop m_loop;
 		string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map
 	}
 
-	this(Loop loop) { m_loop = loop; }
+	this(Events events) { m_loop = events.loop; }
 
 	final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback)
 	{
@@ -126,11 +127,11 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
 }
 
 version (OSX)
-final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
+final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
 @safe: /*@nogc:*/ nothrow:
-	private Loop m_loop;
+	private Events m_events;
 
-	this(Loop loop) { m_loop = loop; }
+	this(Events events) { m_events = events; }
 
 	final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
 	{
@@ -155,25 +156,246 @@ final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatc
 	}
 }
 
-final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
-@safe: /*@nogc:*/ nothrow:
-	private Loop m_loop;
 
-	this(Loop loop) { m_loop = loop; }
+/** Generic directory watcher implementation based on periodic directory
+	scanning.
+
+	Note that this implementation, although it works on all operating systems,
+	is not efficient for directories with many files, since it has to keep a
+	representation of the whole directory in memory and needs to list all files
+	for each polling period, which can result in excessive hard disk activity.
+*/
+final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
+@safe: /*@nogc:*/ nothrow:
+	import core.thread : Thread;
+	import core.sync.mutex : Mutex;
+
+	private {
+		Events m_events;
+		PollingThread[EventID] m_pollers;
+	}
+
+	this(Events events) { m_events = events; }
 
 	final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
 	{
-		assert(false, "TODO!");
+		import std.file : exists, isDir;
+
+		// validate base directory
+		try if (!isDir(path)) return WatcherID.invalid;
+		catch (Exception e) return WatcherID.invalid;
+
+		// create event to wait on for new changes
+		auto evt = m_events.create();
+		assert(evt !is EventID.invalid, "Failed to create event.");
+		auto pt = new PollingThread(() @trusted { return cast(shared)m_events; } (), evt, path, recursive, on_change);
+		m_pollers[evt] = pt;
+		try () @trusted { pt.isDaemon = true; } ();
+		catch (Exception e) assert(false, e.msg);
+		() @trusted { pt.start(); } ();
+
+		m_events.wait(evt, &onEvent);
+
+		return cast(WatcherID)evt;
 	}
 
 	final override void addRef(WatcherID descriptor)
 	{
-		assert(false, "TODO!");
+		assert(descriptor != WatcherID.invalid);
+		auto evt = cast(EventID)descriptor;
+		auto pt = evt in m_pollers;
+		assert(pt !is null);
+		m_events.addRef(evt);
 	}
 
 	final override bool releaseRef(WatcherID descriptor)
 	{
-		assert(false, "TODO!");
+		assert(descriptor != WatcherID.invalid);
+		auto evt = cast(EventID)descriptor;
+		auto pt = evt in m_pollers;
+		assert(pt !is null);
+		if (!m_events.releaseRef(evt)) {
+			pt.dispose();
+			return false;
+		}
+		return true;
+	}
+
+	private void onEvent(EventID evt)
+	{
+		import std.algorithm.mutation : swap;
+
+		auto pt = evt in m_pollers;
+		if (!pt) return;
+
+		m_events.wait(evt, &onEvent);
+
+		FileChange[] changes;
+		try synchronized (pt.m_changesMutex)
+			swap(changes, pt.m_changes);
+		catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
+
+		foreach (ref ch; changes)
+			pt.m_callback(cast(WatcherID)evt, ch);
+	}
+
+	private final class PollingThread : Thread {
+		int refCount = 1;
+		EventID changesEvent;
+
+		private {
+			shared(Events) m_eventsDriver;
+			Mutex m_changesMutex;
+			/*shared*/ FileChange[] m_changes;
+			immutable string m_basePath;
+			immutable bool m_recursive;
+			immutable FileChangesCallback m_callback;
+			shared bool m_shutdown = false;
+			size_t m_entryCount;
+
+			struct Entry {
+				Entry* parent;
+				string name;
+				ulong size;
+				long lastChange;
+
+				string path()
+				{
+					import std.path : buildPath;
+					if (parent)
+						return buildPath(parent.path, name);
+					else return name;
+				}
+
+				bool isDir() const { return size == ulong.max; }
+			}
+
+			struct Key {
+				Entry* parent;
+				string name;
+			}
+
+			Entry*[Key] m_entries;
+		}
+
+		this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback)
+		@trusted nothrow {
+			import core.time : seconds;
+
+			m_changesMutex = new Mutex;
+			m_eventsDriver = event_driver;
+			changesEvent = event;
+			m_basePath = path;
+			m_recursive = recursive;
+			m_callback = callback;
+			scan(false);
+
+			try super(&run);
+			catch (Exception e) assert(false, e.msg);
+		}
+
+		void dispose()
+		nothrow {
+			import core.atomic : atomicStore;
+
+			try synchronized (m_changesMutex) {
+				changesEvent = EventID.invalid;
+			} catch (Exception e) assert(false, e.msg);
+		}
+
+		private void run()
+		nothrow @trusted {
+			import core.atomic : atomicLoad;
+			import core.time : msecs;
+			import std.algorithm.comparison : min;
+
+			try while (true) {
+				() @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } ();
+
+				try synchronized (m_changesMutex) {
+					if (changesEvent == EventID.invalid) break;
+				} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
+
+				scan(true);
+
+				try synchronized (m_changesMutex) {
+					if (changesEvent == EventID.invalid) break;
+					if (m_changes.length)
+						m_eventsDriver.trigger(changesEvent, false);
+				} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
+			} catch (Throwable th) {
+				import core.stdc.stdio : fprintf, stderr;
+				import core.stdc.stdlib : abort;
+
+				fprintf(stderr, "Fatal error: %.*s\n", th.msg.length, th.msg.ptr);
+				abort();
+			}
+		}
+
+		private void addChange(FileChangeKind kind, Key key, bool is_dir)
+		{
+			try synchronized (m_changesMutex) {
+				m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir);
+			} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
+		}
+
+		private void scan(bool generate_changes)
+		@trusted nothrow {
+			import std.algorithm.mutation : swap;
+
+			Entry*[Key] new_entries;
+			size_t ec = 0;
+
+			scan(null, generate_changes, new_entries, ec);
+
+			foreach (e; m_entries.byKeyValue) {
+				if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
+					if (generate_changes)
+						addChange(FileChangeKind.removed, e.key, e.value.isDir);
+				}
+				delete e.value;
+			}
+
+			swap(m_entries, new_entries);
+			m_entryCount = ec;
+		}
+
+		private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec)
+		@trusted nothrow {
+			import std.file : SpanMode, dirEntries;
+			import std.path : buildPath, baseName;
+
+			auto ppath = parent ? buildPath(m_basePath, parent.path) : m_basePath;
+			try foreach (de; dirEntries(ppath, SpanMode.shallow)) {
+				auto key = Key(parent, de.name.baseName);
+				auto modified_time = de.timeLastModified.stdTime;
+				if (auto pe = key in m_entries) {
+					if ((*pe).isDir) {
+						if (m_recursive)
+							scan(*pe, generate_changes, new_entries, ec);
+					} else {
+						if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
+							if (generate_changes)
+								addChange(FileChangeKind.modified, key, (*pe).isDir);
+							(*pe).size = de.size;
+							(*pe).lastChange = modified_time;
+						}
+					}
+
+					new_entries[key] = *pe;
+					ec++;
+					m_entries.remove(key);
+				} else {
+					auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
+					new_entries[key] = e;
+					ec++;
+					if (generate_changes)
+						addChange(FileChangeKind.added, key, e.isDir);
+
+					if (de.isDir && m_recursive) scan(e, false, new_entries, ec);
+				}
+			} catch (Exception e) {} // will result in all children being flagged as removed
+		}
 	}
 }