Fix waiter count tracking in the Posix driver. Fixes #8.

This commit is contained in:
Sönke Ludwig 2017-06-10 10:27:55 +02:00
parent abf8587078
commit d99eb1be34
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C
8 changed files with 108 additions and 38 deletions

View file

@ -45,7 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
this(Events events, Signals signals)
{
m_events = events;
m_event = events.create();
m_event = events.createInternal();
m_events.wait(m_event, &onDNSSignal);
}
@ -70,6 +70,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
try t.executeInNewThread();//taskPool.put(t);
catch (Exception e) return DNSLookupID.invalid;
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
m_events.loop.m_waiterCount++;
return handle;
}
@ -89,6 +90,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
override void cancelLookup(DNSLookupID handle)
{
m_lookups[handle].callback = null;
m_events.loop.m_waiterCount--;
}
private void onDNSSignal(EventID event)
@ -113,6 +115,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
l.result = null;
l.retcode = 0;
if (i == m_maxHandle) m_maxHandle = lastmax;
m_events.loop.m_waiterCount--;
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
} else lastmax = i;
}
@ -152,7 +155,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
{
m_signals = signals;
m_dnsSignal = () @trusted { return SIGRTMIN; } ();
m_sighandle = signals.listen(m_dnsSignal, &onDNSSignal);
m_sighandle = signals.listenInternal(m_dnsSignal, &onDNSSignal);
}
void dispose()
@ -177,6 +180,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
return DNSLookupID.invalid;
m_lookups[handle].callback = on_lookup_finished;
m_events.loop.m_waiterCount++;
return handle;
}
@ -185,6 +189,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
{
gai_cancel(&m_lookups[handle].ctx);
m_lookups[handle].callback = null;
m_events.loop.m_waiterCount--;
}
private void onDNSSignal(SignalListenID, SignalStatus status, int signal)
@ -204,6 +209,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
auto ai = l.ctx.ar_result;
l.callback = null;
l.ctx.ar_result = null;
m_events.loop.m_waiterCount--;
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
}
}

View file

@ -115,10 +115,10 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_loop = loop;
m_timers = timers;
m_events = events;
m_wakeupEvent = events.create();
m_wakeupEvent = events.createInternal();
}
@property size_t waiterCount() const { return m_loop.m_waiterCount; }
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
final override ExitReason processEvents(Duration timeout)
{
@ -241,25 +241,34 @@ package class PosixEventLoop {
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
"Overwriting notification callback.");
// ensure that the FD doesn't get closed before the callback gets called.
with (m_fds[fd.value]) {
if (callback !is null) {
m_waiterCount++;
m_fds[fd.value].common.refCount++;
if (!(common.flags & FDFlags.internal)) m_waiterCount++;
common.refCount++;
} else {
m_fds[fd.value].common.refCount--;
m_waiterCount--;
common.refCount--;
if (!(common.flags & FDFlags.internal)) m_waiterCount--;
}
common.callback[evt] = callback;
}
m_fds[fd.value].common.callback[evt] = callback;
}
package void initFD(FD fd)
package void initFD(FD fd, FDFlags flags)
{
m_fds[fd.value].common.refCount = 1;
with (m_fds[fd.value]) {
common.refCount = 1;
common.flags = flags;
}
}
package void clearFD(FD fd)
{
if (m_fds[fd.value].common.userDataDestructor)
() @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } ();
if (!(m_fds[fd.value].common.flags & FDFlags.internal))
foreach (cb; m_fds[fd.value].common.callback)
if (cb !is null)
m_waiterCount--;
m_fds[fd.value] = m_fds.FullField.init;
}
}
@ -272,6 +281,7 @@ alias FDSlotCallback = void delegate(FD);
private struct FDSlot {
FDSlotCallback[EventType.max+1] callback;
uint refCount;
FDFlags flags;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
@ -285,6 +295,11 @@ private struct FDSlot {
}
}
enum FDFlags {
none = 0,
internal = 1<<0,
}
enum EventType {
read,
write,

View file

@ -32,23 +32,30 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
m_sockets = sockets;
}
package @property Loop loop() { return m_loop; }
final override EventID create()
{
return createInternal(false);
}
package(eventcore) EventID createInternal(bool is_internal = true)
{
version (linux) {
auto eid = eventfd(0, EFD_NONBLOCK);
if (eid == -1) return EventID.invalid;
auto id = cast(EventID)eid;
m_loop.initFD(id);
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
m_loop.initFD(id, FDFlags.internal);
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
m_loop.registerFD(id, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
return id;
} else {
auto addr = new InternetAddress(0x7F000001, 0);
auto s = m_sockets.createDatagramSocket(addr, addr);
auto s = m_sockets.createDatagramSocketInternal(addr, addr, true);
if (s == DatagramSocketFD.invalid) return EventID.invalid;
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
return cast(EventID)s;
}
}
@ -60,12 +67,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
foreach (w; slot.waiters.consume) {
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
m_loop.m_waiterCount--;
if (!isInternal(event)) m_loop.m_waiterCount--;
w(event);
}
} else {
if (!slot.waiters.empty) {
m_loop.m_waiterCount--;
if (!isInternal(event)) m_loop.m_waiterCount--;
slot.waiters.consumeOne()(event);
}
}
@ -85,7 +92,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
final override void wait(EventID event, EventCallback on_event)
{
m_loop.m_waiterCount++;
if (!isInternal(event)) m_loop.m_waiterCount++;
getSlot(event).waiters.put(on_event);
}
@ -94,7 +101,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove;
m_loop.m_waiterCount--;
if (!isInternal(event)) m_loop.m_waiterCount--;
getSlot(event).waiters.removePending(on_event);
}
@ -171,10 +178,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
{
return m_loop.m_fds[id].common.refCount;
}
private bool isInternal(EventID id)
{
return getSlot(id).isInternal;
}
}
package struct EventSlot {
alias Handle = EventID;
ConsumableQueue!EventCallback waiters;
shared bool triggerAll;
bool isInternal;
}

View file

@ -19,6 +19,11 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
this(Loop loop) { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{
return listenInternal(sig, on_signal, false);
}
package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true)
{
auto fd = () @trusted {
sigset_t sset;
@ -32,7 +37,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
} ();
m_loop.initFD(cast(FD)fd);
m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none);
m_loop.m_fds[fd].specific = SignalSlot(on_signal);
m_loop.registerFD(cast(FD)fd, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
@ -52,7 +57,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
{
FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference
m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd);
close(cast(int)fd);
@ -89,6 +94,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals
this(Loop loop) { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{
return listenInternal(sig, on_signal, false);
}
package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true)
{
assert(false);
}

View file

@ -62,7 +62,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return sock;
}
m_loop.initFD(sock);
m_loop.initFD(sock, FDFlags.none);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
@ -98,7 +98,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return StreamSocketFD.invalid;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.initFD(fd, FDFlags.none);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
return fd;
@ -168,7 +168,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (sock == StreamListenSocketFD.invalid)
return sock;
m_loop.initFD(sock);
m_loop.initFD(sock, FDFlags.none);
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
if (on_accept) waitForConnections(sock, on_accept);
@ -196,7 +196,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
setSocketNonBlocking(cast(SocketFD)sockfd);
auto fd = cast(StreamSocketFD)sockfd;
m_loop.initFD(fd);
m_loop.initFD(fd, FDFlags.none);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
@ -502,6 +502,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
{
return createDatagramSocketInternal(bind_address, target_address, false);
}
package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address, scope Address target_address, bool is_internal = true)
{
auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM);
if (sockfd == -1) return DatagramSocketFD.invalid;
@ -533,7 +538,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
}
}
m_loop.initFD(sock);
m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none);
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
@ -546,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.initFD(fd, FDFlags.none);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
return fd;
@ -710,8 +715,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
final override bool releaseRef(SocketFD fd)
{
import taggedalgebraic : hasType;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
// listening sockets have an incremented the reference count because of setNotifyCallback
int base_refcount = m_loop.m_fds[fd].specific.hasType!StreamListenSocketSlot ? 1 : 0;
if (--m_loop.m_fds[fd].common.refCount == base_refcount) {
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD(fd);
closeSocket(cast(sock_t)fd);

View file

@ -39,7 +39,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
}
}
m_loop.initFD(FD(handle));
m_loop.initFD(FD(handle), FDFlags.none);
m_loop.registerFD(FD(handle), EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
m_loop.m_fds[handle].specific = WatcherSlot(callback);
@ -59,7 +59,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
{
FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count
m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd);
m_watches.remove(descriptor);
@ -103,10 +103,10 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
ch.directory = m_watches[id][ev.wd];
ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
ch.name = name;
addRef(id);
auto cb = m_loop.m_fds[ id].watcher.callback;
addRef(id); // assure that the id doesn't get invalidated until after the callback
auto cb = m_loop.m_fds[id].watcher.callback;
cb(id, ch);
if (!releaseRef(id)) break;
if (!releaseRef(id)) return;
rem = rem[inotify_event.sizeof + ev.len .. $];
}

View file

@ -99,6 +99,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
SmallIntegerSet!FileFD m_activeReads;
SmallIntegerSet!FileFD m_activeWrites;
EventID m_readyEvent;
bool m_waiting;
Events m_events;
}
@ -108,7 +109,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
{
m_events = events;
m_readyEvent = events.create();
m_events.wait(m_readyEvent, &onReady);
}
void dispose()
@ -191,7 +191,9 @@ log("start task");
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
startWaiting();
} catch (Exception e) {
m_activeWrites.remove(file);
on_write_finish(file, IOStatus.error, 0);
return;
}
@ -201,6 +203,7 @@ log("start task");
{
auto f = &m_files[file].write;
m_activeWrites.remove(file);
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
assert(res, "Cancelling write when no write is in progress.");
}
@ -217,7 +220,9 @@ log("start task");
m_fileThreadPool.isDaemon = true;
}
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
startWaiting();
} catch (Exception e) {
m_activeReads.remove(file);
on_read_finish(file, IOStatus.error, 0);
return;
}
@ -227,6 +232,7 @@ log("start task");
{
auto f = &m_files[file].read;
m_activeReads.remove(file);
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
assert(res, "Cancelling read when no write is in progress.");
}
@ -252,7 +258,7 @@ log("start task");
/// private
static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
{
log("ready event");
log("task fun");
IOInfo* f = mixin("&fd.m_files[file]."~op);
log("wait for cancel");
@ -318,7 +324,17 @@ log("ready event");
m_files[f].write.flush(f);
}
m_waiting = false;
startWaiting();
}
private void startWaiting()
{
if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) {
log("wait for ready");
m_events.wait(m_readyEvent, &onReady);
m_waiting = true;
}
}
}

View file

@ -30,6 +30,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers {
ms_allocator.parent = Mallocator.instance;
}
package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; }
final package Duration getNextTimeout(long stdtime)
@safe nothrow {
if (m_timerQueue.length == 0) return Duration.max;