Make the Posix driver initialization nogc.

This commit is contained in:
Sönke Ludwig 2018-10-21 20:16:26 +02:00
parent a4eaafce9a
commit e7e4a0f5f5
9 changed files with 128 additions and 85 deletions

View file

@ -352,13 +352,31 @@ interface EventDriverSockets {
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T, FD)(FD descriptor)
@trusted {
@property final ref T userData(T, FD)(FD descriptor) @trusted
if (!hasNoGCLifetime!T)
{
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
/// ditto
@property final ref T userData(T, FD)(FD descriptor) @trusted @nogc
if (hasNoGCLifetime!T)
{
import std.conv : emplace;
static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
scope getter = {
return cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
};
static if (__traits(compiles, () nothrow @trusted { getter(); }))
return *(cast(T* delegate() @nogc nothrow)getter)();
else
return *(cast(T* delegate() @nogc)getter)();
}
/// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
@ -368,6 +386,14 @@ interface EventDriverSockets {
protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}
enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); });
unittest {
static struct S1 {}
static struct S2 { ~this() { new int; } }
static assert(hasNoGCLifetime!S1);
static assert(!hasNoGCLifetime!S2);
}
/** Performs asynchronous DNS queries.
*/

View file

@ -43,7 +43,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
}
this(Events events, Signals signals)
{
@nogc {
m_events = events;
setupEvent();
}
@ -139,7 +139,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
}
private void setupEvent()
{
@nogc {
if (m_event == EventID.invalid) {
m_event = m_events.createInternal();
m_events.wait(m_event, &onDNSSignal);

View file

@ -64,16 +64,16 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
}
this()
{
m_loop = new Loop;
m_sockets = new SocketsDriver(m_loop);
m_events = new EventsDriver(m_loop, m_sockets);
m_signals = new SignalsDriver(m_loop);
m_timers = new TimerDriver;
m_core = new CoreDriver(m_loop, m_timers, m_events);
m_dns = new DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events);
m_watchers = new WatcherDriver(m_events);
@nogc @trusted {
m_loop = mallocT!Loop;
m_sockets = mallocT!SocketsDriver(m_loop);
m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver;
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events);
m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events);
m_watchers = mallocT!WatcherDriver(m_events);
}
// force overriding these in the (final) sub classes to avoid virtual calls
@ -96,6 +96,19 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_core.dispose();
m_loop.dispose();
m_loop = null;
try () @trusted {
freeT(m_watchers);
freeT(m_files);
freeT(m_dns);
freeT(m_core);
freeT(m_timers);
freeT(m_signals);
freeT(m_events);
freeT(m_sockets);
freeT(m_loop);
} ();
catch (Exception e) assert(false, e.msg);
}
}
@ -121,29 +134,32 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
}
protected this(Loop loop, Timers timers, Events events)
{
this(Loop loop, Timers timers, Events events)
@nogc {
m_loop = loop;
m_timers = timers;
m_events = events;
m_wakeupEvent = events.createInternal();
static if (__VERSION__ >= 2074)
m_threadCallbackMutex = new shared Mutex;
m_threadCallbackMutex = mallocT!(shared(Mutex));
else {
() @trusted { m_threadCallbackMutex = cast(shared)new Mutex; } ();
() @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } ();
}
m_threadCallbacks = new ConsumableQueue!(Tuple!(ThreadCallback, intptr_t));
m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)));
m_threadCallbacks.reserve(1000);
}
protected final void dispose()
final void dispose()
{
executeThreadCallbacks();
m_events.releaseRef(m_wakeupEvent);
atomicStore(m_threadCallbackMutex, null);
m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
try {
() @trusted { freeT(m_threadCallbackMutex); } ();
() @trusted { freeT(m_threadCallbacks); } ();
} catch (Exception e) assert(false, e.msg);
}
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
@ -274,11 +290,11 @@ package class PosixEventLoop {
protected abstract bool doProcessEvents(Duration dur);
/// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true);
protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc;
/// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd, EventMask mask);
protected abstract void unregisterFD(FD fd, EventMask mask) @nogc;
/// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true);
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc;
final protected void notify(EventType evt)(FD fd)
{
@ -342,7 +358,7 @@ package class PosixEventLoop {
}
package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
@system @nogc {
FDSlot* fds = &m_fds[descriptor].common;
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");

View file

@ -5,7 +5,7 @@
numbers of concurrently open sockets.
*/
module eventcore.drivers.posix.epoll;
@safe: /*@nogc:*/ nothrow:
@safe @nogc nothrow:
version (linux):
@ -22,17 +22,16 @@ static if (!is(typeof(SOCK_CLOEXEC)))
enum SOCK_CLOEXEC = 0x80000;
final class EpollEventLoop : PosixEventLoop {
@safe: nothrow:
@safe nothrow:
private {
int m_epoll;
epoll_event[] m_events;
epoll_event[100] m_events;
}
this()
{
@nogc {
m_epoll = () @trusted { return epoll_create1(SOCK_CLOEXEC); } ();
m_events.length = 100;
}
override bool doProcessEvents(Duration timeout)
@ -60,7 +59,7 @@ final class EpollEventLoop : PosixEventLoop {
}
override void dispose()
{
@nogc {
import core.sys.posix.unistd : close;
close(m_epoll);
}

View file

@ -4,7 +4,8 @@ module eventcore.drivers.posix.events;
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils : nogc_assert;
import eventcore.internal.utils : nogc_assert, mallocT, freeT;
version (linux) {
nothrow @nogc extern (C) int eventfd(uint initval, int flags);
@ -25,21 +26,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
Loop m_loop;
Sockets m_sockets;
ubyte[ulong.sizeof] m_buf;
version (linux) {}
else {
// TODO: avoid the overhead of a mutex backed map here
import core.sync.mutex : Mutex;
Mutex m_eventsMutex;
EventID[DatagramSocketFD] m_events;
}
}
this(Loop loop, Sockets sockets)
{
@nogc {
m_loop = loop;
m_sockets = sockets;
version (linux) {}
else m_eventsMutex = new Mutex;
}
package @property Loop loop() { return m_loop; }
@ -50,14 +42,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
package(eventcore) EventID createInternal(bool is_internal = true)
{
@nogc {
version (linux) {
auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (eid == -1) return EventID.invalid;
auto id = cast(EventID)eid;
// FIXME: avoid dynamic memory allocation for the queue
m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal));
EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal));
m_loop.registerFD(id, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
@ -83,7 +75,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
} else {
// fake missing socketpair support on Windows
import std.socket : InternetAddress;
auto addr = new InternetAddress(0x7F000001, 0);
scope addr = new InternetAddress(0x7F000001, 0);
auto s = m_sockets.createDatagramSocketInternal(addr, null, true);
if (s == DatagramSocketFD.invalid) return EventID.invalid;
fd[0] = cast(sock_t)s;
@ -106,18 +98,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
}
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
// use the second socket as the event ID and as the sending end for
// other threads
auto id = cast(EventID)fd[1];
try {
synchronized (m_eventsMutex)
m_events[s] = id;
} catch (Exception e) assert(false, e.msg);
try m_sockets.userData!EventID(s) = id;
catch (Exception e) assert(false, e.msg);
// FIXME: avoid dynamic memory allocation for the queue
m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s));
EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s));
assert(getRC(id) == 1);
return id;
}
@ -142,7 +132,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
final override void trigger(EventID event, bool notify_all)
shared @trusted {
shared @trusted @nogc {
import core.atomic : atomicStore;
auto thisus = cast(PosixEventDriverEvents)this;
assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
@ -154,7 +144,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
final override void wait(EventID event, EventCallback on_event)
{
@nogc {
if (!isInternal(event)) m_loop.m_waiterCount++;
getSlot(event).waiters.put(on_event);
}
@ -183,13 +173,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
version (linux) {}
else {
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
{
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
EventID evt;
@nogc {
m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
try {
synchronized (m_eventsMutex)
evt = m_events[s];
onEvent(evt);
EventID evt = m_sockets.userData!EventID(s);
scope doit = { onEvent(evt); }; // cast to nogc
() @trusted { (cast(void delegate() @nogc)doit)(); } ();
} catch (Exception e) assert(false, e.msg);
}
}
@ -201,13 +190,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
final override bool releaseRef(EventID descriptor)
{
@nogc {
nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
if (--getRC(descriptor) == 0) {
if (!isInternal(descriptor))
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
() @trusted nothrow {
try .destroy(getSlot(descriptor).waiters);
try freeT(getSlot(descriptor).waiters);
catch (Exception e) nogc_assert(false, e.msg);
} ();
version (linux) {
@ -216,10 +205,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
auto rs = getSlot(descriptor).recvSocket;
m_sockets.cancelReceive(rs);
m_sockets.releaseRef(rs);
try {
synchronized (m_eventsMutex)
m_events.remove(rs);
} catch (Exception e) nogc_assert(false, e.msg);
}
m_loop.clearFD!EventSlot(descriptor);
version (Posix) close(cast(int)descriptor);
@ -235,9 +220,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
private EventSlot* getSlot(EventID id)
{
@nogc {
nogc_assert(id < m_loop.m_fds.length, "Invalid event ID.");
return () @trusted { return &m_loop.m_fds[id].event(); } ();
return () @trusted @nogc { return &m_loop.m_fds[id].event(); } ();
}
private ref uint getRC(EventID id)
@ -246,7 +231,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
}
private bool isInternal(EventID id)
{
@nogc {
return getSlot(id).isInternal;
}
}

View file

@ -34,14 +34,14 @@ alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop;
final class KqueueEventLoop : PosixEventLoop {
private {
int m_queue;
kevent_t[] m_changes;
kevent_t[] m_events;
size_t m_changeCount = 0;
kevent_t[100] m_changes;
kevent_t[100] m_events;
}
this()
@safe nothrow {
@safe nothrow @nogc {
m_queue = () @trusted { return kqueue(); } ();
m_events.length = 100;
assert(m_queue >= 0, "Failed to create kqueue.");
}
@ -57,9 +57,8 @@ final class KqueueEventLoop : PosixEventLoop {
ts.tv_sec = cast(time_t)secs;
ts.tv_nsec = cast(uint)hnsecs * 100;
auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changes.length, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts);
m_changes.length = 0;
m_changes.assumeSafeAppend();
auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts);
m_changeCount = 0;
//print("kevent returned %s", ret);
@ -97,11 +96,11 @@ final class KqueueEventLoop : PosixEventLoop {
if (edge_triggered) ev.flags |= EV_CLEAR;
if (mask & EventMask.read) {
ev.filter = EVFILT_READ;
m_changes ~= ev;
putChange(ev);
}
if (mask & EventMask.write) {
ev.filter = EVFILT_WRITE;
m_changes ~= ev;
putChange(ev);
}
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
}
@ -111,7 +110,7 @@ final class KqueueEventLoop : PosixEventLoop {
kevent_t ev;
ev.ident = fd;
ev.flags = EV_DELETE;
m_changes ~= ev;
putChange(ev);
}
override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true)
@ -124,16 +123,26 @@ final class KqueueEventLoop : PosixEventLoop {
ev.filter = EVFILT_READ;
ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev;
putChange(ev);
}
if (changes & EventMask.write) {
ev.filter = EVFILT_WRITE;
ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE;
if (edge_triggered) ev.flags |= EV_CLEAR;
m_changes ~= ev;
putChange(ev);
}
//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
}
private void putChange(ref kevent_t ev)
@safe nothrow @nogc {
m_changes[m_changeCount++] = ev;
if (m_changeCount == m_changes.length) {
auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) ();
assert(ret == 0);
m_changeCount = 0;
}
}
}

View file

@ -17,7 +17,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
this(Loop loop) @nogc { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{

View file

@ -654,7 +654,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false)
{
@nogc {
auto fd = DatagramSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init;
@ -742,8 +742,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
}
package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish)
@trusted @nogc {
scope void delegate() @safe nothrow do_it = {
receive(socket, buffer, mode, on_receive_finish);
};
(cast(void delegate() @safe nothrow @nogc)do_it)();
}
void cancelReceive(DatagramSocketFD socket)
{
@nogc {
assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null);
m_loop.m_fds[socket].datagramSocket.readBuffer = null;
@ -855,7 +863,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
final override bool releaseRef(SocketFD fd)
{
@nogc {
import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");

View file

@ -245,7 +245,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
}
this(Events events)
{
@nogc {
m_events = events;
}