Refactor PollEventDriverWatchers.

- Better encapsulates the mutex protection inside PollingThread
- Uses mallocT/freeT to allocate snapshot nodes
This commit is contained in:
Sönke Ludwig 2018-10-24 10:44:38 +02:00
parent 0b73eda8d5
commit 6e839de7e2

View file

@ -3,7 +3,7 @@ module eventcore.drivers.posix.watchers;
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.posix.driver; import eventcore.drivers.posix.driver;
import eventcore.internal.utils : nogc_assert; import eventcore.internal.utils : mallocT, freeT, nogc_assert;
final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriverWatchers
@ -313,42 +313,40 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
private void onEvent(EventID evt) private void onEvent(EventID evt)
{ {
import std.algorithm.mutation : swap;
auto pt = evt in m_pollers; auto pt = evt in m_pollers;
if (!pt) return; if (!pt) return;
m_events.wait(evt, &onEvent); m_events.wait(evt, &onEvent);
FileChange[] changes; foreach (ref ch; pt.readChanges())
try synchronized (pt.m_changesMutex)
swap(changes, pt.m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
foreach (ref ch; changes)
pt.m_callback(cast(WatcherID)evt, ch); pt.m_callback(cast(WatcherID)evt, ch);
} }
private final class PollingThread : Thread { private final class PollingThread : Thread {
int refCount = 1;
EventID changesEvent;
private { private {
shared(Events) m_eventsDriver; shared(Events) m_eventsDriver;
Mutex m_changesMutex; Mutex m_changesMutex;
/*shared*/ FileChange[] m_changes; /*shared*/ FileChange[] m_changes; // protected by m_changesMutex
EventID m_changesEvent; // protected by m_changesMutex
immutable string m_basePath; immutable string m_basePath;
immutable bool m_recursive; immutable bool m_recursive;
immutable FileChangesCallback m_callback; immutable FileChangesCallback m_callback;
size_t m_entryCount;
struct Entry { final static class Entry {
Entry* parent; Entry parent;
string name; string name;
ulong size; ulong size;
long lastChange; long lastChange;
this(Entry parent, string name, ulong size, long lastChange)
{
this.parent = parent;
this.name = name;
this.size = size;
this.lastChange = lastChange;
}
string path() string path()
{ {
import std.path : buildPath; import std.path : buildPath;
@ -361,11 +359,13 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
} }
struct Key { struct Key {
Entry* parent; Entry parent;
string name; string name;
} }
Entry*[Key] m_entries; // used only within the polling thread
Entry[Key] m_entries;
size_t m_entryCount;
} }
this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback) this(shared(Events) event_driver, EventID event, string path, bool recursive, FileChangesCallback callback)
@ -374,7 +374,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
m_changesMutex = new Mutex; m_changesMutex = new Mutex;
m_eventsDriver = event_driver; m_eventsDriver = event_driver;
changesEvent = event; m_changesEvent = event;
m_basePath = path; m_basePath = path;
m_recursive = recursive; m_recursive = recursive;
m_callback = callback; m_callback = callback;
@ -387,10 +387,21 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
void dispose() void dispose()
nothrow { nothrow {
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
changesEvent = EventID.invalid; m_changesEvent = EventID.invalid;
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
FileChange[] readChanges()
nothrow {
import std.algorithm.mutation : swap;
FileChange[] changes;
try synchronized (m_changesMutex)
swap(changes, m_changes);
catch (Exception e) assert(false, "Failed to acquire mutex: "~e.msg);
return changes;
}
private void run() private void run()
nothrow @trusted { nothrow @trusted {
import core.time : msecs; import core.time : msecs;
@ -401,7 +412,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs; auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs;
while (true) { while (true) {
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) return; if (m_changesEvent == EventID.invalid) return;
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
auto remaining = timeout - Clock.currTime(UTC()); auto remaining = timeout - Clock.currTime(UTC());
if (remaining <= 0.msecs) break; if (remaining <= 0.msecs) break;
@ -411,9 +422,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
scan(true); scan(true);
try synchronized (m_changesMutex) { try synchronized (m_changesMutex) {
if (changesEvent == EventID.invalid) return; if (m_changesEvent == EventID.invalid) return;
if (m_changes.length) if (m_changes.length)
m_eventsDriver.trigger(changesEvent, false); m_eventsDriver.trigger(m_changesEvent, false);
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
} catch (Throwable th) { } catch (Throwable th) {
import core.stdc.stdio : fprintf, stderr; import core.stdc.stdio : fprintf, stderr;
@ -435,8 +446,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
@trusted nothrow { @trusted nothrow {
import std.algorithm.mutation : swap; import std.algorithm.mutation : swap;
Entry*[Key] new_entries; Entry[Key] new_entries;
Entry*[] added; Entry[] added;
size_t ec = 0; size_t ec = 0;
scan(null, generate_changes, new_entries, added, ec); scan(null, generate_changes, new_entries, added, ec);
@ -445,12 +456,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
if (generate_changes) if (generate_changes)
addChange(FileChangeKind.removed, e.key, e.value.isDir); addChange(FileChangeKind.removed, e.key, e.value.isDir);
try freeT(e.value);
catch (Exception e) assert(false, e.msg);
} }
static if (__VERSION__ >= 2079) {
import core.memory : __delete;
__delete(e.value);
} else mixin("delete e.value;");
} }
foreach (e; added) foreach (e; added)
@ -460,7 +468,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
m_entryCount = ec; m_entryCount = ec;
} }
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec) private void scan(Entry parent, bool generate_changes, ref Entry[Key] new_entries, ref Entry[] added, ref size_t ec)
@trusted nothrow { @trusted nothrow {
import std.file : SpanMode, dirEntries; import std.file : SpanMode, dirEntries;
import std.path : buildPath, baseName; import std.path : buildPath, baseName;
@ -486,7 +494,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
ec++; ec++;
m_entries.remove(key); m_entries.remove(key);
} else { } else {
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); auto e = mallocT!Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
new_entries[key] = e; new_entries[key] = e;
ec++; ec++;
if (generate_changes) added ~= e; if (generate_changes) added ~= e;