From 6e839de7e22b4ca28aa0ca035797beb8e771b6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 24 Oct 2018 10:44:38 +0200 Subject: [PATCH] Refactor PollEventDriverWatchers. - Better encapsulates the mutex protection inside PollingThread - Uses mallocT/freeT to allocate snapshot nodes --- source/eventcore/drivers/posix/watchers.d | 72 +++++++++++++---------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 4af45a0..c747299 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -3,7 +3,7 @@ module eventcore.drivers.posix.watchers; import eventcore.driver; import eventcore.drivers.posix.driver; -import eventcore.internal.utils : nogc_assert; +import eventcore.internal.utils : mallocT, freeT, nogc_assert; final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers @@ -313,42 +313,40 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat private void onEvent(EventID evt) { - import std.algorithm.mutation : swap; - auto pt = evt in m_pollers; if (!pt) return; m_events.wait(evt, &onEvent); - FileChange[] changes; - try synchronized (pt.m_changesMutex) - swap(changes, pt.m_changes); - catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); - - foreach (ref ch; changes) + foreach (ref ch; pt.readChanges()) pt.m_callback(cast(WatcherID)evt, ch); } private final class PollingThread : Thread { - int refCount = 1; - EventID changesEvent; - private { shared(Events) m_eventsDriver; Mutex m_changesMutex; - /*shared*/ FileChange[] m_changes; + /*shared*/ FileChange[] m_changes; // protected by m_changesMutex + EventID m_changesEvent; // protected by m_changesMutex immutable string m_basePath; immutable bool m_recursive; immutable FileChangesCallback m_callback; - size_t m_entryCount; - struct Entry { - Entry* parent; + final static class Entry { + Entry parent; string name; ulong size; long lastChange; + this(Entry parent, string name, ulong size, long lastChange) + { + this.parent = parent; + this.name = name; + this.size = size; + this.lastChange = lastChange; + } + string path() { import std.path : buildPath; @@ -361,11 +359,13 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat } struct Key { - Entry* parent; + Entry parent; string name; } - Entry*[Key] m_entries; + // used only within the polling thread + Entry[Key] m_entries; + size_t m_entryCount; } this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) @@ -374,7 +374,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat m_changesMutex = new Mutex; m_eventsDriver = event_driver; - changesEvent = event; + m_changesEvent = event; m_basePath = path; m_recursive = recursive; m_callback = callback; @@ -387,10 +387,21 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat void dispose() nothrow { try synchronized (m_changesMutex) { - changesEvent = EventID.invalid; + m_changesEvent = EventID.invalid; } catch (Exception e) assert(false, e.msg); } + FileChange[] readChanges() + nothrow { + import std.algorithm.mutation : swap; + + FileChange[] changes; + try synchronized (m_changesMutex) + swap(changes, m_changes); + catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg); + return changes; + } + private void run() nothrow @trusted { import core.time : msecs; @@ -401,7 +412,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs; while (true) { try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) return; + if (m_changesEvent == EventID.invalid) return; } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); auto remaining = timeout - Clock.currTime(UTC()); if (remaining <= 0.msecs) break; @@ -411,9 +422,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat scan(true); try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) return; + if (m_changesEvent == EventID.invalid) return; if (m_changes.length) - m_eventsDriver.trigger(changesEvent, false); + m_eventsDriver.trigger(m_changesEvent, false); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Throwable th) { import core.stdc.stdio : fprintf, stderr; @@ -435,8 +446,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat @trusted nothrow { import std.algorithm.mutation : swap; - Entry*[Key] new_entries; - Entry*[] added; + Entry[Key] new_entries; + Entry[] added; size_t ec = 0; scan(null, generate_changes, new_entries, added, ec); @@ -445,12 +456,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (generate_changes) addChange(FileChangeKind.removed, e.key, e.value.isDir); + try freeT(e.value); + catch (Exception e) assert(false, e.msg); } - - static if (__VERSION__ >= 2079) { - import core.memory : __delete; - __delete(e.value); - } else mixin("delete e.value;"); } foreach (e; added) @@ -460,7 +468,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat m_entryCount = ec; } - private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec) + private void scan(Entry parent, bool generate_changes, ref Entry[Key] new_entries, ref Entry[] added, ref size_t ec) @trusted nothrow { import std.file : SpanMode, dirEntries; import std.path : buildPath, baseName; @@ -486,7 +494,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat ec++; m_entries.remove(key); } else { - auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); + auto e = mallocT!Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); new_entries[key] = e; ec++; if (generate_changes) added ~= e;