From 5ec6b9a5e58b0bda2c55bb47ae00fbb4ec47a952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 14 Jun 2016 07:57:10 +0200 Subject: [PATCH] Add more interface functions. - Cancelling of socket read/write events - Cancelling of timer waits - Retrieving the TCP connection state - Storing custom data together with the event structures --- source/eventcore/driver.d | 36 +++++++++++++++- source/eventcore/drivers/posix.d | 71 +++++++++++++++++++++++++++++++- source/eventcore/drivers/timer.d | 10 +++++ 3 files changed, 115 insertions(+), 2 deletions(-) diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 1918075..18cd929 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -54,12 +54,20 @@ interface EventDriver { StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect); StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); - + ConnectionState getConnectionState(StreamSocketFD sock); void setTCPNoDelay(StreamSocketFD socket, bool enable); void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); + void cancelRead(StreamSocketFD socket); + void cancelWrite(StreamSocketFD socket); + + // + // Files + // + //FileFD openFile(string path, FileOpenMode mode); + //FileFD createTempFile(); // // Manual events @@ -78,6 +86,7 @@ interface EventDriver { bool isTimerPending(TimerID timer); bool isTimerPeriodic(TimerID timer); void waitTimer(TimerID timer, TimerCallback callback); + void cancelTimerWait(TimerID timer, TimerCallback callback); // // Resource ownership @@ -107,6 +116,20 @@ interface EventDriver { void releaseRef(TimerID descriptor); /// ditto void releaseRef(EventID descriptor); + + + /// Low-level user data access. Use `getUserData` instead. + protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; + + /** Retrieves a reference to a user-defined value associated with a descriptor. + */ + @property final ref T userData(T, FD)(FD descriptor) + @trusted { + import std.conv : emplace; + static void init(void* ptr) { emplace(cast(T*)ptr); } + static void destr(void* ptr) { destroy(*cast(T*)ptr); } + return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + } } @@ -115,6 +138,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias EventCallback = void delegate(EventID); alias TimerCallback = void delegate(TimerID); +@system alias DataInitializer = void function(void*); enum ExitReason { timeout, @@ -131,6 +155,15 @@ enum ConnectStatus { unknownError } +enum ConnectionState { + initialized, + connecting, + connected, + passiveClose, + activeClose, + closed +} + enum IOMode { immediate, /// Process only as much as possible without waiting once, /// Process as much as possible with a single call @@ -141,6 +174,7 @@ enum IOMode { enum IOStatus { ok, /// The data has been transferred normally disconnected, /// The connection was closed before all data could be transterred + cancelled, /// The operation was cancelled manually error, /// An error occured while transferring the data wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable } diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index c55e9d0..44491ec 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -161,6 +161,7 @@ abstract class PosixEventDriver : EventDriver { final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) { + log("Listen stream"); auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily); void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; } @@ -169,12 +170,15 @@ abstract class PosixEventDriver : EventDriver { int tmp_reuse = 1; // FIXME: error handling! if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { + log("setsockopt failed."); invalidateSocket(); } else if (bind(sock, address.name, address.nameLen) != 0) { + log("bind failed."); invalidateSocket(); } else if (listen(sock, 128) != 0) { + log("listen failed."); invalidateSocket(); - } else { scope (failure) assert(false); import std.stdio; writeln("Success!"); } + } else log("Success!"); } (); if (on_accept && sock != StreamListenSocketFD.invalid) @@ -185,6 +189,7 @@ abstract class PosixEventDriver : EventDriver { final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { + log("wait for conn"); registerFD(sock, EventMask.read); initFD(sock); m_fds[sock].acceptCallback = on_accept; @@ -210,6 +215,11 @@ abstract class PosixEventDriver : EventDriver { } } + ConnectionState getConnectionState(StreamSocketFD sock) + { + assert(false); + } + final override void setTCPNoDelay(StreamSocketFD socket, bool enable) { int opt = enable; @@ -218,6 +228,11 @@ abstract class PosixEventDriver : EventDriver { final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { + if (buffer.length == 0) { + on_read_finish(socket, IOStatus.ok, 0); + return; + } + sizediff_t ret; () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); @@ -261,6 +276,16 @@ abstract class PosixEventDriver : EventDriver { setNotifyCallback!(EventType.read)(socket, &onSocketRead); } + override void cancelRead(StreamSocketFD socket) + { + assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + setNotifyCallback!(EventType.read)(socket, null); + with (m_fds[socket]) { + readBuffer = null; + readCallback(socket, IOStatus.cancelled, bytesRead); + } + } + private void onSocketRead(FD fd) { auto slot = &m_fds[fd]; @@ -300,6 +325,11 @@ abstract class PosixEventDriver : EventDriver { final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { + if (buffer.length == 0) { + on_write_finish(socket, IOStatus.ok, 0); + return; + } + sizediff_t ret; () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); @@ -342,6 +372,16 @@ abstract class PosixEventDriver : EventDriver { setNotifyCallback!(EventType.write)(socket, &onSocketWrite); } + override void cancelWrite(StreamSocketFD socket) + { + assert(m_fds[socket].readCallback !is null, "Cancelling write when there is no read in progress."); + setNotifyCallback!(EventType.write)(socket, null); + with (m_fds[socket]) { + writeBuffer = null; + writeCallback(socket, IOStatus.cancelled, bytesWritten); + } + } + private void onSocketWrite(FD fd) { auto slot = &m_fds[fd]; @@ -530,6 +570,20 @@ abstract class PosixEventDriver : EventDriver { } } + final override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system { + FDSlot* fds = &m_fds[descriptor]; + assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, + "Requesting user data with differing type (destructor)."); + assert(size <= FDSlot.userData.length, "Requested user data is too large."); + if (size > FDSlot.userData.length) assert(false); + if (!fds.userDataDestructor) { + initialize(fds.userData.ptr); + fds.userDataDestructor = destroy; + } + return m_fds[descriptor].userData.ptr; + } + /// Registers the FD for general notification reception. protected abstract void registerFD(FD fd, EventMask mask); @@ -555,6 +609,7 @@ abstract class PosixEventDriver : EventDriver { private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) { +import std.stdio : writefln; try writefln("start notify %s %s", evt, fd); catch(Exception) {} //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); m_fds[fd].callback[evt] = callback; m_waiterCount++; @@ -563,6 +618,7 @@ abstract class PosixEventDriver : EventDriver { private void stopNotify(EventType evt)(FD fd) { +import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(Exception) {} //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); m_fds[fd].callback[evt] = null; m_waiterCount--; @@ -591,6 +647,8 @@ abstract class PosixEventDriver : EventDriver { private void clearFD(FD fd) { + if (m_fds[fd].userDataDestructor) + () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); m_fds[fd] = FDSlot.init; } } @@ -619,6 +677,10 @@ private struct FDSlot { ConnectCallback connectCallback; AcceptCallback acceptCallback; ConsumableQueue!EventCallback waiters; + + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + shared bool triggerAll; @property EventMask eventMask() const nothrow { @@ -664,6 +726,13 @@ private int getSocketError() else return errno; } +void log(ARGS...)(string fmt, ARGS args) +{ + import std.stdio; + try writefln(fmt, args); + catch (Exception) {} +} + /*version (Windows) { import std.c.windows.windows; diff --git a/source/eventcore/drivers/timer.d b/source/eventcore/drivers/timer.d index a30e50d..f900a83 100644 --- a/source/eventcore/drivers/timer.d +++ b/source/eventcore/drivers/timer.d @@ -130,6 +130,16 @@ mixin template DefaultTimerImpl() { m_timers[timer].callbacks ~= callback; } + final override void cancelTimerWait(TimerID timer, TimerCallback callback) + { + import std.algorithm.mutation : remove; + import std.algorithm.searching : countUntil; + + auto pt = m_timers[timer]; + auto idx = pt.callbacks.countUntil(callback); + if (idx >= 0) pt.callbacks = pt.callbacks.remove(idx); + } + final override void addRef(TimerID descriptor) { m_timers[descriptor].refCount++;