Fix issues for posix events and cleanup cancel semantics.

Cancelling an operation now guarantees that the callback won't be called.
This commit is contained in:
Sönke Ludwig 2016-06-15 18:19:22 +02:00
parent fe939bff18
commit f808f89e7c
2 changed files with 26 additions and 16 deletions

View file

@ -76,6 +76,7 @@ interface EventDriver {
void triggerEvent(EventID event, bool notify_all = true); void triggerEvent(EventID event, bool notify_all = true);
void triggerEvent(EventID event, bool notify_all = true) shared; void triggerEvent(EventID event, bool notify_all = true) shared;
void waitForEvent(EventID event, EventCallback on_event); void waitForEvent(EventID event, EventCallback on_event);
void cancelWaitForEvent(EventID event, EventCallback on_event);
// //
// Timers // Timers
@ -174,7 +175,6 @@ enum IOMode {
enum IOStatus { enum IOStatus {
ok, /// The data has been transferred normally ok, /// The data has been transferred normally
disconnected, /// The connection was closed before all data could be transterred disconnected, /// The connection was closed before all data could be transterred
cancelled, /// The operation was cancelled manually
error, /// An error occured while transferring the data error, /// An error occured while transferring the data
wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable
} }

View file

@ -8,6 +8,7 @@ module eventcore.drivers.posix;
public import eventcore.driver; public import eventcore.driver;
import eventcore.drivers.timer; import eventcore.drivers.timer;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils; import eventcore.internal.utils;
import std.socket : Address, AddressFamily, UnknownAddress; import std.socket : Address, AddressFamily, UnknownAddress;
@ -282,7 +283,6 @@ abstract class PosixEventDriver : EventDriver {
setNotifyCallback!(EventType.read)(socket, null); setNotifyCallback!(EventType.read)(socket, null);
with (m_fds[socket]) { with (m_fds[socket]) {
readBuffer = null; readBuffer = null;
readCallback(socket, IOStatus.cancelled, bytesRead);
} }
} }
@ -378,7 +378,6 @@ abstract class PosixEventDriver : EventDriver {
setNotifyCallback!(EventType.write)(socket, null); setNotifyCallback!(EventType.write)(socket, null);
with (m_fds[socket]) { with (m_fds[socket]) {
writeBuffer = null; writeBuffer = null;
writeCallback(socket, IOStatus.cancelled, bytesWritten);
} }
} }
@ -482,6 +481,7 @@ abstract class PosixEventDriver : EventDriver {
{ {
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
initFD(id); initFD(id);
m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation
registerFD(id, EventMask.read); registerFD(id, EventMask.read);
startNotify!(EventType.read)(id, &onEvent); startNotify!(EventType.read)(id, &onEvent);
return id; return id;
@ -489,6 +489,7 @@ abstract class PosixEventDriver : EventDriver {
final override void triggerEvent(EventID event, bool notify_all = true) final override void triggerEvent(EventID event, bool notify_all = true)
{ {
assert(event < m_fds.length, "Invalid event ID passed to triggerEvent.");
if (notify_all) { if (notify_all) {
foreach (w; m_fds[event].waiters.consume) foreach (w; m_fds[event].waiters.consume)
w(event); w(event);
@ -502,6 +503,7 @@ abstract class PosixEventDriver : EventDriver {
shared @trusted { shared @trusted {
import core.atomic : atomicStore; import core.atomic : atomicStore;
auto thisus = cast(PosixEventDriver)this; auto thisus = cast(PosixEventDriver)this;
assert(event < thisus.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
int one = 1; int one = 1;
if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true); if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true);
() @trusted { write(event, &one, one.sizeof); } (); () @trusted { write(event, &one, one.sizeof); } ();
@ -509,9 +511,19 @@ abstract class PosixEventDriver : EventDriver {
final override void waitForEvent(EventID event, EventCallback on_event) final override void waitForEvent(EventID event, EventCallback on_event)
{ {
assert(event < m_fds.length, "Invalid event ID passed to waitForEvent.");
return m_fds[event].waiters.put(on_event); return m_fds[event].waiters.put(on_event);
} }
final override void cancelWaitForEvent(EventID event, EventCallback on_event)
{
import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove;
auto slot = &m_fds[event];
slot.waiters.removePending(on_event);
}
private void onEvent(FD event) private void onEvent(FD event)
@trusted { @trusted {
import core.atomic : cas; import core.atomic : cas;
@ -522,7 +534,7 @@ abstract class PosixEventDriver : EventDriver {
final override void addRef(SocketFD fd) final override void addRef(SocketFD fd)
{ {
auto pfd = &m_fds[fd]; auto pfd = &m_fds[fd];
assert(pfd.refCount > 0); assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD.");
m_fds[fd].refCount++; m_fds[fd].refCount++;
} }
@ -534,14 +546,14 @@ abstract class PosixEventDriver : EventDriver {
final override void addRef(EventID descriptor) final override void addRef(EventID descriptor)
{ {
auto pfd = &m_fds[descriptor]; auto pfd = &m_fds[descriptor];
assert(pfd.refCount > 0); assert(pfd.refCount > 0, "Adding reference to unreferenced event FD.");
m_fds[descriptor].refCount++; m_fds[descriptor].refCount++;
} }
final override void releaseRef(SocketFD fd) final override void releaseRef(SocketFD fd)
{ {
auto pfd = &m_fds[fd]; auto pfd = &m_fds[fd];
assert(pfd.refCount > 0); assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD.");
if (--m_fds[fd].refCount == 0) { if (--m_fds[fd].refCount == 0) {
unregisterFD(fd); unregisterFD(fd);
clearFD(fd); clearFD(fd);
@ -554,15 +566,10 @@ abstract class PosixEventDriver : EventDriver {
assert(false); assert(false);
} }
final override void releaseRef(TimerID descriptor)
{
assert(false);
}
final override void releaseRef(EventID descriptor) final override void releaseRef(EventID descriptor)
{ {
auto pfd = &m_fds[descriptor]; auto pfd = &m_fds[descriptor];
assert(pfd.refCount > 0); assert(pfd.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_fds[descriptor].refCount == 0) { if (--m_fds[descriptor].refCount == 0) {
unregisterFD(descriptor); unregisterFD(descriptor);
clearFD(descriptor); clearFD(descriptor);
@ -627,7 +634,8 @@ import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(E
private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert((callback !is null) != (m_fds[fd].callback[evt] !is null)); assert((callback !is null) != (m_fds[fd].callback[evt] !is null),
"Overwriting notification callback.");
m_fds[fd].callback[evt] = callback; m_fds[fd].callback[evt] = callback;
} }
@ -649,6 +657,10 @@ import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(E
{ {
if (m_fds[fd].userDataDestructor) if (m_fds[fd].userDataDestructor)
() @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } ();
() @trusted nothrow {
scope (failure) assert(false);
destroy(m_fds[fd].waiters);
} ();
m_fds[fd] = FDSlot.init; m_fds[fd] = FDSlot.init;
} }
} }
@ -658,8 +670,6 @@ alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD);
private struct FDSlot { private struct FDSlot {
import eventcore.internal.consumablequeue;
FDSlotCallback[EventType.max+1] callback; FDSlotCallback[EventType.max+1] callback;
uint refCount; uint refCount;