From 2c63aa5c5cf3b8fbe811d72fb766519b4f8c0a1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 16 Mar 2018 18:06:53 +0100 Subject: [PATCH] Allow destructors to run in foreign threads. Fixes #69. This change modifies destructors to anticipate that they can be called form a foreign thread if the GC is involved. The actual release of the reference will then happen deferred in the original thread. --- dub.sdl | 2 +- source/vibe/core/core.d | 39 ++++++++++++++++++----------- source/vibe/core/file.d | 20 +++++++++++---- source/vibe/core/internal/release.d | 17 +++++++++++++ source/vibe/core/net.d | 33 ++++++++++++++++++------ source/vibe/core/sync.d | 4 ++- 6 files changed, 86 insertions(+), 29 deletions(-) create mode 100644 source/vibe/core/internal/release.d diff --git a/dub.sdl b/dub.sdl index 5defb90..dcefdc2 100644 --- a/dub.sdl +++ b/dub.sdl @@ -4,7 +4,7 @@ authors "Sönke Ludwig" copyright "Copyright © 2016-2018, rejectedsoftware e.K." license "MIT" -dependency "eventcore" version="~>0.8.18" +dependency "eventcore" version="~>0.8.32" dependency "stdx-allocator" version="~>2.77.0" targetName "vibe_core" diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 419dd0e..4e1982a 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -12,6 +12,7 @@ public import vibe.core.task; import eventcore.core; import vibe.core.args; import vibe.core.concurrency; +import vibe.core.internal.release; import vibe.core.log; import vibe.core.sync : ManualEvent, createSharedManualEvent; import vibe.core.taskpool : TaskPool; @@ -940,8 +941,13 @@ struct FileDescriptorEvent { } private { + static struct Context { + Trigger trigger; + shared(NativeEventDriver) driver; + } + StreamSocketFD m_socket; - Trigger m_trigger; + Context* m_context; } @safe: @@ -949,7 +955,9 @@ struct FileDescriptorEvent { private this(int fd, Trigger event_mask) nothrow { m_socket = eventDriver.sockets.adoptStream(fd); - m_trigger = event_mask; + m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } (); + m_context.trigger = event_mask; + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); } this(this) @@ -961,7 +969,7 @@ struct FileDescriptorEvent { ~this() nothrow { if (m_socket != StreamSocketFD.invalid) - eventDriver.sockets.releaseRef(m_socket); + releaseHandle!"sockets"(m_socket, m_context.driver); } @@ -982,9 +990,9 @@ struct FileDescriptorEvent { /// ditto bool wait(Duration timeout, Trigger which = Trigger.any) { - if ((which & m_trigger) == Trigger.none) return true; + if ((which & m_context.trigger) == Trigger.none) return true; - assert((which & m_trigger) == Trigger.read, "Waiting for write event not yet supported."); + assert((which & m_context.trigger) == Trigger.read, "Waiting for write event not yet supported."); bool got_data; @@ -1006,7 +1014,7 @@ struct FileDescriptorEvent { */ struct Timer { private { - typeof(eventDriver.timers) m_driver; + NativeEventDriver m_driver; TimerID m_id; debug uint m_magicNumber = 0x4d34f916; } @@ -1016,24 +1024,25 @@ struct Timer { private this(TimerID id) nothrow { assert(id != TimerID.init, "Invalid timer ID."); - m_driver = eventDriver.timers; + m_driver = eventDriver; m_id = id; } this(this) nothrow { debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); - if (m_driver) m_driver.addRef(m_id); + if (m_driver) m_driver.timers.addRef(m_id); } ~this() nothrow { debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); - if (m_driver) m_driver.releaseRef(m_id); + if (m_driver) + releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ()); } /// True if the timer is yet to fire. - @property bool pending() nothrow { return m_driver.isPending(m_id); } + @property bool pending() nothrow { return m_driver.timers.isPending(m_id); } /// The internal ID of the timer. @property size_t id() const nothrow { return m_id; } @@ -1041,25 +1050,25 @@ struct Timer { bool opCast() const nothrow { return m_driver !is null; } /// Determines if this reference is the only one - @property bool unique() const nothrow { return m_driver ? m_driver.isUnique(m_id) : false; } + @property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; } /** Resets the timer to the specified timeout */ void rearm(Duration dur, bool periodic = false) nothrow in { assert(dur > 0.seconds, "Negative timer duration specified."); } - body { m_driver.set(m_id, dur, periodic ? dur : 0.seconds); } + body { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); } /** Resets the timer and avoids any firing. */ - void stop() nothrow { if (m_driver) m_driver.stop(m_id); } + void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); } /** Waits until the timer fires. */ void wait() { asyncAwait!(TimerCallback, - cb => m_driver.wait(m_id, cb), - cb => m_driver.cancelWait(m_id) + cb => m_driver.timers.wait(m_id, cb), + cb => m_driver.timers.cancelWait(m_id) ); } } diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index 7c641c5..ac9cdf0 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -7,8 +7,9 @@ */ module vibe.core.file; -import eventcore.core : eventDriver; +import eventcore.core : NativeEventDriver, eventDriver; import eventcore.driver; +import vibe.core.internal.release; import vibe.core.log; import vibe.core.path; import vibe.core.stream; @@ -395,6 +396,7 @@ struct FileStream { ulong size; FileMode mode; ulong ptr; + shared(NativeEventDriver) driver; } private { @@ -410,6 +412,7 @@ struct FileStream { m_ctx.path = path; m_ctx.mode = mode; m_ctx.size = eventDriver.files.getSize(fd); + m_ctx.driver = () @trusted { return cast(shared)eventDriver; } (); } this(this) @@ -421,7 +424,7 @@ struct FileStream { ~this() { if (m_fd != FileFD.invalid) - eventDriver.files.releaseRef(m_fd); + releaseHandle!"files"(m_fd, m_ctx.driver); } @property int fd() { return cast(int)m_fd; } @@ -453,9 +456,10 @@ struct FileStream { void close() { if (m_fd != FileFD.init) { - eventDriver.files.close(m_fd); - eventDriver.files.releaseRef(m_fd); + eventDriver.files.close(m_fd); // FIXME: may leave dangling references! + releaseHandle!"files"(m_fd, m_ctx.driver); m_fd = FileFD.init; + m_ctx = null; } } @@ -578,6 +582,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations! bool recursive; Appender!(DirectoryChange[]) changes; LocalManualEvent changeEvent; + shared(NativeEventDriver) driver; void onChange(WatcherID, in ref FileChange change) nothrow { @@ -611,10 +616,15 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations! m_context.path = path; m_context.recursive = recursive; m_context.changes = appender!(DirectoryChange[]); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); } this(this) nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.addRef(m_watcher); } - ~this() nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.releaseRef(m_watcher); } + ~this() + nothrow { + if (m_watcher != WatcherID.invalid) + releaseHandle!"watchers"(m_watcher, m_context.driver); + } /// The path of the watched directory @property NativePath path() const nothrow { return m_context.path; } diff --git a/source/vibe/core/internal/release.d b/source/vibe/core/internal/release.d new file mode 100644 index 0000000..73e6323 --- /dev/null +++ b/source/vibe/core/internal/release.d @@ -0,0 +1,17 @@ +module vibe.core.internal.release; + +import eventcore.core; + +/// Release a handle in a thread-safe way +void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv) +{ + if (drv is (() @trusted => cast(shared)eventDriver)()) { + __traits(getMember, eventDriver, subsys).releaseRef(handle); + } else { + // in case the destructor was called from a foreign thread, + // perform the release in the owner thread + drv.core.runInOwnerThread((h) { + __traits(getMember, eventDriver, subsys).releaseRef(cast(H)h); + }, cast(size_t)handle); + } +} diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index e1aeabe..bb1dd67 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -12,6 +12,7 @@ import std.exception : enforce; import std.format : format; import std.functional : toDelegate; import std.socket : AddressFamily, UnknownAddress; +import vibe.core.internal.release; import vibe.core.log; import vibe.core.stream; import vibe.internal.async; @@ -477,6 +478,7 @@ struct TCPConnection { bool keepAlive = false; Duration readTimeout = Duration.max; string remoteAddressString; + shared(NativeEventDriver) driver; } private { @@ -491,6 +493,7 @@ struct TCPConnection { m_socket = socket; m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } (); m_context.readBuffer.capacity = 4096; + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); } this(this) @@ -502,7 +505,7 @@ struct TCPConnection { ~this() nothrow { if (m_socket != StreamSocketFD.invalid) - eventDriver.sockets.releaseRef(m_socket); + releaseHandle!"sockets"(m_socket, m_context.driver); } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; } @@ -543,7 +546,7 @@ struct TCPConnection { //logInfo("close %s", cast(int)m_fd); if (m_socket != StreamSocketFD.invalid) { eventDriver.sockets.shutdown(m_socket, true, true); - eventDriver.sockets.releaseRef(m_socket); + releaseHandle!"sockets"(m_socket, m_context.driver); m_socket = StreamSocketFD.invalid; m_context = null; } @@ -783,12 +786,19 @@ struct TCPListener { // the previous behavior of keeping the socket alive when the listener isn't stored. At the same time, // stopListening() needs to keep working. private { + static struct Context { + shared(NativeEventDriver) driver; + } + StreamListenSocketFD m_socket; + Context* m_context; } this(StreamListenSocketFD socket) { m_socket = socket; + m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; } @@ -807,7 +817,7 @@ struct TCPListener { void stopListening() { if (m_socket != StreamListenSocketFD.invalid) { - eventDriver.sockets.releaseRef(m_socket); + releaseHandle!"sockets"(m_socket, m_context.driver); m_socket = StreamListenSocketFD.invalid; } } @@ -820,6 +830,7 @@ struct TCPListener { struct UDPConnection { static struct Context { bool canBroadcast; + shared(NativeEventDriver) driver; } private { @@ -833,19 +844,20 @@ struct UDPConnection { m_socket = eventDriver.sockets.createDatagramSocket(baddr, null); enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket."); m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } (); + m_context.driver = () @trusted { return cast(shared)eventDriver; } (); } this(this) nothrow { - if (m_socket != StreamSocketFD.invalid) + if (m_socket != DatagramSocketFD.invalid) eventDriver.sockets.addRef(m_socket); } ~this() nothrow { - if (m_socket != StreamSocketFD.invalid) - eventDriver.sockets.releaseRef(m_socket); + if (m_socket != DatagramSocketFD.invalid) + releaseHandle!"sockets"(m_socket, m_context.driver); } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; } @@ -896,7 +908,14 @@ struct UDPConnection { /** Stops listening for datagrams and frees all resources. */ - void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; } + void close() + { + if (m_socket != DatagramSocketFD.invalid) { + releaseHandle!"sockets"(m_socket, m_context.driver); + m_socket = DatagramSocketFD.init; + m_context = null; + } + } /** Locks the UDP connection to a certain peer. diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 6881cd0..edee964 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -1267,8 +1267,10 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { static if (EVENT_TRIGGERED) { ~this() { + import vibe.core.internal.release : releaseHandle; + if (m_event != EventID.invalid) - eventDriver.events.releaseRef(m_event); + releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ()); } }