From e3c4af843353ab8d92f81d40ff6018f0482fc654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 5 Oct 2016 04:21:24 +0200 Subject: [PATCH] Split up EventDriver interface into individual sub interfaces. --- examples/http-server-fibers/source/app.d | 22 +-- examples/http-server/source/app.d | 24 +-- source/eventcore/driver.d | 177 +++++++++++++---------- source/eventcore/drivers/posix.d | 9 +- 4 files changed, 134 insertions(+), 98 deletions(-) diff --git a/examples/http-server-fibers/source/app.d b/examples/http-server-fibers/source/app.d index c5dea97..badfc9d 100644 --- a/examples/http-server-fibers/source/app.d +++ b/examples/http-server-fibers/source/app.d @@ -83,13 +83,13 @@ struct StreamConnectionImpl { ~this() nothrow { if (m_socket != StreamSocketFD.invalid) - eventDriver.releaseRef(m_socket); + eventDriver.sockets.releaseRef(m_socket); } @property bool empty() { reader.start(); - eventDriver.waitSocketData(m_socket, &onData); + eventDriver.sockets.waitSocketData(m_socket, &onData); reader.wait(); return m_empty; } @@ -98,7 +98,7 @@ struct StreamConnectionImpl { { reader.start(); if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); - else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); + else eventDriver.sockets.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); reader.wait(); auto ln = m_line; m_line = null; @@ -108,13 +108,13 @@ struct StreamConnectionImpl { void write(const(ubyte)[] data) { writer.start(); - eventDriver.writeSocket(m_socket, data, IOMode.all, &onWrite); + eventDriver.sockets.writeSocket(m_socket, data, IOMode.all, &onWrite); writer.wait(); } void close() nothrow { - eventDriver.releaseRef(m_socket); + eventDriver.sockets.releaseRef(m_socket); m_socket = StreamSocketFD.invalid; m_readBuffer = null; } @@ -159,7 +159,7 @@ struct StreamConnectionImpl { reader.finish(); } else if (m_readBuffer.length - m_readBufferFill > 0) { - eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); + eventDriver.sockets.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); } else { reader.finish(exh); } @@ -171,16 +171,16 @@ void main() { print("Starting up..."); auto addr = new InternetAddress("127.0.0.1", 8080); - auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); + auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect)); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); /*import core.time : msecs; - eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); - eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/ + eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); + eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/ print("Listening for requests on port 8080..."); - while (eventDriver.waiterCount) - eventDriver.processEvents(); + while (eventDriver.core.waiterCount) + eventDriver.core.processEvents(); } void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) diff --git a/examples/http-server/source/app.d b/examples/http-server/source/app.d index 4646e31..640b9b3 100644 --- a/examples/http-server/source/app.d +++ b/examples/http-server/source/app.d @@ -9,12 +9,12 @@ void main() { print("Starting up..."); auto addr = new InternetAddress("127.0.0.1", 8080); - auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); + auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect)); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); print("Listening for requests on port 8080..."); - while (eventDriver.waiterCount) - eventDriver.processEvents(); + while (eventDriver.core.waiterCount) + eventDriver.core.processEvents(); } void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) @@ -49,7 +49,7 @@ struct ClientHandler { { onLine = on_line; if (linefill >= 2) onReadData(client, IOStatus.ok, 0); - else eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); + else eventDriver.sockets.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); } void onRequestLine(ubyte[] ln) @@ -57,8 +57,8 @@ struct ClientHandler { //print("Request: %s", cast(char[])ln); if (ln.length == 0) { //print("Error: empty request line"); - eventDriver.shutdownSocket(client); - eventDriver.releaseRef(client); + eventDriver.sockets.shutdownSocket(client); + eventDriver.sockets.releaseRef(client); } readLine(&onHeaderLine); @@ -68,7 +68,7 @@ struct ClientHandler { { if (ln.length == 0) { auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; - eventDriver.writeSocket(client, reply, IOMode.all, &onWriteFinished); + eventDriver.sockets.writeSocket(client, reply, IOMode.all, &onWriteFinished); } else readLine(&onHeaderLine); } @@ -83,8 +83,8 @@ struct ClientHandler { if (status != IOStatus.ok) { print("Client disconnect"); - eventDriver.shutdownSocket(client); - eventDriver.releaseRef(client); + eventDriver.sockets.shutdownSocket(client); + eventDriver.sockets.releaseRef(client); return; } @@ -101,12 +101,12 @@ struct ClientHandler { onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]); } else if (linebuf.length - linefill > 0) { - eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); + eventDriver.sockets.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); } else { // ERROR: header line too long print("Header line too long"); - eventDriver.shutdownSocket(client); - eventDriver.releaseRef(client); + eventDriver.sockets.shutdownSocket(client); + eventDriver.sockets.releaseRef(client); } } } diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index e4c7b5f..d202377 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -7,10 +7,16 @@ import std.socket : Address; interface EventDriver { @safe: /*@nogc:*/ nothrow: - // - // General functionality - // + @property EventDriverCore core(); + @property EventDriverFiles files(); + @property EventDriverSockets sockets(); + @property EventDriverTimers udp(); + @property EventDriverEvents events(); + @property EventDriverWatchers watchers(); +} +interface EventDriverCore { +@safe: /*@nogc:*/ nothrow: /// Releases all resources associated with the driver void dispose(); @@ -57,77 +63,6 @@ interface EventDriver { */ void clearExitFlag(); - // - // TCP - // - StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect); - StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); - void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); - ConnectionState getConnectionState(StreamSocketFD sock); - void setTCPNoDelay(StreamSocketFD socket, bool enable); - void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); - void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); - void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); - void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); - void cancelRead(StreamSocketFD socket); - void cancelWrite(StreamSocketFD socket); - - // - // Files - // - //FileFD openFile(string path, FileOpenMode mode); - //FileFD createTempFile(); - - // - // Manual events - // - EventID createEvent(); - void triggerEvent(EventID event, bool notify_all = true); - void triggerEvent(EventID event, bool notify_all = true) shared; - void waitForEvent(EventID event, EventCallback on_event); - void cancelWaitForEvent(EventID event, EventCallback on_event); - - // - // Timers - // - TimerID createTimer(); - void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero); - void stopTimer(TimerID timer); - bool isTimerPending(TimerID timer); - bool isTimerPeriodic(TimerID timer); - void waitTimer(TimerID timer, TimerCallback callback); - void cancelTimerWait(TimerID timer, TimerCallback callback); - - // - // Resource ownership - // - - /** - Increments the reference count of the given resource. - */ - void addRef(SocketFD descriptor); - /// ditto - void addRef(FileFD descriptor); - /// ditto - void addRef(TimerID descriptor); - /// ditto - void addRef(EventID descriptor); - - /** - Decrements the reference count of the given resource. - - Once the reference count reaches zero, all associated resources will be - freed and the resource descriptor gets invalidated. - */ - void releaseRef(SocketFD descriptor); - /// ditto - void releaseRef(FileFD descriptor); - /// ditto - void releaseRef(TimerID descriptor); - /// ditto - void releaseRef(EventID descriptor); - - /// Low-level user data access. Use `getUserData` instead. protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; @@ -142,6 +77,100 @@ interface EventDriver { } } +interface EventDriverSockets { +@safe: /*@nogc:*/ nothrow: + /** + Increments the reference count of the given resource. + */ + void addRef(SocketFD descriptor); + /** + Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + */ + void releaseRef(SocketFD descriptor); + + StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect); + StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); + void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); + ConnectionState getConnectionState(StreamSocketFD sock); + void setTCPNoDelay(StreamSocketFD socket, bool enable); + void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); + void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); + void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); + void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); + void cancelRead(StreamSocketFD socket); + void cancelWrite(StreamSocketFD socket); +} + +interface EventDriverFiles { +@safe: /*@nogc:*/ nothrow: + /** + Increments the reference count of the given resource. + */ + void addRef(FileFD descriptor); + /** + Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + */ + void releaseRef(FileFD descriptor); + + //FileFD openFile(string path, FileOpenMode mode); + //FileFD createTempFile(); +} + +interface EventDriverEvents { +@safe: /*@nogc:*/ nothrow: + /** + Increments the reference count of the given resource. + */ + void addRef(EventID descriptor); + /** + Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + */ + void releaseRef(EventID descriptor); + + EventID createEvent(); + void triggerEvent(EventID event, bool notify_all = true); + void triggerEvent(EventID event, bool notify_all = true) shared; + void waitForEvent(EventID event, EventCallback on_event); + void cancelWaitForEvent(EventID event, EventCallback on_event); +} + +interface EventDriverTimers { +@safe: /*@nogc:*/ nothrow: + /** + Increments the reference count of the given resource. + */ + void addRef(TimerID descriptor); + /** + Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + */ + void releaseRef(TimerID descriptor); + + TimerID createTimer(); + void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero); + void stopTimer(TimerID timer); + bool isTimerPending(TimerID timer); + bool isTimerPeriodic(TimerID timer); + void waitTimer(TimerID timer, TimerCallback callback); + void cancelTimerWait(TimerID timer, TimerCallback callback); +} + +interface EventDriverWatchers { +@safe: /*@nogc:*/ nothrow: + +} + alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index e53a6a6..5f5f7f5 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -36,7 +36,7 @@ private long currStdTime() return Clock.currStdTime; } -abstract class PosixEventDriver : EventDriver { +abstract class PosixEventDriver : EventDriver, EventDriverCore, EventDriverFiles, EventDriverSockets, EventDriverTimers, EventDriverEvents, EventDriverWatchers { @safe: /*@nogc:*/ nothrow: private { @@ -54,6 +54,13 @@ abstract class PosixEventDriver : EventDriver { //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD } + @property PosixEventDriver core() { return this; } + @property PosixEventDriver files() { return this; } + @property PosixEventDriver sockets() { return this; } + @property PosixEventDriver udp() { return this; } + @property PosixEventDriver events() { return this; } + @property PosixEventDriver watchers() { return this; } + mixin DefaultTimerImpl!(); protected int maxFD() const { return cast(int)m_fds.length; }