Implement a generic polling based directory watcher.

This is used on platforms that don't have a specialized watcher implementation.
This commit is contained in:
Sönke Ludwig 2017-11-20 22:25:19 +01:00
parent 815db0727c
commit 99088e1202
2 changed files with 238 additions and 16 deletions

View file

@ -48,9 +48,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
//version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); //version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver; alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PosixEventDriverWatchers!Loop; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
Loop m_loop; Loop m_loop;
CoreDriver m_core; CoreDriver m_core;
@ -73,7 +73,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_core = new CoreDriver(m_loop, m_timers, m_events); m_core = new CoreDriver(m_loop, m_timers, m_events);
m_dns = new DNSDriver(m_events, m_signals); m_dns = new DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events); m_files = new FileDriver(m_events);
m_watchers = new WatcherDriver(m_loop); m_watchers = new WatcherDriver(m_events);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls

View file

@ -5,7 +5,7 @@ import eventcore.driver;
import eventcore.drivers.posix.driver; import eventcore.drivers.posix.driver;
final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers
{ {
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
@ -13,11 +13,12 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
import std.file; import std.file;
private { private {
alias Loop = typeof(Events.init.loop);
Loop m_loop; Loop m_loop;
string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map
} }
this(Loop loop) { m_loop = loop; } this(Events events) { m_loop = events.loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback)
{ {
@ -126,11 +127,11 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
} }
version (OSX) version (OSX)
final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private Loop m_loop; private Events m_events;
this(Loop loop) { m_loop = loop; } this(Events events) { m_events = events; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{ {
@ -155,25 +156,246 @@ final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatc
} }
} }
final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; } /** Generic directory watcher implementation based on periodic directory
scanning.
Note that this implementation, although it works on all operating systems,
is not efficient for directories with many files, since it has to keep a
representation of the whole directory in memory and needs to list all files
for each polling period, which can result in excessive hard disk activity.
*/
final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
import core.thread : Thread;
import core.sync.mutex : Mutex;
private {
Events m_events;
PollingThread[EventID] m_pollers;
}
this(Events events) { m_events = events; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{ {
assert(false, "TODO!"); import std.file : exists, isDir;
// validate base directory
try if (!isDir(path)) return WatcherID.invalid;
catch (Exception e) return WatcherID.invalid;
// create event to wait on for new changes
auto evt = m_events.create();
assert(evt !is EventID.invalid, "Failed to create event.");
auto pt = new PollingThread(() @trusted { return cast(shared)m_events; } (), evt, path, recursive, on_change);
m_pollers[evt] = pt;
try () @trusted { pt.isDaemon = true; } ();
catch (Exception e) assert(false, e.msg);
() @trusted { pt.start(); } ();
m_events.wait(evt, &onEvent);
return cast(WatcherID)evt;
} }
final override void addRef(WatcherID descriptor) final override void addRef(WatcherID descriptor)
{ {
assert(false, "TODO!"); assert(descriptor != WatcherID.invalid);
auto evt = cast(EventID)descriptor;
auto pt = evt in m_pollers;
assert(pt !is null);
m_events.addRef(evt);
} }
final override bool releaseRef(WatcherID descriptor) final override bool releaseRef(WatcherID descriptor)
{ {
assert(false, "TODO!"); assert(descriptor != WatcherID.invalid);
auto evt = cast(EventID)descriptor;
auto pt = evt in m_pollers;
assert(pt !is null);
if (!m_events.releaseRef(evt)) {
pt.dispose();
return false;
}
return true;
}
private void onEvent(EventID evt)
{
import std.algorithm.mutation : swap;
auto pt = evt in m_pollers;
if (!pt) return;
m_events.wait(evt, &onEvent);
FileChange[] changes;
try synchronized (pt.m_changesMutex)
swap(changes, pt.m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
foreach (ref ch; changes)
pt.m_callback(cast(WatcherID)evt, ch);
}
private final class PollingThread : Thread {
int refCount = 1;
EventID changesEvent;
private {
shared(Events) m_eventsDriver;
Mutex m_changesMutex;
/*shared*/ FileChange[] m_changes;
immutable string m_basePath;
immutable bool m_recursive;
immutable FileChangesCallback m_callback;
shared bool m_shutdown = false;
size_t m_entryCount;
struct Entry {
Entry* parent;
string name;
ulong size;
long lastChange;
string path()
{
import std.path : buildPath;
if (parent)
return buildPath(parent.path, name);
else return name;
}
bool isDir() const { return size == ulong.max; }
}
struct Key {
Entry* parent;
string name;
}
Entry*[Key] m_entries;
}
this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback)
@trusted nothrow {
import core.time : seconds;
m_changesMutex = new Mutex;
m_eventsDriver = event_driver;
changesEvent = event;
m_basePath = path;
m_recursive = recursive;
m_callback = callback;
scan(false);
try super(&run);
catch (Exception e) assert(false, e.msg);
}
void dispose()
nothrow {
import core.atomic : atomicStore;
try synchronized (m_changesMutex) {
changesEvent = EventID.invalid;
} catch (Exception e) assert(false, e.msg);
}
private void run()
nothrow @trusted {
import core.atomic : atomicLoad;
import core.time : msecs;
import std.algorithm.comparison : min;
try while (true) {
() @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } ();
try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) break;
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
scan(true);
try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) break;
if (m_changes.length)
m_eventsDriver.trigger(changesEvent, false);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
} catch (Throwable th) {
import core.stdc.stdio : fprintf, stderr;
import core.stdc.stdlib : abort;
fprintf(stderr, "Fatal error: %.*s\n", th.msg.length, th.msg.ptr);
abort();
}
}
private void addChange(FileChangeKind kind, Key key, bool is_dir)
{
try synchronized (m_changesMutex) {
m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
}
private void scan(bool generate_changes)
@trusted nothrow {
import std.algorithm.mutation : swap;
Entry*[Key] new_entries;
size_t ec = 0;
scan(null, generate_changes, new_entries, ec);
foreach (e; m_entries.byKeyValue) {
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
if (generate_changes)
addChange(FileChangeKind.removed, e.key, e.value.isDir);
}
delete e.value;
}
swap(m_entries, new_entries);
m_entryCount = ec;
}
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec)
@trusted nothrow {
import std.file : SpanMode, dirEntries;
import std.path : buildPath, baseName;
auto ppath = parent ? buildPath(m_basePath, parent.path) : m_basePath;
try foreach (de; dirEntries(ppath, SpanMode.shallow)) {
auto key = Key(parent, de.name.baseName);
auto modified_time = de.timeLastModified.stdTime;
if (auto pe = key in m_entries) {
if ((*pe).isDir) {
if (m_recursive)
scan(*pe, generate_changes, new_entries, ec);
} else {
if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
if (generate_changes)
addChange(FileChangeKind.modified, key, (*pe).isDir);
(*pe).size = de.size;
(*pe).lastChange = modified_time;
}
}
new_entries[key] = *pe;
ec++;
m_entries.remove(key);
} else {
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
new_entries[key] = e;
ec++;
if (generate_changes)
addChange(FileChangeKind.added, key, e.isDir);
if (de.isDir && m_recursive) scan(e, false, new_entries, ec);
}
} catch (Exception e) {} // will result in all children being flagged as removed
}
} }
} }