Fix waiterCount not including waiting processes

This commit is contained in:
Benjamin Schaaf 2019-02-21 22:13:05 +11:00
parent 7ef5c6686e
commit 1afc5ca1c0
2 changed files with 15 additions and 9 deletions

View file

@ -40,7 +40,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
private { private {
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver); alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver); alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop; version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop; else alias SignalsDriver = DummyEventDriverSignals!Loop;
@ -78,12 +78,12 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_events = mallocT!EventsDriver(m_loop, m_sockets); m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop); m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver; m_timers = mallocT!TimerDriver;
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events); m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, m_pipes);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_dns = mallocT!DNSDriver(m_events, m_signals); m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events); m_files = mallocT!FileDriver(m_events);
m_pipes = mallocT!PipeDriver(m_loop);
m_watchers = mallocT!WatcherDriver(m_events); m_watchers = mallocT!WatcherDriver(m_events);
m_processes = mallocT!ProcessDriver(m_loop, m_pipes);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls
@ -149,7 +149,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
} }
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore { final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
@safe nothrow: @safe nothrow:
import core.atomic : atomicLoad, atomicStore; import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
@ -163,6 +163,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Loop m_loop; Loop m_loop;
Timers m_timers; Timers m_timers;
Events m_events; Events m_events;
Processes m_processes;
bool m_exit = false; bool m_exit = false;
EventID m_wakeupEvent; EventID m_wakeupEvent;
@ -170,11 +171,12 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks; ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
} }
this(Loop loop, Timers timers, Events events) this(Loop loop, Timers timers, Events events, Processes processes)
@nogc { @nogc {
m_loop = loop; m_loop = loop;
m_timers = timers; m_timers = timers;
m_events = events; m_events = events;
m_processes = processes;
m_wakeupEvent = events.createInternal(); m_wakeupEvent = events.createInternal();
static if (__VERSION__ >= 2074) static if (__VERSION__ >= 2074)
@ -198,7 +200,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
} }
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; } @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
final override ExitReason processEvents(Duration timeout) final override ExitReason processEvents(Duration timeout)
{ {
@ -357,11 +359,11 @@ package class PosixEventLoop {
// ensure that the FD doesn't get closed before the callback gets called. // ensure that the FD doesn't get closed before the callback gets called.
with (m_fds[fd.value]) { with (m_fds[fd.value]) {
if (callback !is null) { if (callback !is null) {
m_waiterCount++; if (!(common.flags & FDFlags.internal)) m_waiterCount++;
common.refCount++; common.refCount++;
} else { } else {
common.refCount--; common.refCount--;
m_waiterCount--; if (!(common.flags & FDFlags.internal)) m_waiterCount--;
} }
common.callback[evt] = callback; common.callback[evt] = callback;
} }

View file

@ -291,6 +291,8 @@ final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProce
} }
return info.userData.ptr; return info.userData.ptr;
} }
package final @property size_t pendingCount() const nothrow { return m_processes.length; }
} }
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@ -344,4 +346,6 @@ final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
@system { @system {
assert(false, "TODO!"); assert(false, "TODO!");
} }
package final @property size_t pendingCount() const nothrow { return 0; }
} }