From aa9a08f571d64c3569a1fd65013d8c5a05a32176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 17:18:33 +0100 Subject: [PATCH 01/11] Explicitly dispose the event driver on thread shutdown. Handle multiple calls to dispose() gracefully, because external code may already make an explicit call to dispose(). --- source/eventcore/core.d | 5 +++++ source/eventcore/drivers/posix/driver.d | 2 ++ source/eventcore/drivers/winapi/driver.d | 2 ++ 3 files changed, 9 insertions(+) diff --git a/source/eventcore/core.d b/source/eventcore/core.d index 91179cc..547d515 100644 --- a/source/eventcore/core.d +++ b/source/eventcore/core.d @@ -30,6 +30,11 @@ static if (!is(NativeEventDriver == EventDriver)) { if (!s_driver) s_driver = new NativeEventDriver; } + static ~this() + { + s_driver.dispose(); + } + shared static this() { s_driver = new NativeEventDriver; diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index 2e8f58f..15e9e95 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -89,9 +89,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { final override void dispose() { + if (!m_loop) return; m_files.dispose(); m_dns.dispose(); m_loop.dispose(); + m_loop = null; } } diff --git a/source/eventcore/drivers/winapi/driver.d b/source/eventcore/drivers/winapi/driver.d index a439525..91d0992 100644 --- a/source/eventcore/drivers/winapi/driver.d +++ b/source/eventcore/drivers/winapi/driver.d @@ -72,7 +72,9 @@ final class WinAPIEventDriver : EventDriver { override void dispose() { + if (!m_events) return; m_events.dispose(); + m_events = null; assert(threadInstance !is null); threadInstance = null; } From c8fd340240b244ef054ba4159ca9b5008c39dab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 02:40:39 +0100 Subject: [PATCH 02/11] Fix destruction of PosixEventDriverEvents event. The reference count was one too high and the waiter count was not properly decremented when an event got destroyed. --- source/eventcore/drivers/posix/events.d | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 25ed35e..e6901c8 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -49,6 +49,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); + releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return return id; } else { auto addr = new InternetAddress(0x7F000001, 0); @@ -56,6 +57,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS if (s == DatagramSocketFD.invalid) return EventID.invalid; m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation + m_sockets.releaseRef(s); // receive() increments the reference count, but we need a value of 1 upon return return cast(EventID)s; } } @@ -140,13 +142,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); void destroy() { () @trusted nothrow { - scope (failure) assert(false); - .destroy(getSlot(descriptor).waiters); - assert(getSlot(descriptor).waiters is null); + try .destroy(getSlot(descriptor).waiters); + catch (Exception e) assert(false, e.msg); } (); } version (linux) { if (--getRC(descriptor) == 0) { + if (!isInternal(descriptor)) + m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; destroy(); m_loop.unregisterFD(descriptor, EventMask.read); m_loop.clearFD(descriptor); @@ -155,6 +158,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS } } else { if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) { + if (!isInternal(descriptor)) + m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; destroy(); m_events.remove(cast(DatagramSocketFD)descriptor); return false; From 1d4fbc4fa2c999098cae54eb01c012e6d3299a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 22 Nov 2017 17:52:46 +0100 Subject: [PATCH 03/11] Fixup poll driver. --- source/eventcore/drivers/posix/watchers.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index c36aaf9..b39c822 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -376,7 +376,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat private void addChange(FileChangeKind kind, Key key, bool is_dir) { try synchronized (m_changesMutex) { - m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir); + m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name, is_dir); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); } From 19879712e60ad851af71752c9c7677bf22fdb692 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 02:41:47 +0100 Subject: [PATCH 04/11] Fix shutdown procedure and rename event order for the polling watcher. Renames should always be reported as removed->added instead of added->removed. --- source/eventcore/drivers/posix/watchers.d | 54 ++++++++++++++++------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index b39c822..771aac4 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -216,7 +216,22 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat PollingThread[EventID] m_pollers; } - this(Events events) { m_events = events; } + this(Events events) + { + m_events = events; + } + + void dispose() + @trusted { + foreach (pt; m_pollers.byValue) { + pt.dispose(); + try pt.join(); + catch (Exception e) { + // not bringing down the application here, because not being + // able to join the thread here likely isn't a problem + } + } + } final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { @@ -280,6 +295,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat pt.m_callback(cast(WatcherID)evt, ch); } + private final class PollingThread : Thread { int refCount = 1; EventID changesEvent; @@ -291,7 +307,6 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat immutable string m_basePath; immutable bool m_recursive; immutable FileChangesCallback m_callback; - shared bool m_shutdown = false; size_t m_entryCount; struct Entry { @@ -337,8 +352,6 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat void dispose() nothrow { - import core.atomic : atomicStore; - try synchronized (m_changesMutex) { changesEvent = EventID.invalid; } catch (Exception e) assert(false, e.msg); @@ -346,21 +359,25 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat private void run() nothrow @trusted { - import core.atomic : atomicLoad; import core.time : msecs; import std.algorithm.comparison : min; + import std.datetime : Clock, UTC; try while (true) { - () @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } (); - - try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) break; - } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs; + while (true) { + try synchronized (m_changesMutex) { + if (changesEvent == EventID.invalid) return; + } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); + auto remaining = timeout - Clock.currTime(UTC()); + if (remaining <= 0.msecs) break; + sleep(min(1000.msecs, remaining)); + } scan(true); try synchronized (m_changesMutex) { - if (changesEvent == EventID.invalid) break; + if (changesEvent == EventID.invalid) return; if (m_changes.length) m_eventsDriver.trigger(changesEvent, false); } catch (Exception e) assert(false, "Mutex lock failed: "~e.msg); @@ -385,9 +402,10 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat import std.algorithm.mutation : swap; Entry*[Key] new_entries; + Entry*[] added; size_t ec = 0; - scan(null, generate_changes, new_entries, ec); + scan(null, generate_changes, new_entries, added, ec); foreach (e; m_entries.byKeyValue) { if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { @@ -397,11 +415,14 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat delete e.value; } + foreach (e; added) + addChange(FileChangeKind.added, Key(e.parent, e.name), e.isDir); + swap(m_entries, new_entries); m_entryCount = ec; } - private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec) + private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec) @trusted nothrow { import std.file : SpanMode, dirEntries; import std.path : buildPath, baseName; @@ -413,7 +434,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat if (auto pe = key in m_entries) { if ((*pe).isDir) { if (m_recursive) - scan(*pe, generate_changes, new_entries, ec); + scan(*pe, generate_changes, new_entries, added, ec); } else { if ((*pe).size != de.size || (*pe).lastChange != modified_time) { if (generate_changes) @@ -430,10 +451,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time); new_entries[key] = e; ec++; - if (generate_changes) - addChange(FileChangeKind.added, key, e.isDir); + if (generate_changes) added ~= e; - if (de.isDir && m_recursive) scan(e, false, new_entries, ec); + if (de.isDir && m_recursive) scan(e, false, new_entries, added, ec); } } catch (Exception e) {} // will result in all children being flagged as removed } From e154446a9dfd63ef70a8950ecb2bdfcd84184f03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 02:42:30 +0100 Subject: [PATCH 05/11] Enable directory watcher tests on macOS. Also fixes some timings to work with the polling watcher implementation. --- tests/0-dirwatcher-rec.d | 5 ----- tests/0-dirwatcher.d | 34 +++++++++++++++++----------------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/tests/0-dirwatcher-rec.d b/tests/0-dirwatcher-rec.d index 67cac90..a38a25d 100644 --- a/tests/0-dirwatcher-rec.d +++ b/tests/0-dirwatcher-rec.d @@ -26,9 +26,6 @@ FileChange[] pendingChanges; void main() { - version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test."); - else { - if (exists(testDir)) rmdirRecurse(testDir); @@ -73,8 +70,6 @@ void main() // make sure that no watchers are registered anymore auto er = eventDriver.core.processEvents(10.msecs); assert(er == ExitReason.outOfWaiters); - - } } void testCallback(WatcherID w, in ref FileChange ch) diff --git a/tests/0-dirwatcher.d b/tests/0-dirwatcher.d index c6ff710..c423b74 100644 --- a/tests/0-dirwatcher.d +++ b/tests/0-dirwatcher.d @@ -6,40 +6,42 @@ module test; import eventcore.core; import std.stdio : File, writefln; -import std.file : exists, remove; +import std.file : exists, mkdir, remove, rmdirRecurse; import core.time : Duration, msecs; bool s_done; int s_cnt = 0; +enum testDir = "watcher_test"; enum testFilename = "test.dat"; void main() { - version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test."); - else { + if (exists(testDir)) + rmdirRecurse(testDir); + mkdir(testDir); + scope (exit) rmdirRecurse(testDir); - if (exists(testFilename)) - remove(testFilename); - - auto id = eventDriver.watchers.watchDirectory(".", false, (id, ref change) { + auto id = eventDriver.watchers.watchDirectory(testDir, false, (id, ref change) { switch (s_cnt++) { - default: assert(false); + default: + import std.conv : to; + assert(false, "Unexpected change: "~change.to!string); case 0: assert(change.kind == FileChangeKind.added); - assert(change.baseDirectory == "."); + assert(change.baseDirectory == testDir); assert(change.directory == ""); assert(change.name == testFilename); break; case 1: assert(change.kind == FileChangeKind.modified); - assert(change.baseDirectory == "."); + assert(change.baseDirectory == testDir); assert(change.directory == ""); assert(change.name == testFilename); break; case 2: assert(change.kind == FileChangeKind.removed); - assert(change.baseDirectory == "."); + assert(change.baseDirectory == testDir); assert(change.directory == ""); assert(change.name == testFilename); eventDriver.watchers.releaseRef(id); @@ -48,18 +50,18 @@ void main() } }); - auto fil = File(testFilename, "wt"); + auto fil = File(testDir~"/"~testFilename, "wt"); auto tm = eventDriver.timers.create(); - eventDriver.timers.set(tm, 100.msecs, 0.msecs); + eventDriver.timers.set(tm, 1500.msecs, 0.msecs); eventDriver.timers.wait(tm, (tm) { scope (failure) assert(false); fil.write("test"); fil.close(); - eventDriver.timers.set(tm, 100.msecs, 0.msecs); + eventDriver.timers.set(tm, 1500.msecs, 0.msecs); eventDriver.timers.wait(tm, (tm) { scope (failure) assert(false); - remove(testFilename); + remove(testDir~"/"~testFilename); }); }); @@ -69,6 +71,4 @@ void main() assert(er == ExitReason.outOfWaiters); assert(s_done); s_done = false; - - } } From 7795195ec109bce7ccf0b85843b3fec82a1d2232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 16:05:00 +0100 Subject: [PATCH 06/11] Fix recursive directory watcher test to check for unexpected events. --- tests/0-dirwatcher-rec.d | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/0-dirwatcher-rec.d b/tests/0-dirwatcher-rec.d index a38a25d..15be972 100644 --- a/tests/0-dirwatcher-rec.d +++ b/tests/0-dirwatcher-rec.d @@ -35,7 +35,7 @@ void main() // test non-recursive watcher watcher = eventDriver.watchers.watchDirectory(testDir, false, toDelegate(&testCallback)); assert(watcher != WatcherID.invalid); - Thread.sleep(1000.msecs); // some watcher implementations need time to initialize + Thread.sleep(400.msecs); // some watcher implementations need time to initialize testFile( "file1.dat"); testFile( "file2.dat"); testFile( "dira/file1.dat", false); @@ -50,7 +50,7 @@ void main() // test recursive watcher watcher = eventDriver.watchers.watchDirectory(testDir, true, toDelegate(&testCallback)); assert(watcher != WatcherID.invalid); - Thread.sleep(100.msecs); // some watcher implementations need time to initialize + Thread.sleep(400.msecs); // some watcher implementations need time to initialize testFile( "file1.dat"); testFile( "file2.dat"); testFile( "dira/file1.dat"); @@ -100,6 +100,8 @@ void expectChange(FileChange ch, bool expect_change) return; } } + assert(expect_change, "Got change although none was expected."); + auto pch = pendingChanges[0]; // adjust for Windows behavior From 4d0037687345e3b14514d1f8e7464f50fd13a4c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 22:56:25 +0100 Subject: [PATCH 07/11] Ensure proper reference count for created Posix events. --- source/eventcore/drivers/posix/events.d | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index e6901c8..08c48a2 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -50,6 +50,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return + assert(getRC(id) == 1); return id; } else { auto addr = new InternetAddress(0x7F000001, 0); @@ -58,7 +59,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation m_sockets.releaseRef(s); // receive() increments the reference count, but we need a value of 1 upon return - return cast(EventID)s; + auto id = cast(EventID)s; + assert(getRC(id) == 1); + return id; } } From 31286e45808936c857aee968027c3ae59e098884 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Thu, 23 Nov 2017 22:57:03 +0100 Subject: [PATCH 08/11] Ensure the socket handle passed to callbacks is always valid on Posix. --- source/eventcore/drivers/posix/sockets.d | 27 ++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index f6611ac..773fb24 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -148,14 +148,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return fd; } - private void onConnect(FD sock) + private void onConnect(FD fd) { + auto sock = cast(StreamSocketFD)fd; + auto l = lockHandle(sock); m_loop.setNotifyCallback!(EventType.write)(sock, null); with (m_loop.m_fds[sock].streamSocket) { state = ConnectionState.connected; auto cb = connectCallback; connectCallback = null; - if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.connected); + if (cb) cb(sock, ConnectStatus.connected); } } @@ -349,6 +351,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void finalize()(IOStatus status) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, slot.bytesRead); @@ -446,6 +449,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); if (!err.among!(EAGAIN, EINPROGRESS)) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(socket, IOStatus.error, slot.bytesRead); return; @@ -456,6 +460,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets slot.bytesWritten += ret; slot.writeBuffer = slot.writeBuffer[ret .. $]; if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.write)(socket, null); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); return; @@ -506,6 +511,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void finalize()(IOStatus status) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.read)(socket, null); //m_fds[fd].readBuffer = null; slot.readCallback(socket, status, 0); @@ -683,12 +689,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); if (!err.among!(EAGAIN, EINPROGRESS)) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.read)(socket, null); slot.readCallback(socket, IOStatus.error, 0, null); return; } } + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.read)(socket, null); scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof); () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addrc); @@ -754,12 +762,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (ret < 0) { auto err = getSocketError(); if (!err.among!(EAGAIN, EINPROGRESS)) { + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.write)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); return; } } + auto l = lockHandle(socket); m_loop.setNotifyCallback!(EventType.write)(socket, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null); } @@ -844,6 +854,19 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } return sock; } + + // keeps a scoped reference to a handle to avoid it getting destroyed + private auto lockHandle(H)(H handle) + { + addRef(handle); + static struct R { + PosixEventDriverSockets drv; + H handle; + @disable this(this); + ~this() { drv.releaseRef(handle); } + } + return R(this, handle); + } } package struct StreamSocketSlot { From c8eb30f6f01b7f84fd155baf3122d7d2c0f18892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 25 Nov 2017 00:26:20 +0100 Subject: [PATCH 09/11] Remove polling threads from map. --- source/eventcore/drivers/posix/watchers.d | 1 + 1 file changed, 1 insertion(+) diff --git a/source/eventcore/drivers/posix/watchers.d b/source/eventcore/drivers/posix/watchers.d index 771aac4..f298d9f 100644 --- a/source/eventcore/drivers/posix/watchers.d +++ b/source/eventcore/drivers/posix/watchers.d @@ -272,6 +272,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat assert(pt !is null); if (!m_events.releaseRef(evt)) { pt.dispose(); + m_pollers.remove(evt); return false; } return true; From a5d4cf875cebfa5a9b1fe8708b1cdc6c221bbe22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 25 Nov 2017 00:26:44 +0100 Subject: [PATCH 10/11] Add adoptDatagramSocketInternal. --- source/eventcore/drivers/posix/sockets.d | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index 773fb24..d67743f 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -580,12 +580,17 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } final override DatagramSocketFD adoptDatagramSocket(int socket) + { + return adoptDatagramSocketInternal(socket, false); + } + + package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true) { auto fd = DatagramSocketFD(socket); if (m_loop.m_fds[fd].common.refCount) // FD already in use? return DatagramSocketFD.init; setSocketNonBlocking(fd); - m_loop.initFD(fd, FDFlags.none); + m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.m_fds[fd].specific = DgramSocketSlot.init; return fd; From 07d2bafcac30a3b8541863f602ed0ab9bbc44bbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sat, 25 Nov 2017 00:28:48 +0100 Subject: [PATCH 11/11] Fix threading issues in the generic Posix event implementation. Uses socketpair non-Linux systems and two separate UDP sockets on Windows for the cross-thread communication. --- source/eventcore/drivers/posix/events.d | 132 ++++++++++++++++-------- 1 file changed, 90 insertions(+), 42 deletions(-) diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 08c48a2..a8300d0 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -5,13 +5,16 @@ import eventcore.driver; import eventcore.drivers.posix.driver; import eventcore.internal.consumablequeue : ConsumableQueue; -import std.socket : InternetAddress; - version (linux) { nothrow @nogc extern (C) int eventfd(uint initval, int flags); - import core.sys.posix.unistd : close, read, write; enum EFD_NONBLOCK = 0x800; } +version (Posix) { + import core.sys.posix.unistd : close, read, write; +} else { + import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM, + bind, connect, getsockname, send, socket; +} final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents { @@ -19,10 +22,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS private { Loop m_loop; Sockets m_sockets; + ubyte[ulong.sizeof] m_buf; version (linux) {} else { - EventSlot[DatagramSocketFD] m_events; - ubyte[long.sizeof] m_buf; + // TODO: avoid the overhead of a mutex backed map here + import core.sync.mutex : Mutex; + Mutex m_eventsMutex; + EventID[DatagramSocketFD] m_events; } } @@ -30,6 +36,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS { m_loop = loop; m_sockets = sockets; + version (linux) {} + else m_eventsMutex = new Mutex; } package @property Loop loop() { return m_loop; } @@ -53,13 +61,54 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS assert(getRC(id) == 1); return id; } else { - auto addr = new InternetAddress(0x7F000001, 0); - auto s = m_sockets.createDatagramSocketInternal(addr, addr, true); - if (s == DatagramSocketFD.invalid) return EventID.invalid; + sock_t[2] fd; + version (Posix) { + // create a pair of sockets to communicate between threads + import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair; + if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0) + return EventID.invalid; + + assert(fd[0] != fd[1]); + + // use the first socket as the async receiver + auto s = m_sockets.adoptDatagramSocketInternal(fd[0]); + } else { + // fake missing socketpair support on Windows + import std.socket : InternetAddress; + auto addr = new InternetAddress(0x7F000001, 0); + auto s = m_sockets.createDatagramSocketInternal(addr, null, true); + if (s == DatagramSocketFD.invalid) return EventID.invalid; + fd[0] = cast(sock_t)s; + if (!() @trusted { + fd[1] = socket(AF_INET, SOCK_DGRAM, 0); + int nl = addr.nameLen; + import eventcore.internal.utils : print; + if (bind(fd[1], addr.name, addr.nameLen) != 0) + return false; + assert(nl == addr.nameLen); + if (getsockname(fd[0], addr.name, &nl) != 0) + return false; + if (connect(fd[1], addr.name, addr.nameLen) != 0) + return false; + return true; + } ()) + { + m_sockets.releaseRef(s); + return EventID.invalid; + } + } + m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); - m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation - m_sockets.releaseRef(s); // receive() increments the reference count, but we need a value of 1 upon return - auto id = cast(EventID)s; + + // use the second socket as the event ID and as the sending end for + // other threads + auto id = cast(EventID)fd[1]; + try { + synchronized (m_eventsMutex) + m_events[s] = id; + } catch (Exception e) assert(false, e.msg); + m_loop.initFD(id, FDFlags.internal); + m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s); // FIXME: avoid dynamic memory allocation assert(getRC(id) == 1); return id; } @@ -91,8 +140,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS long one = 1; //log("emitting for all threads"); if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (linux) () @trusted { .write(cast(int)event, &one, one.sizeof); } (); - else thisus.m_sockets.send(cast(DatagramSocketFD)event, thisus.m_buf, IOMode.once, null, &thisus.onSocketDataSent); + version (Posix) .write(cast(int)event, &one, one.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -124,13 +173,15 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS version (linux) {} else { - private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress) - { - } private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) { - onEvent(cast(EventID)s); m_sockets.receive(s, m_buf, IOMode.once, &onSocketData); + EventID evt; + try { + synchronized (m_eventsMutex) + evt = m_events[s]; + onEvent(evt); + } catch (Exception e) assert(false, e.msg); } } @@ -143,43 +194,36 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override bool releaseRef(EventID descriptor) { assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD."); - void destroy() { + if (--getRC(descriptor) == 0) { + if (!isInternal(descriptor)) + m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; () @trusted nothrow { try .destroy(getSlot(descriptor).waiters); catch (Exception e) assert(false, e.msg); } (); - } - version (linux) { - if (--getRC(descriptor) == 0) { - if (!isInternal(descriptor)) - m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; - destroy(); + version (linux) { m_loop.unregisterFD(descriptor, EventMask.read); - m_loop.clearFD(descriptor); - close(cast(int)descriptor); - return false; - } - } else { - if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) { - if (!isInternal(descriptor)) - m_loop.m_waiterCount -= getSlot(descriptor).waiters.length; - destroy(); - m_events.remove(cast(DatagramSocketFD)descriptor); - return false; + } else { + auto rs = getSlot(descriptor).recvSocket; + m_sockets.cancelReceive(rs); + m_sockets.releaseRef(rs); + try { + synchronized (m_eventsMutex) + m_events.remove(rs); + } catch (Exception e) assert(false, e.msg); } + m_loop.clearFD(descriptor); + version (Posix) close(cast(int)descriptor); + else () @trusted { closesocket(cast(SOCKET)descriptor); } (); + return false; } return true; } private EventSlot* getSlot(EventID id) { - version (linux) { - assert(id < m_loop.m_fds.length, "Invalid event ID."); - return () @trusted { return &m_loop.m_fds[id].event(); } (); - } else { - assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID."); - return &m_events[cast(DatagramSocketFD)id]; - } + assert(id < m_loop.m_fds.length, "Invalid event ID."); + return () @trusted { return &m_loop.m_fds[id].event(); } (); } private ref uint getRC(EventID id) @@ -198,4 +242,8 @@ package struct EventSlot { ConsumableQueue!EventCallback waiters; shared bool triggerAll; bool isInternal; + version (linux) {} + else { + DatagramSocketFD recvSocket; + } }