Merge pull request #32 from vibe-d/recursive_dirwatch

Improve directory watching
This commit is contained in:
l-kramer 2017-11-21 21:26:39 +01:00 committed by GitHub
commit faf4bbcdc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 501 additions and 46 deletions

View file

@ -52,7 +52,7 @@ Events | yes | yes | yes | ye
Unix Signals | yes² | yes | — | — Unix Signals | yes² | yes | — | —
Files | yes | yes | yes | yes Files | yes | yes | yes | yes
UI Integration | yes¹ | yes¹ | yes | yes¹ UI Integration | yes¹ | yes¹ | yes | yes¹
File watcher | yes² | yes | yes | — File watcher | yes² | yes | yes | yes²
Feature | LibasyncEventDriver Feature | LibasyncEventDriver
-----------------|--------------------- -----------------|---------------------
@ -69,7 +69,7 @@ File watcher | —
¹ Manually, by adopting the X11 display connection socket ¹ Manually, by adopting the X11 display connection socket
² Currently only supported on Linux ² Systems other than Linux use a polling implementation
### Open questions ### Open questions

View file

@ -652,14 +652,21 @@ struct FileChange {
/// The type of change /// The type of change
FileChangeKind kind; FileChangeKind kind;
/// Directory containing the changed file /// The root directory of the watcher
string directory; string baseDirectory;
/// Determines if the changed entity is a file or a directory. /// Subdirectory containing the changed file
bool isDirectory; string directory;
/// Name of the changed file /// Name of the changed file
const(char)[] name; const(char)[] name;
/** Determines if the changed entity is a file or a directory.
Note that depending on the platform this may not be accurate for
`FileChangeKind.removed`.
*/
bool isDirectory;
} }
struct Handle(string NAME, T, T invalid_value = T.init) { struct Handle(string NAME, T, T invalid_value = T.init) {
@ -695,8 +702,8 @@ alias StreamListenSocketFD = Handle!("streamListen", SocketFD);
alias DatagramSocketFD = Handle!("datagramSocket", SocketFD); alias DatagramSocketFD = Handle!("datagramSocket", SocketFD);
alias FileFD = Handle!("file", FD); alias FileFD = Handle!("file", FD);
alias EventID = Handle!("event", FD); alias EventID = Handle!("event", FD);
alias TimerID = Handle!("timer", size_t); alias TimerID = Handle!("timer", size_t, size_t.max);
alias WatcherID = Handle!("watcher", size_t); alias WatcherID = Handle!("watcher", size_t, size_t.max);
alias EventWaitID = Handle!("eventWait", size_t); alias EventWaitID = Handle!("eventWait", size_t, size_t.max);
alias SignalListenID = Handle!("signal", size_t); alias SignalListenID = Handle!("signal", size_t, size_t.max);
alias DNSLookupID = Handle!("dns", size_t); alias DNSLookupID = Handle!("dns", size_t, size_t.max);

View file

@ -48,9 +48,9 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
//version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); //version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver; alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PosixEventDriverWatchers!Loop; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
Loop m_loop; Loop m_loop;
CoreDriver m_core; CoreDriver m_core;
@ -73,7 +73,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_core = new CoreDriver(m_loop, m_timers, m_events); m_core = new CoreDriver(m_loop, m_timers, m_events);
m_dns = new DNSDriver(m_events, m_signals); m_dns = new DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events); m_files = new FileDriver(m_events);
m_watchers = new WatcherDriver(m_loop); m_watchers = new WatcherDriver(m_events);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls

View file

@ -5,7 +5,7 @@ import eventcore.driver;
import eventcore.drivers.posix.driver; import eventcore.drivers.posix.driver;
final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers
{ {
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
@ -13,26 +13,44 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
import std.file; import std.file;
private { private {
alias Loop = typeof(Events.init.loop);
Loop m_loop; Loop m_loop;
string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map
struct WatchState {
string[int] watcherPaths;
string basePath;
bool recursive;
}
WatchState[WatcherID] m_watches; // TODO: use a @nogc (allocator based) map
} }
this(Loop loop) { m_loop = loop; } this(Events events) { m_loop = events.loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback)
{ {
import std.path : buildPath, pathSplitter;
import std.range : drop;
import std.range.primitives : walkLength;
enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
auto handle = () @trusted { return inotify_init1(IN_NONBLOCK); } (); auto handle = () @trusted { return inotify_init1(IN_NONBLOCK); } ();
if (handle == -1) return WatcherID.invalid; if (handle == -1) return WatcherID.invalid;
auto ret = WatcherID(handle); auto ret = WatcherID(handle);
addWatch(ret, path); m_watches[ret] = WatchState(null, path, recursive);
addWatch(ret, path, ".");
if (recursive) { if (recursive) {
try { try {
auto base_segements = path.pathSplitter.walkLength;
if (path.isDir) () @trusted { if (path.isDir) () @trusted {
foreach (de; path.dirEntries(SpanMode.shallow)) foreach (de; path.dirEntries(SpanMode.depth))
if (de.isDir) addWatch(ret, de.name); if (de.isDir) {
auto subdir = de.name.pathSplitter.drop(base_segements).buildPath;
addWatch(ret, path, subdir);
}
} (); } ();
} catch (Exception e) { } catch (Exception e) {
// TODO: decide if this should be ignored or if the error should be forwarded // TODO: decide if this should be ignored or if the error should be forwarded
@ -77,6 +95,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
private void processEvents(WatcherID id) private void processEvents(WatcherID id)
{ {
import std.path : buildPath, dirName;
import core.stdc.stdio : FILENAME_MAX; import core.stdc.stdio : FILENAME_MAX;
import core.stdc.string : strlen; import core.stdc.string : strlen;
@ -88,9 +107,16 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
break; break;
assert(ret <= buf.length); assert(ret <= buf.length);
auto w = m_watches[id];
auto rem = buf[0 .. ret]; auto rem = buf[0 .. ret];
while (rem.length > 0) { while (rem.length > 0) {
auto ev = () @trusted { return cast(inotify_event*)rem.ptr; } (); auto ev = () @trusted { return cast(inotify_event*)rem.ptr; } ();
rem = rem[inotify_event.sizeof + ev.len .. $];
// is the watch already deleted?
if (ev.mask & IN_IGNORED) continue;
FileChange ch; FileChange ch;
if (ev.mask & (IN_CREATE|IN_MOVED_TO)) if (ev.mask & (IN_CREATE|IN_MOVED_TO))
ch.kind = FileChangeKind.added; ch.kind = FileChangeKind.added;
@ -99,38 +125,54 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
else if (ev.mask & IN_MODIFY) else if (ev.mask & IN_MODIFY)
ch.kind = FileChangeKind.modified; ch.kind = FileChangeKind.modified;
if (ev.mask & IN_DELETE_SELF) {
() @trusted { inotify_rm_watch(cast(int)id, ev.wd); } ();
w.watcherPaths.remove(ev.wd);
continue;
} else if (ev.mask & IN_MOVE_SELF) {
// NOTE: the should have been updated by a previous IN_MOVED_TO
continue;
}
auto name = () @trusted { return ev.name.ptr[0 .. strlen(ev.name.ptr)]; } (); auto name = () @trusted { return ev.name.ptr[0 .. strlen(ev.name.ptr)]; } ();
ch.directory = m_watches[id][ev.wd]; auto subdir = w.watcherPaths[ev.wd];
if (w.recursive && ev.mask & (IN_CREATE|IN_MOVED_TO) && ev.mask & IN_ISDIR) {
addWatch(id, w.basePath, subdir == "." ? name.idup : buildPath(subdir, name));
}
ch.baseDirectory = m_watches[id].basePath;
ch.directory = subdir;
ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
ch.name = name; ch.name = name;
addRef(id); // assure that the id doesn't get invalidated until after the callback addRef(id); // assure that the id doesn't get invalidated until after the callback
auto cb = m_loop.m_fds[id].watcher.callback; auto cb = m_loop.m_fds[id].watcher.callback;
cb(id, ch); cb(id, ch);
if (!releaseRef(id)) return; if (!releaseRef(id)) return;
rem = rem[inotify_event.sizeof + ev.len .. $];
} }
} }
} }
private bool addWatch(WatcherID handle, string path) private bool addWatch(WatcherID handle, string base_path, string path)
{ {
import std.path : buildPath;
import std.string : toStringz; import std.string : toStringz;
enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY | enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO; IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
immutable wd = () @trusted { return inotify_add_watch(cast(int)handle, path.toStringz, EVENTS); } (); immutable wd = () @trusted { return inotify_add_watch(cast(int)handle, buildPath(base_path, path).toStringz, EVENTS); } ();
if (wd == -1) return false; if (wd == -1) return false;
m_watches[handle][wd] = path; m_watches[handle].watcherPaths[wd] = path;
return true; return true;
} }
} }
version (OSX) version (OSX)
final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private Loop m_loop; private Events m_events;
this(Loop loop) { m_loop = loop; } this(Events events) { m_events = events; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{ {
@ -155,25 +197,246 @@ final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatc
} }
} }
final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; } /** Generic directory watcher implementation based on periodic directory
scanning.
Note that this implementation, although it works on all operating systems,
is not efficient for directories with many files, since it has to keep a
representation of the whole directory in memory and needs to list all files
for each polling period, which can result in excessive hard disk activity.
*/
final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
import core.thread : Thread;
import core.sync.mutex : Mutex;
private {
Events m_events;
PollingThread[EventID] m_pollers;
}
this(Events events) { m_events = events; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{ {
assert(false, "TODO!"); import std.file : exists, isDir;
// validate base directory
try if (!isDir(path)) return WatcherID.invalid;
catch (Exception e) return WatcherID.invalid;
// create event to wait on for new changes
auto evt = m_events.create();
assert(evt !is EventID.invalid, "Failed to create event.");
auto pt = new PollingThread(() @trusted { return cast(shared)m_events; } (), evt, path, recursive, on_change);
m_pollers[evt] = pt;
try () @trusted { pt.isDaemon = true; } ();
catch (Exception e) assert(false, e.msg);
() @trusted { pt.start(); } ();
m_events.wait(evt, &onEvent);
return cast(WatcherID)evt;
} }
final override void addRef(WatcherID descriptor) final override void addRef(WatcherID descriptor)
{ {
assert(false, "TODO!"); assert(descriptor != WatcherID.invalid);
auto evt = cast(EventID)descriptor;
auto pt = evt in m_pollers;
assert(pt !is null);
m_events.addRef(evt);
} }
final override bool releaseRef(WatcherID descriptor) final override bool releaseRef(WatcherID descriptor)
{ {
assert(false, "TODO!"); assert(descriptor != WatcherID.invalid);
auto evt = cast(EventID)descriptor;
auto pt = evt in m_pollers;
assert(pt !is null);
if (!m_events.releaseRef(evt)) {
pt.dispose();
return false;
}
return true;
}
private void onEvent(EventID evt)
{
import std.algorithm.mutation : swap;
auto pt = evt in m_pollers;
if (!pt) return;
m_events.wait(evt, &onEvent);
FileChange[] changes;
try synchronized (pt.m_changesMutex)
swap(changes, pt.m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
foreach (ref ch; changes)
pt.m_callback(cast(WatcherID)evt, ch);
}
private final class PollingThread : Thread {
int refCount = 1;
EventID changesEvent;
private {
shared(Events) m_eventsDriver;
Mutex m_changesMutex;
/*shared*/ FileChange[] m_changes;
immutable string m_basePath;
immutable bool m_recursive;
immutable FileChangesCallback m_callback;
shared bool m_shutdown = false;
size_t m_entryCount;
struct Entry {
Entry* parent;
string name;
ulong size;
long lastChange;
string path()
{
import std.path : buildPath;
if (parent)
return buildPath(parent.path, name);
else return name;
}
bool isDir() const { return size == ulong.max; }
}
struct Key {
Entry* parent;
string name;
}
Entry*[Key] m_entries;
}
this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback)
@trusted nothrow {
import core.time : seconds;
m_changesMutex = new Mutex;
m_eventsDriver = event_driver;
changesEvent = event;
m_basePath = path;
m_recursive = recursive;
m_callback = callback;
scan(false);
try super(&run);
catch (Exception e) assert(false, e.msg);
}
void dispose()
nothrow {
import core.atomic : atomicStore;
try synchronized (m_changesMutex) {
changesEvent = EventID.invalid;
} catch (Exception e) assert(false, e.msg);
}
private void run()
nothrow @trusted {
import core.atomic : atomicLoad;
import core.time : msecs;
import std.algorithm.comparison : min;
try while (true) {
() @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } ();
try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) break;
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
scan(true);
try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) break;
if (m_changes.length)
m_eventsDriver.trigger(changesEvent, false);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
} catch (Throwable th) {
import core.stdc.stdio : fprintf, stderr;
import core.stdc.stdlib : abort;
fprintf(stderr, "Fatal error: %.*s\n", th.msg.length, th.msg.ptr);
abort();
}
}
private void addChange(FileChangeKind kind, Key key, bool is_dir)
{
try synchronized (m_changesMutex) {
m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
}
private void scan(bool generate_changes)
@trusted nothrow {
import std.algorithm.mutation : swap;
Entry*[Key] new_entries;
size_t ec = 0;
scan(null, generate_changes, new_entries, ec);
foreach (e; m_entries.byKeyValue) {
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
if (generate_changes)
addChange(FileChangeKind.removed, e.key, e.value.isDir);
}
delete e.value;
}
swap(m_entries, new_entries);
m_entryCount = ec;
}
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec)
@trusted nothrow {
import std.file : SpanMode, dirEntries;
import std.path : buildPath, baseName;
auto ppath = parent ? buildPath(m_basePath, parent.path) : m_basePath;
try foreach (de; dirEntries(ppath, SpanMode.shallow)) {
auto key = Key(parent, de.name.baseName);
auto modified_time = de.timeLastModified.stdTime;
if (auto pe = key in m_entries) {
if ((*pe).isDir) {
if (m_recursive)
scan(*pe, generate_changes, new_entries, ec);
} else {
if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
if (generate_changes)
addChange(FileChangeKind.modified, key, (*pe).isDir);
(*pe).size = de.size;
(*pe).lastChange = modified_time;
}
}
new_entries[key] = *pe;
ec++;
m_entries.remove(key);
} else {
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
new_entries[key] = e;
ec++;
if (generate_changes)
addChange(FileChangeKind.added, key, e.isDir);
if (de.isDir && m_recursive) scan(e, false, new_entries, ec);
}
} catch (Exception e) {} // will result in all children being flagged as removed
}
} }
} }

View file

@ -48,6 +48,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
return ExitReason.exited; return ExitReason.exited;
} }
if (!waiterCount) return ExitReason.outOfWaiters;
bool got_event; bool got_event;
if (timeout <= 0.seconds) { if (timeout <= 0.seconds) {
@ -109,7 +111,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max_wait.total!"msecs", DWORD.max); DWORD timeout_msecs = max_wait == Duration.max ? INFINITE : cast(DWORD)min(max_wait.total!"msecs", DWORD.max);
auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, auto ret = () @trusted { return MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr,
timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } (); timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); } ();
if (ret == WAIT_IO_COMPLETION) got_event = true; if (ret == WAIT_IO_COMPLETION) got_event = true;
else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) { else if (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + m_registeredEvents.length) {
if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) { if (auto pc = m_registeredEvents[ret - WAIT_OBJECT_0] in m_eventCallbacks) {

View file

@ -110,6 +110,10 @@ final class WinAPIEventDriverEvents : EventDriverEvents {
auto pe = descriptor in m_events; auto pe = descriptor in m_events;
assert(pe.refCount > 0); assert(pe.refCount > 0);
if (--pe.refCount == 0) { if (--pe.refCount == 0) {
// make sure to not leak any waiter references for pending waits
foreach (i; 0 .. pe.waiters.length)
m_core.removeWaiter();
() @trusted nothrow { () @trusted nothrow {
scope (failure) assert(false); scope (failure) assert(false);
destroy(pe.waiters); destroy(pe.waiters);

View file

@ -31,7 +31,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
null); null);
} (); } ();
if (handle == INVALID_HANDLE_VALUE) if (handle == INVALID_HANDLE_VALUE)
return WatcherID.invalid; return WatcherID.invalid;
@ -45,12 +45,13 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
try return theAllocator.makeArray!ubyte(16384); try return theAllocator.makeArray!ubyte(16384);
catch (Exception e) assert(false, "Failed to allocate directory watcher buffer."); catch (Exception e) assert(false, "Failed to allocate directory watcher buffer.");
} (); } ();
if (!triggerRead(handle, *slot)) { if (!triggerRead(handle, *slot)) {
releaseRef(id); releaseRef(id);
return WatcherID.invalid; return WatcherID.invalid;
} }
m_core.addWaiter();
return id; return id;
} }
@ -63,6 +64,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
{ {
auto handle = idToHandle(descriptor); auto handle = idToHandle(descriptor);
return m_core.m_handles[handle].releaseRef(()nothrow{ return m_core.m_handles[handle].releaseRef(()nothrow{
m_core.removeWaiter();
CloseHandle(handle); CloseHandle(handle);
() @trusted { () @trusted {
try theAllocator.dispose(m_core.m_handles[handle].watcher.buffer); try theAllocator.dispose(m_core.m_handles[handle].watcher.buffer);
@ -76,6 +78,8 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
{ {
import std.conv : to; import std.conv : to;
import std.file : isDir;
import std.path : dirName, baseName, buildPath;
if (dwError != 0) { if (dwError != 0) {
// FIXME: this must be propagated to the caller // FIXME: this must be propagated to the caller
@ -93,12 +97,18 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
if (handle !in WinAPIEventDriver.threadInstance.core.m_handles) if (handle !in WinAPIEventDriver.threadInstance.core.m_handles)
return; return;
// NOTE: can be 0 if the buffer overflowed
if (!cbTransferred)
return;
auto slot = () @trusted { return &WinAPIEventDriver.threadInstance.core.m_handles[handle].watcher(); } (); auto slot = () @trusted { return &WinAPIEventDriver.threadInstance.core.m_handles[handle].watcher(); } ();
ubyte[] result = slot.buffer[0 .. cbTransferred]; ubyte[] result = slot.buffer[0 .. cbTransferred];
do { do {
assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); assert(result.length >= FILE_NOTIFY_INFORMATION._FileName.offsetof);
auto fni = () @trusted { return cast(FILE_NOTIFY_INFORMATION*)result.ptr; } (); auto fni = () @trusted { return cast(FILE_NOTIFY_INFORMATION*)result.ptr; } ();
result = result[fni.NextEntryOffset .. $];
FileChange ch; FileChange ch;
switch (fni.Action) { switch (fni.Action) {
default: ch.kind = FileChangeKind.modified; break; default: ch.kind = FileChangeKind.modified; break;
@ -108,12 +118,17 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
case 0x4: ch.kind = FileChangeKind.removed; break; case 0x4: ch.kind = FileChangeKind.removed; break;
case 0x5: ch.kind = FileChangeKind.added; break; case 0x5: ch.kind = FileChangeKind.added; break;
} }
ch.directory = slot.directory;
ch.isDirectory = false; // FIXME: is this right? ch.baseDirectory = slot.directory;
ch.name = () @trusted { scope (failure) assert(false); return to!string(fni.FileName[0 .. fni.FileNameLength/2]); } (); auto path = () @trusted { scope (failure) assert(false); return to!string(fni.FileName[0 .. fni.FileNameLength/2]); } ();
slot.callback(id, ch); auto fullpath = buildPath(slot.directory, path);
ch.directory = dirName(path);
ch.name = baseName(path);
try ch.isDirectory = isDir(fullpath);
catch (Exception e) {} // FIXME: can happen if the base path is relative and the CWD has changed
if (ch.kind != FileChangeKind.modified || !ch.isDirectory)
slot.callback(id, ch);
if (fni.NextEntryOffset == 0) break; if (fni.NextEntryOffset == 0) break;
result = result[fni.NextEntryOffset .. $];
} while (result.length > 0); } while (result.length > 0);
triggerRead(handle, *slot); triggerRead(handle, *slot);

164
tests/0-dirwatcher-rec.d Normal file
View file

@ -0,0 +1,164 @@
/++ dub.sdl:
name "test"
dependency "eventcore" path=".."
+/
module test;
import eventcore.core;
import eventcore.internal.utils : print;
import core.thread : Thread;
import core.time : Duration, msecs;
import std.file : exists, remove, rename, rmdirRecurse, mkdir;
import std.format : format;
import std.functional : toDelegate;
import std.path : baseName, buildPath, dirName;
import std.stdio : File, writefln;
import std.array : replace;
bool s_done;
int s_cnt = 0;
enum testDir = "watcher_test";
WatcherID watcher;
FileChange[] pendingChanges;
void main()
{
version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test.");
else {
if (exists(testDir))
rmdirRecurse(testDir);
mkdir(testDir);
mkdir(testDir~"/dira");
// test non-recursive watcher
watcher = eventDriver.watchers.watchDirectory(testDir, false, toDelegate(&testCallback));
assert(watcher != WatcherID.invalid);
Thread.sleep(1000.msecs); // some watcher implementations need time to initialize
testFile( "file1.dat");
testFile( "file2.dat");
testFile( "dira/file1.dat", false);
testCreateDir("dirb");
testFile( "dirb/file1.dat", false);
testRemoveDir("dirb");
eventDriver.watchers.releaseRef(watcher);
testFile( "file1.dat", false);
testRemoveDir("dira", false);
testCreateDir("dira", false);
// test recursive watcher
watcher = eventDriver.watchers.watchDirectory(testDir, true, toDelegate(&testCallback));
assert(watcher != WatcherID.invalid);
Thread.sleep(100.msecs); // some watcher implementations need time to initialize
testFile( "file1.dat");
testFile( "file2.dat");
testFile( "dira/file1.dat");
testCreateDir("dirb");
testFile( "dirb/file1.dat");
testRename( "dirb", "dirc");
testFile( "dirc/file2.dat");
eventDriver.watchers.releaseRef(watcher);
testFile( "file1.dat", false);
testFile( "dira/file1.dat", false);
testFile( "dirc/file1.dat", false);
testRemoveDir("dirc", false);
testRemoveDir("dira", false);
rmdirRecurse(testDir);
// make sure that no watchers are registered anymore
auto er = eventDriver.core.processEvents(10.msecs);
assert(er == ExitReason.outOfWaiters);
}
}
void testCallback(WatcherID w, in ref FileChange ch)
@safe nothrow {
assert(w == watcher, "Wrong watcher generated a change");
pendingChanges ~= ch;
}
void expectChange(FileChange ch, bool expect_change)
{
import std.datetime : Clock, UTC;
auto starttime = Clock.currTime(UTC());
again: while (!pendingChanges.length) {
auto er = eventDriver.core.processEvents(10.msecs);
switch (er) {
default: assert(false, format("Unexpected event loop exit code: %s", er));
case ExitReason.idle: break;
case ExitReason.timeout:
assert(!pendingChanges.length);
break;
case ExitReason.outOfWaiters:
assert(!expect_change, "No watcher left, but expected change.");
return;
}
if (!pendingChanges.length && Clock.currTime(UTC()) - starttime >= 2000.msecs) {
assert(!expect_change, format("Got no change, expected %s.", ch));
return;
}
}
auto pch = pendingChanges[0];
// adjust for Windows behavior
pch.directory = pch.directory.replace("\\", "/");
pch.name = pch.name.replace("\\", "/");
pendingChanges = pendingChanges[1 .. $];
if (pch.kind == FileChangeKind.modified && (pch.name == "dira" || pch.name == "dirb"))
goto again;
// test all field excep the isDir one, which does not work on all systems
assert(pch.kind == ch.kind && pch.baseDirectory == ch.baseDirectory &&
pch.directory == ch.directory && pch.name == ch.name,
format("Unexpected change: %s vs %s", pch, ch));
}
void testFile(string name, bool expect_change = true)
{
print("test %s CREATE %s", name, expect_change);
auto fil = File(buildPath(testDir, name), "wt");
expectChange(fchange(FileChangeKind.added, name, false), expect_change);
print("test %s MODIFY %s", name, expect_change);
fil.write("test");
fil.close();
expectChange(fchange(FileChangeKind.modified, name, false), expect_change);
print("test %s DELETE %s", name, expect_change);
remove(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.removed, name, false), expect_change);
}
void testCreateDir(string name, bool expect_change = true)
{
print("test %s CREATEDIR %s", name, expect_change);
mkdir(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.added, name, true), expect_change);
}
void testRemoveDir(string name, bool expect_change = true)
{
print("test %s DELETEDIR %s", name, expect_change);
rmdirRecurse(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.removed, name, true), expect_change);
}
void testRename(string from, string to, bool expect_change = true)
{
print("test %s RENAME %s %s", from, to, expect_change);
rename(buildPath(testDir, from), buildPath(testDir, to));
expectChange(fchange(FileChangeKind.removed, from, true), expect_change);
expectChange(fchange(FileChangeKind.added, to, true), expect_change);
}
FileChange fchange(FileChangeKind kind, string name, bool is_dir)
{
return FileChange(kind, testDir, dirName(name), baseName(name), is_dir);
}