Allow destructors to run in foreign threads. Fixes #69.

This change modifies destructors to anticipate that they can be called form a foreign thread if the GC is involved. The actual release of the reference will then happen deferred in the original thread.
This commit is contained in:
Sönke Ludwig 2018-03-16 18:06:53 +01:00
parent 14f4e06b8a
commit 2c63aa5c5c
6 changed files with 86 additions and 29 deletions

View file

@ -4,7 +4,7 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016-2018, rejectedsoftware e.K."
license "MIT"
dependency "eventcore" version="~>0.8.18"
dependency "eventcore" version="~>0.8.32"
dependency "stdx-allocator" version="~>2.77.0"
targetName "vibe_core"

View file

@ -12,6 +12,7 @@ public import vibe.core.task;
import eventcore.core;
import vibe.core.args;
import vibe.core.concurrency;
import vibe.core.internal.release;
import vibe.core.log;
import vibe.core.sync : ManualEvent, createSharedManualEvent;
import vibe.core.taskpool : TaskPool;
@ -940,8 +941,13 @@ struct FileDescriptorEvent {
}
private {
static struct Context {
Trigger trigger;
shared(NativeEventDriver) driver;
}
StreamSocketFD m_socket;
Trigger m_trigger;
Context* m_context;
}
@safe:
@ -949,7 +955,9 @@ struct FileDescriptorEvent {
private this(int fd, Trigger event_mask)
nothrow {
m_socket = eventDriver.sockets.adoptStream(fd);
m_trigger = event_mask;
m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.trigger = event_mask;
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this)
@ -961,7 +969,7 @@ struct FileDescriptorEvent {
~this()
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket);
releaseHandle!"sockets"(m_socket, m_context.driver);
}
@ -982,9 +990,9 @@ struct FileDescriptorEvent {
/// ditto
bool wait(Duration timeout, Trigger which = Trigger.any)
{
if ((which & m_trigger) == Trigger.none) return true;
if ((which & m_context.trigger) == Trigger.none) return true;
assert((which & m_trigger) == Trigger.read, "Waiting for write event not yet supported.");
assert((which & m_context.trigger) == Trigger.read, "Waiting for write event not yet supported.");
bool got_data;
@ -1006,7 +1014,7 @@ struct FileDescriptorEvent {
*/
struct Timer {
private {
typeof(eventDriver.timers) m_driver;
NativeEventDriver m_driver;
TimerID m_id;
debug uint m_magicNumber = 0x4d34f916;
}
@ -1016,24 +1024,25 @@ struct Timer {
private this(TimerID id)
nothrow {
assert(id != TimerID.init, "Invalid timer ID.");
m_driver = eventDriver.timers;
m_driver = eventDriver;
m_id = id;
}
this(this)
nothrow {
debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
if (m_driver) m_driver.addRef(m_id);
if (m_driver) m_driver.timers.addRef(m_id);
}
~this()
nothrow {
debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
if (m_driver) m_driver.releaseRef(m_id);
if (m_driver)
releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ());
}
/// True if the timer is yet to fire.
@property bool pending() nothrow { return m_driver.isPending(m_id); }
@property bool pending() nothrow { return m_driver.timers.isPending(m_id); }
/// The internal ID of the timer.
@property size_t id() const nothrow { return m_id; }
@ -1041,25 +1050,25 @@ struct Timer {
bool opCast() const nothrow { return m_driver !is null; }
/// Determines if this reference is the only one
@property bool unique() const nothrow { return m_driver ? m_driver.isUnique(m_id) : false; }
@property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; }
/** Resets the timer to the specified timeout
*/
void rearm(Duration dur, bool periodic = false) nothrow
in { assert(dur > 0.seconds, "Negative timer duration specified."); }
body { m_driver.set(m_id, dur, periodic ? dur : 0.seconds); }
body { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); }
/** Resets the timer and avoids any firing.
*/
void stop() nothrow { if (m_driver) m_driver.stop(m_id); }
void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); }
/** Waits until the timer fires.
*/
void wait()
{
asyncAwait!(TimerCallback,
cb => m_driver.wait(m_id, cb),
cb => m_driver.cancelWait(m_id)
cb => m_driver.timers.wait(m_id, cb),
cb => m_driver.timers.cancelWait(m_id)
);
}
}

View file

@ -7,8 +7,9 @@
*/
module vibe.core.file;
import eventcore.core : eventDriver;
import eventcore.core : NativeEventDriver, eventDriver;
import eventcore.driver;
import vibe.core.internal.release;
import vibe.core.log;
import vibe.core.path;
import vibe.core.stream;
@ -395,6 +396,7 @@ struct FileStream {
ulong size;
FileMode mode;
ulong ptr;
shared(NativeEventDriver) driver;
}
private {
@ -410,6 +412,7 @@ struct FileStream {
m_ctx.path = path;
m_ctx.mode = mode;
m_ctx.size = eventDriver.files.getSize(fd);
m_ctx.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this)
@ -421,7 +424,7 @@ struct FileStream {
~this()
{
if (m_fd != FileFD.invalid)
eventDriver.files.releaseRef(m_fd);
releaseHandle!"files"(m_fd, m_ctx.driver);
}
@property int fd() { return cast(int)m_fd; }
@ -453,9 +456,10 @@ struct FileStream {
void close()
{
if (m_fd != FileFD.init) {
eventDriver.files.close(m_fd);
eventDriver.files.releaseRef(m_fd);
eventDriver.files.close(m_fd); // FIXME: may leave dangling references!
releaseHandle!"files"(m_fd, m_ctx.driver);
m_fd = FileFD.init;
m_ctx = null;
}
}
@ -578,6 +582,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
bool recursive;
Appender!(DirectoryChange[]) changes;
LocalManualEvent changeEvent;
shared(NativeEventDriver) driver;
void onChange(WatcherID, in ref FileChange change)
nothrow {
@ -611,10 +616,15 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
m_context.path = path;
m_context.recursive = recursive;
m_context.changes = appender!(DirectoryChange[]);
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this) nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.addRef(m_watcher); }
~this() nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.releaseRef(m_watcher); }
~this()
nothrow {
if (m_watcher != WatcherID.invalid)
releaseHandle!"watchers"(m_watcher, m_context.driver);
}
/// The path of the watched directory
@property NativePath path() const nothrow { return m_context.path; }

View file

@ -0,0 +1,17 @@
module vibe.core.internal.release;
import eventcore.core;
/// Release a handle in a thread-safe way
void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv)
{
if (drv is (() @trusted => cast(shared)eventDriver)()) {
__traits(getMember, eventDriver, subsys).releaseRef(handle);
} else {
// in case the destructor was called from a foreign thread,
// perform the release in the owner thread
drv.core.runInOwnerThread((h) {
__traits(getMember, eventDriver, subsys).releaseRef(cast(H)h);
}, cast(size_t)handle);
}
}

View file

@ -12,6 +12,7 @@ import std.exception : enforce;
import std.format : format;
import std.functional : toDelegate;
import std.socket : AddressFamily, UnknownAddress;
import vibe.core.internal.release;
import vibe.core.log;
import vibe.core.stream;
import vibe.internal.async;
@ -477,6 +478,7 @@ struct TCPConnection {
bool keepAlive = false;
Duration readTimeout = Duration.max;
string remoteAddressString;
shared(NativeEventDriver) driver;
}
private {
@ -491,6 +493,7 @@ struct TCPConnection {
m_socket = socket;
m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
m_context.readBuffer.capacity = 4096;
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this)
@ -502,7 +505,7 @@ struct TCPConnection {
~this()
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket);
releaseHandle!"sockets"(m_socket, m_context.driver);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }
@ -543,7 +546,7 @@ struct TCPConnection {
//logInfo("close %s", cast(int)m_fd);
if (m_socket != StreamSocketFD.invalid) {
eventDriver.sockets.shutdown(m_socket, true, true);
eventDriver.sockets.releaseRef(m_socket);
releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = StreamSocketFD.invalid;
m_context = null;
}
@ -783,12 +786,19 @@ struct TCPListener {
// the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
// stopListening() needs to keep working.
private {
static struct Context {
shared(NativeEventDriver) driver;
}
StreamListenSocketFD m_socket;
Context* m_context;
}
this(StreamListenSocketFD socket)
{
m_socket = socket;
m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }
@ -807,7 +817,7 @@ struct TCPListener {
void stopListening()
{
if (m_socket != StreamListenSocketFD.invalid) {
eventDriver.sockets.releaseRef(m_socket);
releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = StreamListenSocketFD.invalid;
}
}
@ -820,6 +830,7 @@ struct TCPListener {
struct UDPConnection {
static struct Context {
bool canBroadcast;
shared(NativeEventDriver) driver;
}
private {
@ -833,19 +844,20 @@ struct UDPConnection {
m_socket = eventDriver.sockets.createDatagramSocket(baddr, null);
enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
}
this(this)
nothrow {
if (m_socket != StreamSocketFD.invalid)
if (m_socket != DatagramSocketFD.invalid)
eventDriver.sockets.addRef(m_socket);
}
~this()
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket);
if (m_socket != DatagramSocketFD.invalid)
releaseHandle!"sockets"(m_socket, m_context.driver);
}
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }
@ -896,7 +908,14 @@ struct UDPConnection {
/** Stops listening for datagrams and frees all resources.
*/
void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; }
void close()
{
if (m_socket != DatagramSocketFD.invalid) {
releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = DatagramSocketFD.init;
m_context = null;
}
}
/** Locks the UDP connection to a certain peer.

View file

@ -1267,8 +1267,10 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
static if (EVENT_TRIGGERED) {
~this()
{
import vibe.core.internal.release : releaseHandle;
if (m_event != EventID.invalid)
eventDriver.events.releaseRef(m_event);
releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
}
}