Merge pull request #11 from vibe-d/waiter_count
Fix automatic exiting of the event loop/waiter count tracking
This commit is contained in:
commit
68e8fcb21d
|
@ -45,7 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
||||||
this(Events events, Signals signals)
|
this(Events events, Signals signals)
|
||||||
{
|
{
|
||||||
m_events = events;
|
m_events = events;
|
||||||
m_event = events.create();
|
m_event = events.createInternal();
|
||||||
m_events.wait(m_event, &onDNSSignal);
|
m_events.wait(m_event, &onDNSSignal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
||||||
try t.executeInNewThread();//taskPool.put(t);
|
try t.executeInNewThread();//taskPool.put(t);
|
||||||
catch (Exception e) return DNSLookupID.invalid;
|
catch (Exception e) return DNSLookupID.invalid;
|
||||||
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
|
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
|
||||||
|
m_events.loop.m_waiterCount++;
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,6 +90,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
||||||
override void cancelLookup(DNSLookupID handle)
|
override void cancelLookup(DNSLookupID handle)
|
||||||
{
|
{
|
||||||
m_lookups[handle].callback = null;
|
m_lookups[handle].callback = null;
|
||||||
|
m_events.loop.m_waiterCount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onDNSSignal(EventID event)
|
private void onDNSSignal(EventID event)
|
||||||
|
@ -113,6 +115,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
||||||
l.result = null;
|
l.result = null;
|
||||||
l.retcode = 0;
|
l.retcode = 0;
|
||||||
if (i == m_maxHandle) m_maxHandle = lastmax;
|
if (i == m_maxHandle) m_maxHandle = lastmax;
|
||||||
|
m_events.loop.m_waiterCount--;
|
||||||
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
|
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
|
||||||
} else lastmax = i;
|
} else lastmax = i;
|
||||||
}
|
}
|
||||||
|
@ -152,7 +155,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
||||||
{
|
{
|
||||||
m_signals = signals;
|
m_signals = signals;
|
||||||
m_dnsSignal = () @trusted { return SIGRTMIN; } ();
|
m_dnsSignal = () @trusted { return SIGRTMIN; } ();
|
||||||
m_sighandle = signals.listen(m_dnsSignal, &onDNSSignal);
|
m_sighandle = signals.listenInternal(m_dnsSignal, &onDNSSignal);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dispose()
|
void dispose()
|
||||||
|
@ -177,6 +180,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
||||||
return DNSLookupID.invalid;
|
return DNSLookupID.invalid;
|
||||||
|
|
||||||
m_lookups[handle].callback = on_lookup_finished;
|
m_lookups[handle].callback = on_lookup_finished;
|
||||||
|
m_events.loop.m_waiterCount++;
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
@ -185,6 +189,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
||||||
{
|
{
|
||||||
gai_cancel(&m_lookups[handle].ctx);
|
gai_cancel(&m_lookups[handle].ctx);
|
||||||
m_lookups[handle].callback = null;
|
m_lookups[handle].callback = null;
|
||||||
|
m_events.loop.m_waiterCount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onDNSSignal(SignalListenID, SignalStatus status, int signal)
|
private void onDNSSignal(SignalListenID, SignalStatus status, int signal)
|
||||||
|
@ -204,6 +209,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
||||||
auto ai = l.ctx.ar_result;
|
auto ai = l.ctx.ar_result;
|
||||||
l.callback = null;
|
l.callback = null;
|
||||||
l.ctx.ar_result = null;
|
l.ctx.ar_result = null;
|
||||||
|
m_events.loop.m_waiterCount--;
|
||||||
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
|
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,10 +115,10 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
||||||
m_loop = loop;
|
m_loop = loop;
|
||||||
m_timers = timers;
|
m_timers = timers;
|
||||||
m_events = events;
|
m_events = events;
|
||||||
m_wakeupEvent = events.create();
|
m_wakeupEvent = events.createInternal();
|
||||||
}
|
}
|
||||||
|
|
||||||
@property size_t waiterCount() const { return m_loop.m_waiterCount; }
|
@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount; }
|
||||||
|
|
||||||
final override ExitReason processEvents(Duration timeout)
|
final override ExitReason processEvents(Duration timeout)
|
||||||
{
|
{
|
||||||
|
@ -241,25 +241,34 @@ package class PosixEventLoop {
|
||||||
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
|
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
|
||||||
"Overwriting notification callback.");
|
"Overwriting notification callback.");
|
||||||
// ensure that the FD doesn't get closed before the callback gets called.
|
// ensure that the FD doesn't get closed before the callback gets called.
|
||||||
|
with (m_fds[fd.value]) {
|
||||||
if (callback !is null) {
|
if (callback !is null) {
|
||||||
m_waiterCount++;
|
if (!(common.flags & FDFlags.internal)) m_waiterCount++;
|
||||||
m_fds[fd.value].common.refCount++;
|
common.refCount++;
|
||||||
} else {
|
} else {
|
||||||
m_fds[fd.value].common.refCount--;
|
common.refCount--;
|
||||||
m_waiterCount--;
|
if (!(common.flags & FDFlags.internal)) m_waiterCount--;
|
||||||
|
}
|
||||||
|
common.callback[evt] = callback;
|
||||||
}
|
}
|
||||||
m_fds[fd.value].common.callback[evt] = callback;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
package void initFD(FD fd)
|
package void initFD(FD fd, FDFlags flags)
|
||||||
{
|
{
|
||||||
m_fds[fd.value].common.refCount = 1;
|
with (m_fds[fd.value]) {
|
||||||
|
common.refCount = 1;
|
||||||
|
common.flags = flags;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
package void clearFD(FD fd)
|
package void clearFD(FD fd)
|
||||||
{
|
{
|
||||||
if (m_fds[fd.value].common.userDataDestructor)
|
if (m_fds[fd.value].common.userDataDestructor)
|
||||||
() @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } ();
|
() @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } ();
|
||||||
|
if (!(m_fds[fd.value].common.flags & FDFlags.internal))
|
||||||
|
foreach (cb; m_fds[fd.value].common.callback)
|
||||||
|
if (cb !is null)
|
||||||
|
m_waiterCount--;
|
||||||
m_fds[fd.value] = m_fds.FullField.init;
|
m_fds[fd.value] = m_fds.FullField.init;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -272,6 +281,7 @@ alias FDSlotCallback = void delegate(FD);
|
||||||
private struct FDSlot {
|
private struct FDSlot {
|
||||||
FDSlotCallback[EventType.max+1] callback;
|
FDSlotCallback[EventType.max+1] callback;
|
||||||
uint refCount;
|
uint refCount;
|
||||||
|
FDFlags flags;
|
||||||
|
|
||||||
DataInitializer userDataDestructor;
|
DataInitializer userDataDestructor;
|
||||||
ubyte[16*size_t.sizeof] userData;
|
ubyte[16*size_t.sizeof] userData;
|
||||||
|
@ -285,6 +295,11 @@ private struct FDSlot {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum FDFlags {
|
||||||
|
none = 0,
|
||||||
|
internal = 1<<0,
|
||||||
|
}
|
||||||
|
|
||||||
enum EventType {
|
enum EventType {
|
||||||
read,
|
read,
|
||||||
write,
|
write,
|
||||||
|
|
|
@ -32,23 +32,30 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
m_sockets = sockets;
|
m_sockets = sockets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
package @property Loop loop() { return m_loop; }
|
||||||
|
|
||||||
final override EventID create()
|
final override EventID create()
|
||||||
|
{
|
||||||
|
return createInternal(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
package(eventcore) EventID createInternal(bool is_internal = true)
|
||||||
{
|
{
|
||||||
version (linux) {
|
version (linux) {
|
||||||
auto eid = eventfd(0, EFD_NONBLOCK);
|
auto eid = eventfd(0, EFD_NONBLOCK);
|
||||||
if (eid == -1) return EventID.invalid;
|
if (eid == -1) return EventID.invalid;
|
||||||
auto id = cast(EventID)eid;
|
auto id = cast(EventID)eid;
|
||||||
m_loop.initFD(id);
|
m_loop.initFD(id, FDFlags.internal);
|
||||||
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
|
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
||||||
m_loop.registerFD(id, EventMask.read);
|
m_loop.registerFD(id, EventMask.read);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
||||||
return id;
|
return id;
|
||||||
} else {
|
} else {
|
||||||
auto addr = new InternetAddress(0x7F000001, 0);
|
auto addr = new InternetAddress(0x7F000001, 0);
|
||||||
auto s = m_sockets.createDatagramSocket(addr, addr);
|
auto s = m_sockets.createDatagramSocketInternal(addr, addr, true);
|
||||||
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
||||||
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
|
m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
||||||
return cast(EventID)s;
|
return cast(EventID)s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,12 +67,12 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
|
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
|
||||||
foreach (w; slot.waiters.consume) {
|
foreach (w; slot.waiters.consume) {
|
||||||
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
|
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
|
||||||
m_loop.m_waiterCount--;
|
if (!isInternal(event)) m_loop.m_waiterCount--;
|
||||||
w(event);
|
w(event);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!slot.waiters.empty) {
|
if (!slot.waiters.empty) {
|
||||||
m_loop.m_waiterCount--;
|
if (!isInternal(event)) m_loop.m_waiterCount--;
|
||||||
slot.waiters.consumeOne()(event);
|
slot.waiters.consumeOne()(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,7 +92,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
|
|
||||||
final override void wait(EventID event, EventCallback on_event)
|
final override void wait(EventID event, EventCallback on_event)
|
||||||
{
|
{
|
||||||
m_loop.m_waiterCount++;
|
if (!isInternal(event)) m_loop.m_waiterCount++;
|
||||||
getSlot(event).waiters.put(on_event);
|
getSlot(event).waiters.put(on_event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +101,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
import std.algorithm.searching : countUntil;
|
import std.algorithm.searching : countUntil;
|
||||||
import std.algorithm.mutation : remove;
|
import std.algorithm.mutation : remove;
|
||||||
|
|
||||||
m_loop.m_waiterCount--;
|
if (!isInternal(event)) m_loop.m_waiterCount--;
|
||||||
getSlot(event).waiters.removePending(on_event);
|
getSlot(event).waiters.removePending(on_event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,10 +178,16 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
{
|
{
|
||||||
return m_loop.m_fds[id].common.refCount;
|
return m_loop.m_fds[id].common.refCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private bool isInternal(EventID id)
|
||||||
|
{
|
||||||
|
return getSlot(id).isInternal;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
package struct EventSlot {
|
package struct EventSlot {
|
||||||
alias Handle = EventID;
|
alias Handle = EventID;
|
||||||
ConsumableQueue!EventCallback waiters;
|
ConsumableQueue!EventCallback waiters;
|
||||||
shared bool triggerAll;
|
shared bool triggerAll;
|
||||||
|
bool isInternal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,11 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
|
||||||
this(Loop loop) { m_loop = loop; }
|
this(Loop loop) { m_loop = loop; }
|
||||||
|
|
||||||
override SignalListenID listen(int sig, SignalCallback on_signal)
|
override SignalListenID listen(int sig, SignalCallback on_signal)
|
||||||
|
{
|
||||||
|
return listenInternal(sig, on_signal, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true)
|
||||||
{
|
{
|
||||||
auto fd = () @trusted {
|
auto fd = () @trusted {
|
||||||
sigset_t sset;
|
sigset_t sset;
|
||||||
|
@ -32,7 +37,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
|
||||||
} ();
|
} ();
|
||||||
|
|
||||||
|
|
||||||
m_loop.initFD(cast(FD)fd);
|
m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none);
|
||||||
m_loop.m_fds[fd].specific = SignalSlot(on_signal);
|
m_loop.m_fds[fd].specific = SignalSlot(on_signal);
|
||||||
m_loop.registerFD(cast(FD)fd, EventMask.read);
|
m_loop.registerFD(cast(FD)fd, EventMask.read);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
|
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
|
||||||
|
@ -52,7 +57,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
|
||||||
{
|
{
|
||||||
FD fd = cast(FD)descriptor;
|
FD fd = cast(FD)descriptor;
|
||||||
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
||||||
if (--m_loop.m_fds[fd].common.refCount == 0) {
|
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference
|
||||||
m_loop.unregisterFD(fd, EventMask.read);
|
m_loop.unregisterFD(fd, EventMask.read);
|
||||||
m_loop.clearFD(fd);
|
m_loop.clearFD(fd);
|
||||||
close(cast(int)fd);
|
close(cast(int)fd);
|
||||||
|
@ -89,6 +94,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals
|
||||||
this(Loop loop) { m_loop = loop; }
|
this(Loop loop) { m_loop = loop; }
|
||||||
|
|
||||||
override SignalListenID listen(int sig, SignalCallback on_signal)
|
override SignalListenID listen(int sig, SignalCallback on_signal)
|
||||||
|
{
|
||||||
|
return listenInternal(sig, on_signal, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true)
|
||||||
{
|
{
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_loop.initFD(sock);
|
m_loop.initFD(sock, FDFlags.none);
|
||||||
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
|
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
|
||||||
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
|
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
|
||||||
|
@ -98,7 +98,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||||
return StreamSocketFD.invalid;
|
return StreamSocketFD.invalid;
|
||||||
setSocketNonBlocking(fd);
|
setSocketNonBlocking(fd);
|
||||||
m_loop.initFD(fd);
|
m_loop.initFD(fd, FDFlags.none);
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
||||||
return fd;
|
return fd;
|
||||||
|
@ -168,7 +168,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (sock == StreamListenSocketFD.invalid)
|
if (sock == StreamListenSocketFD.invalid)
|
||||||
return sock;
|
return sock;
|
||||||
|
|
||||||
m_loop.initFD(sock);
|
m_loop.initFD(sock, FDFlags.none);
|
||||||
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
|
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
|
||||||
|
|
||||||
if (on_accept) waitForConnections(sock, on_accept);
|
if (on_accept) waitForConnections(sock, on_accept);
|
||||||
|
@ -196,7 +196,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
|
|
||||||
setSocketNonBlocking(cast(SocketFD)sockfd);
|
setSocketNonBlocking(cast(SocketFD)sockfd);
|
||||||
auto fd = cast(StreamSocketFD)sockfd;
|
auto fd = cast(StreamSocketFD)sockfd;
|
||||||
m_loop.initFD(fd);
|
m_loop.initFD(fd, FDFlags.none);
|
||||||
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
|
||||||
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
|
@ -502,6 +502,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
}
|
}
|
||||||
|
|
||||||
final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
|
final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
|
||||||
|
{
|
||||||
|
return createDatagramSocketInternal(bind_address, target_address, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address, scope Address target_address, bool is_internal = true)
|
||||||
{
|
{
|
||||||
auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM);
|
auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM);
|
||||||
if (sockfd == -1) return DatagramSocketFD.invalid;
|
if (sockfd == -1) return DatagramSocketFD.invalid;
|
||||||
|
@ -533,7 +538,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m_loop.initFD(sock);
|
m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none);
|
||||||
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
|
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
|
||||||
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
|
||||||
|
|
||||||
|
@ -546,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||||
return DatagramSocketFD.init;
|
return DatagramSocketFD.init;
|
||||||
setSocketNonBlocking(fd);
|
setSocketNonBlocking(fd);
|
||||||
m_loop.initFD(fd);
|
m_loop.initFD(fd, FDFlags.none);
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
|
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
|
||||||
return fd;
|
return fd;
|
||||||
|
@ -710,8 +715,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
|
|
||||||
final override bool releaseRef(SocketFD fd)
|
final override bool releaseRef(SocketFD fd)
|
||||||
{
|
{
|
||||||
|
import taggedalgebraic : hasType;
|
||||||
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
|
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
|
||||||
if (--m_loop.m_fds[fd].common.refCount == 0) {
|
// listening sockets have an incremented the reference count because of setNotifyCallback
|
||||||
|
int base_refcount = m_loop.m_fds[fd].specific.hasType!StreamListenSocketSlot ? 1 : 0;
|
||||||
|
if (--m_loop.m_fds[fd].common.refCount == base_refcount) {
|
||||||
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.clearFD(fd);
|
m_loop.clearFD(fd);
|
||||||
closeSocket(cast(sock_t)fd);
|
closeSocket(cast(sock_t)fd);
|
||||||
|
|
|
@ -39,7 +39,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m_loop.initFD(FD(handle));
|
m_loop.initFD(FD(handle), FDFlags.none);
|
||||||
m_loop.registerFD(FD(handle), EventMask.read);
|
m_loop.registerFD(FD(handle), EventMask.read);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
|
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
|
||||||
m_loop.m_fds[handle].specific = WatcherSlot(callback);
|
m_loop.m_fds[handle].specific = WatcherSlot(callback);
|
||||||
|
@ -59,7 +59,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
|
||||||
{
|
{
|
||||||
FD fd = cast(FD)descriptor;
|
FD fd = cast(FD)descriptor;
|
||||||
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
||||||
if (--m_loop.m_fds[fd].common.refCount == 0) {
|
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count
|
||||||
m_loop.unregisterFD(fd, EventMask.read);
|
m_loop.unregisterFD(fd, EventMask.read);
|
||||||
m_loop.clearFD(fd);
|
m_loop.clearFD(fd);
|
||||||
m_watches.remove(descriptor);
|
m_watches.remove(descriptor);
|
||||||
|
@ -103,10 +103,10 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
|
||||||
ch.directory = m_watches[id][ev.wd];
|
ch.directory = m_watches[id][ev.wd];
|
||||||
ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
|
ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
|
||||||
ch.name = name;
|
ch.name = name;
|
||||||
addRef(id);
|
addRef(id); // assure that the id doesn't get invalidated until after the callback
|
||||||
auto cb = m_loop.m_fds[ id].watcher.callback;
|
auto cb = m_loop.m_fds[id].watcher.callback;
|
||||||
cb(id, ch);
|
cb(id, ch);
|
||||||
if (!releaseRef(id)) break;
|
if (!releaseRef(id)) return;
|
||||||
|
|
||||||
rem = rem[inotify_event.sizeof + ev.len .. $];
|
rem = rem[inotify_event.sizeof + ev.len .. $];
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
|
||||||
SmallIntegerSet!FileFD m_activeReads;
|
SmallIntegerSet!FileFD m_activeReads;
|
||||||
SmallIntegerSet!FileFD m_activeWrites;
|
SmallIntegerSet!FileFD m_activeWrites;
|
||||||
EventID m_readyEvent;
|
EventID m_readyEvent;
|
||||||
|
bool m_waiting;
|
||||||
Events m_events;
|
Events m_events;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +109,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
|
||||||
{
|
{
|
||||||
m_events = events;
|
m_events = events;
|
||||||
m_readyEvent = events.create();
|
m_readyEvent = events.create();
|
||||||
m_events.wait(m_readyEvent, &onReady);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dispose()
|
void dispose()
|
||||||
|
@ -191,7 +191,9 @@ log("start task");
|
||||||
m_fileThreadPool.isDaemon = true;
|
m_fileThreadPool.isDaemon = true;
|
||||||
}
|
}
|
||||||
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
|
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
|
||||||
|
startWaiting();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
m_activeWrites.remove(file);
|
||||||
on_write_finish(file, IOStatus.error, 0);
|
on_write_finish(file, IOStatus.error, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -201,6 +203,7 @@ log("start task");
|
||||||
{
|
{
|
||||||
auto f = &m_files[file].write;
|
auto f = &m_files[file].write;
|
||||||
m_activeWrites.remove(file);
|
m_activeWrites.remove(file);
|
||||||
|
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
|
||||||
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
||||||
assert(res, "Cancelling write when no write is in progress.");
|
assert(res, "Cancelling write when no write is in progress.");
|
||||||
}
|
}
|
||||||
|
@ -217,7 +220,9 @@ log("start task");
|
||||||
m_fileThreadPool.isDaemon = true;
|
m_fileThreadPool.isDaemon = true;
|
||||||
}
|
}
|
||||||
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
|
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
|
||||||
|
startWaiting();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
m_activeReads.remove(file);
|
||||||
on_read_finish(file, IOStatus.error, 0);
|
on_read_finish(file, IOStatus.error, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -227,6 +232,7 @@ log("start task");
|
||||||
{
|
{
|
||||||
auto f = &m_files[file].read;
|
auto f = &m_files[file].read;
|
||||||
m_activeReads.remove(file);
|
m_activeReads.remove(file);
|
||||||
|
m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
|
||||||
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
||||||
assert(res, "Cancelling read when no write is in progress.");
|
assert(res, "Cancelling read when no write is in progress.");
|
||||||
}
|
}
|
||||||
|
@ -252,7 +258,7 @@ log("start task");
|
||||||
/// private
|
/// private
|
||||||
static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
|
static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
|
||||||
{
|
{
|
||||||
log("ready event");
|
log("task fun");
|
||||||
IOInfo* f = mixin("&fd.m_files[file]."~op);
|
IOInfo* f = mixin("&fd.m_files[file]."~op);
|
||||||
log("wait for cancel");
|
log("wait for cancel");
|
||||||
|
|
||||||
|
@ -318,7 +324,17 @@ log("ready event");
|
||||||
m_files[f].write.flush(f);
|
m_files[f].write.flush(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_waiting = false;
|
||||||
|
startWaiting();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startWaiting()
|
||||||
|
{
|
||||||
|
if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) {
|
||||||
|
log("wait for ready");
|
||||||
m_events.wait(m_readyEvent, &onReady);
|
m_events.wait(m_readyEvent, &onReady);
|
||||||
|
m_waiting = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,8 @@ final class LoopTimeoutTimerDriver : EventDriverTimers {
|
||||||
ms_allocator.parent = Mallocator.instance;
|
ms_allocator.parent = Mallocator.instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; }
|
||||||
|
|
||||||
final package Duration getNextTimeout(long stdtime)
|
final package Duration getNextTimeout(long stdtime)
|
||||||
@safe nothrow {
|
@safe nothrow {
|
||||||
if (m_timerQueue.length == 0) return Duration.max;
|
if (m_timerQueue.length == 0) return Duration.max;
|
||||||
|
|
|
@ -180,8 +180,11 @@ struct SmallIntegerSet(V : size_t)
|
||||||
{
|
{
|
||||||
private {
|
private {
|
||||||
uint[][4] m_bits;
|
uint[][4] m_bits;
|
||||||
|
size_t m_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@property bool empty() const { return m_count == 0; }
|
||||||
|
|
||||||
void insert(V i)
|
void insert(V i)
|
||||||
{
|
{
|
||||||
foreach (j; 0 .. m_bits.length) {
|
foreach (j; 0 .. m_bits.length) {
|
||||||
|
@ -189,6 +192,7 @@ struct SmallIntegerSet(V : size_t)
|
||||||
i /= 32;
|
i /= 32;
|
||||||
if (i >= m_bits[j].length)
|
if (i >= m_bits[j].length)
|
||||||
m_bits[j].length = nextPOT(i+1);
|
m_bits[j].length = nextPOT(i+1);
|
||||||
|
if (j == 0 && !(m_bits[j][i] & b)) m_count++;
|
||||||
m_bits[j][i] |= b;
|
m_bits[j][i] |= b;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -199,6 +203,7 @@ struct SmallIntegerSet(V : size_t)
|
||||||
uint b = 1u << (i%32);
|
uint b = 1u << (i%32);
|
||||||
i /= 32;
|
i /= 32;
|
||||||
if (!m_bits[j][i]) break;
|
if (!m_bits[j][i]) break;
|
||||||
|
if (j == 0 && m_bits[j][i] & b) m_count--;
|
||||||
m_bits[j][i] &= ~b;
|
m_bits[j][i] &= ~b;
|
||||||
if (m_bits[j][i]) break;
|
if (m_bits[j][i]) break;
|
||||||
}
|
}
|
||||||
|
@ -237,10 +242,13 @@ unittest {
|
||||||
|
|
||||||
SmallIntegerSet!uint set;
|
SmallIntegerSet!uint set;
|
||||||
bool[uint] controlset;
|
bool[uint] controlset;
|
||||||
|
|
||||||
|
assert(set.empty);
|
||||||
foreach (i; ints) {
|
foreach (i; ints) {
|
||||||
set.insert(i);
|
set.insert(i);
|
||||||
controlset[i] = true;
|
controlset[i] = true;
|
||||||
}
|
}
|
||||||
|
assert(!set.empty);
|
||||||
|
|
||||||
foreach (jidx, j; ints) {
|
foreach (jidx, j; ints) {
|
||||||
size_t cnt = 0;
|
size_t cnt = 0;
|
||||||
|
@ -256,6 +264,7 @@ unittest {
|
||||||
set.remove(j);
|
set.remove(j);
|
||||||
controlset.remove(j);
|
controlset.remove(j);
|
||||||
}
|
}
|
||||||
|
assert(set.empty);
|
||||||
|
|
||||||
foreach (i; set) assert(false);
|
foreach (i; set) assert(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,8 +39,8 @@ void main()
|
||||||
assert(change.kind == FileChangeKind.removed);
|
assert(change.kind == FileChangeKind.removed);
|
||||||
assert(change.directory == ".");
|
assert(change.directory == ".");
|
||||||
assert(change.name == testFilename);
|
assert(change.name == testFilename);
|
||||||
|
eventDriver.watchers.releaseRef(id);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.core.exit();
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -63,7 +63,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
|
|
||||||
|
|
|
@ -21,13 +21,12 @@ void main()
|
||||||
catch (Exception e) assert(false, e.msg);
|
catch (Exception e) assert(false, e.msg);
|
||||||
}
|
}
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.core.exit();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,6 @@ void test(bool notify_all)
|
||||||
assert(!s_done);
|
assert(!s_done);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.timers.cancelWait(tm);
|
eventDriver.timers.cancelWait(tm);
|
||||||
eventDriver.core.exit();
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -51,7 +50,7 @@ void test(bool notify_all)
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,6 @@ void main()
|
||||||
auto f = eventDriver.files.open("test.txt", FileOpenMode.createTrunc);
|
auto f = eventDriver.files.open("test.txt", FileOpenMode.createTrunc);
|
||||||
assert(eventDriver.files.getSize(f) == 0);
|
assert(eventDriver.files.getSize(f) == 0);
|
||||||
auto data = cast(const(ubyte)[])"Hello, World!";
|
auto data = cast(const(ubyte)[])"Hello, World!";
|
||||||
auto tm = eventDriver.timers.create();
|
|
||||||
eventDriver.timers.set(tm, 500.msecs, 0.msecs);
|
|
||||||
eventDriver.timers.wait(tm, (tm) {
|
|
||||||
assert(false, "File operation stalled.");
|
|
||||||
});
|
|
||||||
|
|
||||||
eventDriver.files.write(f, 0, data[0 .. 7], IOMode.all, (f, status, nbytes) {
|
eventDriver.files.write(f, 0, data[0 .. 7], IOMode.all, (f, status, nbytes) {
|
||||||
assert(status == IOStatus.ok);
|
assert(status == IOStatus.ok);
|
||||||
|
@ -48,7 +43,6 @@ void main()
|
||||||
} ();
|
} ();
|
||||||
eventDriver.files.releaseRef(f);
|
eventDriver.files.releaseRef(f);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.core.exit();
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -57,7 +51,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,8 @@ void main()
|
||||||
assert(!s_done);
|
assert(!s_done);
|
||||||
assert(status == SignalStatus.ok);
|
assert(status == SignalStatus.ok);
|
||||||
assert(sig == () @trusted { return SIGUSR1; } ());
|
assert(sig == () @trusted { return SIGUSR1; } ());
|
||||||
|
eventDriver.signals.releaseRef(id);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.core.exit();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
auto tm = eventDriver.timers.create();
|
auto tm = eventDriver.timers.create();
|
||||||
|
@ -34,7 +34,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
|
|
||||||
|
|
|
@ -43,9 +43,6 @@ void main()
|
||||||
destroy(server);
|
destroy(server);
|
||||||
destroy(client);
|
destroy(client);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
|
|
||||||
// FIXME: this shouldn't ne necessary:
|
|
||||||
eventDriver.core.exit();
|
|
||||||
})(s_rbuf, IOMode.immediate);
|
})(s_rbuf, IOMode.immediate);
|
||||||
})(s_rbuf[0 .. 0], IOMode.once);
|
})(s_rbuf[0 .. 0], IOMode.once);
|
||||||
});
|
});
|
||||||
|
@ -67,7 +64,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
|
|
||||||
|
|
|
@ -43,9 +43,6 @@ void main()
|
||||||
destroy(server);
|
destroy(server);
|
||||||
destroy(client);
|
destroy(client);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
|
|
||||||
// FIXME: this shouldn't ne necessary:
|
|
||||||
eventDriver.core.exit();
|
|
||||||
})(s_rbuf, IOMode.once);
|
})(s_rbuf, IOMode.once);
|
||||||
})(s_rbuf, IOMode.once);
|
})(s_rbuf, IOMode.once);
|
||||||
});
|
});
|
||||||
|
@ -62,7 +59,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,10 +34,11 @@ void main()
|
||||||
s_cnt++;
|
s_cnt++;
|
||||||
assert(dur > 100.msecs * s_cnt);
|
assert(dur > 100.msecs * s_cnt);
|
||||||
assert(dur < 100.msecs * s_cnt + 20.msecs);
|
assert(dur < 100.msecs * s_cnt + 20.msecs);
|
||||||
|
assert(s_cnt <= 3);
|
||||||
|
|
||||||
if (s_cnt == 3) {
|
if (s_cnt == 3) {
|
||||||
s_done = true;
|
s_done = true;
|
||||||
eventDriver.core.exit();
|
eventDriver.timers.stop(timer);
|
||||||
} else eventDriver.timers.wait(tm, &secondTier);
|
} else eventDriver.timers.wait(tm, &secondTier);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assert(false, e.msg);
|
assert(false, e.msg);
|
||||||
|
@ -52,7 +53,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,9 +55,6 @@ void main()
|
||||||
destroy(s_connectedSocket);
|
destroy(s_connectedSocket);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
log("done.");
|
log("done.");
|
||||||
|
|
||||||
// FIXME: this shouldn't be necessary:
|
|
||||||
eventDriver.core.exit();
|
|
||||||
})(s_rbuf, IOMode.immediate);
|
})(s_rbuf, IOMode.immediate);
|
||||||
});
|
});
|
||||||
})(s_rbuf, IOMode.once);
|
})(s_rbuf, IOMode.once);
|
||||||
|
@ -70,7 +67,7 @@ void main()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,9 +118,6 @@ void testStream()
|
||||||
destroy(server);
|
destroy(server);
|
||||||
destroy(client);
|
destroy(client);
|
||||||
s_done = true;
|
s_done = true;
|
||||||
|
|
||||||
// FIXME: this shouldn't ne necessary:
|
|
||||||
eventDriver.core.exit();
|
|
||||||
})(s_rbuf, IOMode.once);
|
})(s_rbuf, IOMode.once);
|
||||||
})(s_rbuf, IOMode.once);
|
})(s_rbuf, IOMode.once);
|
||||||
});
|
});
|
||||||
|
@ -137,7 +134,7 @@ void testStream()
|
||||||
ExitReason er;
|
ExitReason er;
|
||||||
do er = eventDriver.core.processEvents(Duration.max);
|
do er = eventDriver.core.processEvents(Duration.max);
|
||||||
while (er == ExitReason.idle);
|
while (er == ExitReason.idle);
|
||||||
//assert(er == ExitReason.outOfWaiters); // FIXME: see above
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
}
|
}
|
||||||
|
|
33
tests/0-waitercount.d
Normal file
33
tests/0-waitercount.d
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
/++ dub.sdl:
|
||||||
|
name "test"
|
||||||
|
dependency "eventcore" path=".."
|
||||||
|
+/
|
||||||
|
module test;
|
||||||
|
|
||||||
|
import eventcore.core;
|
||||||
|
import core.stdc.signal;
|
||||||
|
import core.time : Duration, msecs;
|
||||||
|
import core.thread : Thread;
|
||||||
|
|
||||||
|
|
||||||
|
void test(Duration timeout)
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
auto reason = eventDriver.core.processEvents(timeout);
|
||||||
|
final switch (reason) with (ExitReason) {
|
||||||
|
case exited: assert(false, "Manual exit without call to exit()!?");
|
||||||
|
case timeout: assert(false, "Event loop timed out, although no waiters exist!?");
|
||||||
|
case idle: break;
|
||||||
|
case outOfWaiters: return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void main()
|
||||||
|
{
|
||||||
|
assert(eventDriver.core.waiterCount == 0, "Initial waiter count not 0!");
|
||||||
|
test(100.msecs);
|
||||||
|
assert(eventDriver.core.waiterCount == 0, "Waiter count after outOfWaiters not 0!");
|
||||||
|
test(Duration.max);
|
||||||
|
assert(eventDriver.core.waiterCount == 0);
|
||||||
|
}
|
Loading…
Reference in a new issue