Merge pull request #74 from vibe-d/fix_multithread_destruction

Allow destructors to run in foreign threads. Fixes #69.
This commit is contained in:
Sönke Ludwig 2018-03-18 11:09:43 +01:00 committed by GitHub
commit 88bcae036c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 29 deletions

View file

@ -4,7 +4,7 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016-2018, rejectedsoftware e.K." copyright "Copyright © 2016-2018, rejectedsoftware e.K."
license "MIT" license "MIT"
dependency "eventcore" version="~>0.8.18" dependency "eventcore" version="~>0.8.32"
dependency "stdx-allocator" version="~>2.77.0" dependency "stdx-allocator" version="~>2.77.0"
targetName "vibe_core" targetName "vibe_core"

View file

@ -12,6 +12,7 @@ public import vibe.core.task;
import eventcore.core; import eventcore.core;
import vibe.core.args; import vibe.core.args;
import vibe.core.concurrency; import vibe.core.concurrency;
import vibe.core.internal.release;
import vibe.core.log; import vibe.core.log;
import vibe.core.sync : ManualEvent, createSharedManualEvent; import vibe.core.sync : ManualEvent, createSharedManualEvent;
import vibe.core.taskpool : TaskPool; import vibe.core.taskpool : TaskPool;
@ -940,8 +941,13 @@ struct FileDescriptorEvent {
} }
private { private {
static struct Context {
Trigger trigger;
shared(NativeEventDriver) driver;
}
StreamSocketFD m_socket; StreamSocketFD m_socket;
Trigger m_trigger; Context* m_context;
} }
@safe: @safe:
@ -949,7 +955,9 @@ struct FileDescriptorEvent {
private this(int fd, Trigger event_mask) private this(int fd, Trigger event_mask)
nothrow { nothrow {
m_socket = eventDriver.sockets.adoptStream(fd); m_socket = eventDriver.sockets.adoptStream(fd);
m_trigger = event_mask; m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.trigger = event_mask;
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
this(this) this(this)
@ -961,7 +969,7 @@ struct FileDescriptorEvent {
~this() ~this()
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket); releaseHandle!"sockets"(m_socket, m_context.driver);
} }
@ -982,9 +990,9 @@ struct FileDescriptorEvent {
/// ditto /// ditto
bool wait(Duration timeout, Trigger which = Trigger.any) bool wait(Duration timeout, Trigger which = Trigger.any)
{ {
if ((which & m_trigger) == Trigger.none) return true; if ((which & m_context.trigger) == Trigger.none) return true;
assert((which & m_trigger) == Trigger.read, "Waiting for write event not yet supported."); assert((which & m_context.trigger) == Trigger.read, "Waiting for write event not yet supported.");
bool got_data; bool got_data;
@ -1006,7 +1014,7 @@ struct FileDescriptorEvent {
*/ */
struct Timer { struct Timer {
private { private {
typeof(eventDriver.timers) m_driver; NativeEventDriver m_driver;
TimerID m_id; TimerID m_id;
debug uint m_magicNumber = 0x4d34f916; debug uint m_magicNumber = 0x4d34f916;
} }
@ -1016,24 +1024,25 @@ struct Timer {
private this(TimerID id) private this(TimerID id)
nothrow { nothrow {
assert(id != TimerID.init, "Invalid timer ID."); assert(id != TimerID.init, "Invalid timer ID.");
m_driver = eventDriver.timers; m_driver = eventDriver;
m_id = id; m_id = id;
} }
this(this) this(this)
nothrow { nothrow {
debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
if (m_driver) m_driver.addRef(m_id); if (m_driver) m_driver.timers.addRef(m_id);
} }
~this() ~this()
nothrow { nothrow {
debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
if (m_driver) m_driver.releaseRef(m_id); if (m_driver)
releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ());
} }
/// True if the timer is yet to fire. /// True if the timer is yet to fire.
@property bool pending() nothrow { return m_driver.isPending(m_id); } @property bool pending() nothrow { return m_driver.timers.isPending(m_id); }
/// The internal ID of the timer. /// The internal ID of the timer.
@property size_t id() const nothrow { return m_id; } @property size_t id() const nothrow { return m_id; }
@ -1041,25 +1050,25 @@ struct Timer {
bool opCast() const nothrow { return m_driver !is null; } bool opCast() const nothrow { return m_driver !is null; }
/// Determines if this reference is the only one /// Determines if this reference is the only one
@property bool unique() const nothrow { return m_driver ? m_driver.isUnique(m_id) : false; } @property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; }
/** Resets the timer to the specified timeout /** Resets the timer to the specified timeout
*/ */
void rearm(Duration dur, bool periodic = false) nothrow void rearm(Duration dur, bool periodic = false) nothrow
in { assert(dur > 0.seconds, "Negative timer duration specified."); } in { assert(dur > 0.seconds, "Negative timer duration specified."); }
body { m_driver.set(m_id, dur, periodic ? dur : 0.seconds); } body { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); }
/** Resets the timer and avoids any firing. /** Resets the timer and avoids any firing.
*/ */
void stop() nothrow { if (m_driver) m_driver.stop(m_id); } void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); }
/** Waits until the timer fires. /** Waits until the timer fires.
*/ */
void wait() void wait()
{ {
asyncAwait!(TimerCallback, asyncAwait!(TimerCallback,
cb => m_driver.wait(m_id, cb), cb => m_driver.timers.wait(m_id, cb),
cb => m_driver.cancelWait(m_id) cb => m_driver.timers.cancelWait(m_id)
); );
} }
} }

View file

@ -7,8 +7,9 @@
*/ */
module vibe.core.file; module vibe.core.file;
import eventcore.core : eventDriver; import eventcore.core : NativeEventDriver, eventDriver;
import eventcore.driver; import eventcore.driver;
import vibe.core.internal.release;
import vibe.core.log; import vibe.core.log;
import vibe.core.path; import vibe.core.path;
import vibe.core.stream; import vibe.core.stream;
@ -395,6 +396,7 @@ struct FileStream {
ulong size; ulong size;
FileMode mode; FileMode mode;
ulong ptr; ulong ptr;
shared(NativeEventDriver) driver;
} }
private { private {
@ -410,6 +412,7 @@ struct FileStream {
m_ctx.path = path; m_ctx.path = path;
m_ctx.mode = mode; m_ctx.mode = mode;
m_ctx.size = eventDriver.files.getSize(fd); m_ctx.size = eventDriver.files.getSize(fd);
m_ctx.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
this(this) this(this)
@ -421,7 +424,7 @@ struct FileStream {
~this() ~this()
{ {
if (m_fd != FileFD.invalid) if (m_fd != FileFD.invalid)
eventDriver.files.releaseRef(m_fd); releaseHandle!"files"(m_fd, m_ctx.driver);
} }
@property int fd() { return cast(int)m_fd; } @property int fd() { return cast(int)m_fd; }
@ -453,9 +456,10 @@ struct FileStream {
void close() void close()
{ {
if (m_fd != FileFD.init) { if (m_fd != FileFD.init) {
eventDriver.files.close(m_fd); eventDriver.files.close(m_fd); // FIXME: may leave dangling references!
eventDriver.files.releaseRef(m_fd); releaseHandle!"files"(m_fd, m_ctx.driver);
m_fd = FileFD.init; m_fd = FileFD.init;
m_ctx = null;
} }
} }
@ -578,6 +582,7 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
bool recursive; bool recursive;
Appender!(DirectoryChange[]) changes; Appender!(DirectoryChange[]) changes;
LocalManualEvent changeEvent; LocalManualEvent changeEvent;
shared(NativeEventDriver) driver;
void onChange(WatcherID, in ref FileChange change) void onChange(WatcherID, in ref FileChange change)
nothrow { nothrow {
@ -611,10 +616,15 @@ struct DirectoryWatcher { // TODO: avoid all those heap allocations!
m_context.path = path; m_context.path = path;
m_context.recursive = recursive; m_context.recursive = recursive;
m_context.changes = appender!(DirectoryChange[]); m_context.changes = appender!(DirectoryChange[]);
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
this(this) nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.addRef(m_watcher); } this(this) nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.addRef(m_watcher); }
~this() nothrow { if (m_watcher != WatcherID.invalid) eventDriver.watchers.releaseRef(m_watcher); } ~this()
nothrow {
if (m_watcher != WatcherID.invalid)
releaseHandle!"watchers"(m_watcher, m_context.driver);
}
/// The path of the watched directory /// The path of the watched directory
@property NativePath path() const nothrow { return m_context.path; } @property NativePath path() const nothrow { return m_context.path; }

View file

@ -0,0 +1,17 @@
module vibe.core.internal.release;
import eventcore.core;
/// Release a handle in a thread-safe way
void releaseHandle(string subsys, H)(H handle, shared(NativeEventDriver) drv)
{
if (drv is (() @trusted => cast(shared)eventDriver)()) {
__traits(getMember, eventDriver, subsys).releaseRef(handle);
} else {
// in case the destructor was called from a foreign thread,
// perform the release in the owner thread
drv.core.runInOwnerThread((h) {
__traits(getMember, eventDriver, subsys).releaseRef(cast(H)h);
}, cast(size_t)handle);
}
}

View file

@ -12,6 +12,7 @@ import std.exception : enforce;
import std.format : format; import std.format : format;
import std.functional : toDelegate; import std.functional : toDelegate;
import std.socket : AddressFamily, UnknownAddress; import std.socket : AddressFamily, UnknownAddress;
import vibe.core.internal.release;
import vibe.core.log; import vibe.core.log;
import vibe.core.stream; import vibe.core.stream;
import vibe.internal.async; import vibe.internal.async;
@ -477,6 +478,7 @@ struct TCPConnection {
bool keepAlive = false; bool keepAlive = false;
Duration readTimeout = Duration.max; Duration readTimeout = Duration.max;
string remoteAddressString; string remoteAddressString;
shared(NativeEventDriver) driver;
} }
private { private {
@ -491,6 +493,7 @@ struct TCPConnection {
m_socket = socket; m_socket = socket;
m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } (); m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
m_context.readBuffer.capacity = 4096; m_context.readBuffer.capacity = 4096;
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
this(this) this(this)
@ -502,7 +505,7 @@ struct TCPConnection {
~this() ~this()
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != StreamSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket); releaseHandle!"sockets"(m_socket, m_context.driver);
} }
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }
@ -543,7 +546,7 @@ struct TCPConnection {
//logInfo("close %s", cast(int)m_fd); //logInfo("close %s", cast(int)m_fd);
if (m_socket != StreamSocketFD.invalid) { if (m_socket != StreamSocketFD.invalid) {
eventDriver.sockets.shutdown(m_socket, true, true); eventDriver.sockets.shutdown(m_socket, true, true);
eventDriver.sockets.releaseRef(m_socket); releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = StreamSocketFD.invalid; m_socket = StreamSocketFD.invalid;
m_context = null; m_context = null;
} }
@ -783,12 +786,19 @@ struct TCPListener {
// the previous behavior of keeping the socket alive when the listener isn't stored. At the same time, // the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
// stopListening() needs to keep working. // stopListening() needs to keep working.
private { private {
static struct Context {
shared(NativeEventDriver) driver;
}
StreamListenSocketFD m_socket; StreamListenSocketFD m_socket;
Context* m_context;
} }
this(StreamListenSocketFD socket) this(StreamListenSocketFD socket)
{ {
m_socket = socket; m_socket = socket;
m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }
@ -807,7 +817,7 @@ struct TCPListener {
void stopListening() void stopListening()
{ {
if (m_socket != StreamListenSocketFD.invalid) { if (m_socket != StreamListenSocketFD.invalid) {
eventDriver.sockets.releaseRef(m_socket); releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = StreamListenSocketFD.invalid; m_socket = StreamListenSocketFD.invalid;
} }
} }
@ -820,6 +830,7 @@ struct TCPListener {
struct UDPConnection { struct UDPConnection {
static struct Context { static struct Context {
bool canBroadcast; bool canBroadcast;
shared(NativeEventDriver) driver;
} }
private { private {
@ -833,19 +844,20 @@ struct UDPConnection {
m_socket = eventDriver.sockets.createDatagramSocket(baddr, null); m_socket = eventDriver.sockets.createDatagramSocket(baddr, null);
enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket."); enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } (); m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
} }
this(this) this(this)
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != DatagramSocketFD.invalid)
eventDriver.sockets.addRef(m_socket); eventDriver.sockets.addRef(m_socket);
} }
~this() ~this()
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != DatagramSocketFD.invalid)
eventDriver.sockets.releaseRef(m_socket); releaseHandle!"sockets"(m_socket, m_context.driver);
} }
bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; } bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }
@ -896,7 +908,14 @@ struct UDPConnection {
/** Stops listening for datagrams and frees all resources. /** Stops listening for datagrams and frees all resources.
*/ */
void close() { eventDriver.sockets.releaseRef(m_socket); m_socket = DatagramSocketFD.init; } void close()
{
if (m_socket != DatagramSocketFD.invalid) {
releaseHandle!"sockets"(m_socket, m_context.driver);
m_socket = DatagramSocketFD.init;
m_context = null;
}
}
/** Locks the UDP connection to a certain peer. /** Locks the UDP connection to a certain peer.

View file

@ -1267,8 +1267,10 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
static if (EVENT_TRIGGERED) { static if (EVENT_TRIGGERED) {
~this() ~this()
{ {
import vibe.core.internal.release : releaseHandle;
if (m_event != EventID.invalid) if (m_event != EventID.invalid)
eventDriver.events.releaseRef(m_event); releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
} }
} }