diff --git a/examples/http-server-fibers/source/app.d b/examples/http-server-fibers/source/app.d index ab4e209..c5dea97 100644 --- a/examples/http-server-fibers/source/app.d +++ b/examples/http-server-fibers/source/app.d @@ -98,7 +98,7 @@ struct StreamConnectionImpl { { reader.start(); if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); - else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); + else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); reader.wait(); auto ln = m_line; m_line = null; @@ -108,7 +108,7 @@ struct StreamConnectionImpl { void write(const(ubyte)[] data) { writer.start(); - eventDriver.writeSocket(m_socket, data, &onWrite, IOMode.all); + eventDriver.writeSocket(m_socket, data, IOMode.all, &onWrite); writer.wait(); } @@ -159,7 +159,7 @@ struct StreamConnectionImpl { reader.finish(); } else if (m_readBuffer.length - m_readBufferFill > 0) { - eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); + eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); } else { reader.finish(exh); } @@ -174,9 +174,9 @@ void main() auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); - import core.time : msecs; + /*import core.time : msecs; eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); - eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs); + eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/ print("Listening for requests on port 8080..."); while (eventDriver.waiterCount) @@ -209,7 +209,7 @@ struct ClientHandler { auto conn = StreamConnection(client, linebuf); try { - while (!conn.empty) { + while (true) { conn.readLine(); ubyte[] ln; diff --git a/examples/http-server/source/app.d b/examples/http-server/source/app.d index 339b668..4646e31 100644 --- a/examples/http-server/source/app.d +++ b/examples/http-server/source/app.d @@ -49,7 +49,7 @@ struct ClientHandler { { onLine = on_line; if (linefill >= 2) onReadData(client, IOStatus.ok, 0); - else eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); + else eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); } void onRequestLine(ubyte[] ln) @@ -68,7 +68,7 @@ struct ClientHandler { { if (ln.length == 0) { auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; - eventDriver.writeSocket(client, reply, &onWriteFinished, IOMode.all); + eventDriver.writeSocket(client, reply, IOMode.all, &onWriteFinished); } else readLine(&onHeaderLine); } @@ -101,7 +101,7 @@ struct ClientHandler { onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]); } else if (linebuf.length - linefill > 0) { - eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); + eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); } else { // ERROR: header line too long print("Header line too long"); diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 81bf98f..0c83c7c 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -36,7 +36,17 @@ interface EventDriver { the default duration of `Duration.max`, if necessary, will wait indefinitely until an event arrives. */ - void processEvents(Duration timeout = Duration.max); + ExitReason processEvents(Duration timeout = Duration.max); + + /** + Causes `processEvents` to return with `ExitReason.exited` as soon as + possible. + + A call to `processEvents` that is currently in progress will be notfied + so that it returns immediately. If no call is in progress, the next call + to `processEvents` will immediately return with `ExitReason.exited`. + */ + void exit(); // // TCP @@ -46,8 +56,8 @@ interface EventDriver { void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void setTCPNoDelay(StreamSocketFD socket, bool enable); - void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.once); - void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.once); + void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); + void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); @@ -56,6 +66,7 @@ interface EventDriver { // EventID createEvent(); void triggerEvent(EventID event, bool notify_all = true); + void triggerEvent(EventID event, bool notify_all = true) shared; EventWaitID waitForEvent(EventID event, EventCallback on_event); void stopWaitingForEvent(EventID event, EventWaitID wait_id); @@ -105,6 +116,13 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias EventCallback = void delegate(EventID); alias TimerCallback = void delegate(TimerID); +enum ExitReason { + timeout, + idle, + outOfWaiters, + exited +} + enum ConnectStatus { connected, refused, @@ -155,8 +173,6 @@ alias SocketFD = Handle!FD; alias StreamSocketFD = Handle!SocketFD; alias StreamListenSocketFD = Handle!SocketFD; alias FileFD = Handle!FD; - +alias EventID = Handle!FD; alias TimerID = Handle!int; -alias EventID = Handle!int; alias EventWaitID = Handle!int; - diff --git a/source/eventcore/drivers/epoll.d b/source/eventcore/drivers/epoll.d index 94f2507..1a3427e 100644 --- a/source/eventcore/drivers/epoll.d +++ b/source/eventcore/drivers/epoll.d @@ -29,7 +29,7 @@ final class EpollEventDriver : PosixEventDriver { m_events.length = 100; } - override void doProcessEvents(Duration timeout) + override bool doProcessEvents(Duration timeout) @trusted { import std.algorithm : min; //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); @@ -49,7 +49,8 @@ final class EpollEventDriver : PosixEventDriver { if (evt.events & EPOLLERR) notify!(EventType.status)(fd); else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd); } - } + return true; + } else return false; } override void dispose() diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index f6562cc..275d550 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -14,7 +14,7 @@ import std.socket : Address, AddressFamily, UnknownAddress; version (Posix) { import core.sys.posix.netinet.in_; import core.sys.posix.netinet.tcp; - import core.sys.posix.unistd : close; + import core.sys.posix.unistd : close, write; import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sys.posix.fcntl; } @@ -22,6 +22,10 @@ version (Windows) { import core.sys.windows.winsock2; alias EAGAIN = WSAEWOULDBLOCK; } +version (linux) { + extern (C) int eventfd(uint initval, int flags); + enum EFD_NONBLOCK = 0x800; +} private long currStdTime() @@ -37,6 +41,16 @@ abstract class PosixEventDriver : EventDriver { private { ChoppedVector!FDSlot m_fds; size_t m_waiterCount = 0; + bool m_exit = false; + FD m_wakeupEvent; + } + + protected this() + { + m_wakeupEvent = eventfd(0, EFD_NONBLOCK); + initFD(m_wakeupEvent); + registerFD(m_wakeupEvent, EventMask.read); + //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD } mixin DefaultTimerImpl!(); @@ -45,29 +59,53 @@ abstract class PosixEventDriver : EventDriver { @property size_t waiterCount() const { return m_waiterCount; } - final override void processEvents(Duration timeout) + final override ExitReason processEvents(Duration timeout) { import std.algorithm : min; import core.time : seconds; + if (m_exit) { + m_exit = false; + return ExitReason.exited; + } + + if (!waiterCount) return ExitReason.outOfWaiters; + + bool got_events; + if (timeout <= 0.seconds) { - doProcessEvents(0.seconds); + got_events = doProcessEvents(0.seconds); processTimers(currStdTime); } else { long now = currStdTime; do { auto nextto = min(getNextTimeout(now), timeout); - doProcessEvents(nextto); + got_events = doProcessEvents(nextto); long prev_step = now; now = currStdTime; processTimers(now); if (timeout != Duration.max) timeout -= (now - prev_step).hnsecs; - } while (timeout > 0.seconds); + } while (timeout > 0.seconds && !m_exit && !got_events); } + + if (m_exit) { + m_exit = false; + return ExitReason.exited; + } + if (!waiterCount) return ExitReason.outOfWaiters; + if (got_events) return ExitReason.idle; + return ExitReason.timeout; } - protected abstract void doProcessEvents(Duration dur); + final override void exit() + { + m_exit = true; + int one = 1; + () @trusted { write(m_wakeupEvent, &one, one.sizeof); } (); + } + + protected abstract bool doProcessEvents(Duration dur); abstract void dispose(); final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) @@ -105,7 +143,7 @@ abstract class PosixEventDriver : EventDriver { } } - addFD(sock); + initFD(sock); return sock; } @@ -148,7 +186,7 @@ abstract class PosixEventDriver : EventDriver { final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) { registerFD(sock, EventMask.read); - addFD(sock); + initFD(sock); m_fds[sock].acceptCallback = on_accept; startNotify!(EventType.read)(sock, &onAccept); } @@ -166,7 +204,7 @@ abstract class PosixEventDriver : EventDriver { setSocketNonBlocking(cast(SocketFD)sockfd); auto fd = cast(StreamSocketFD)sockfd; registerFD(fd, EventMask.read|EventMask.write|EventMask.status); - addFD(fd); + initFD(fd); //print("accept %d", sockfd); m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); } @@ -178,7 +216,7 @@ abstract class PosixEventDriver : EventDriver { () @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } (); } - final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.all) + final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { sizediff_t ret; () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); @@ -260,7 +298,7 @@ abstract class PosixEventDriver : EventDriver { } } - final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.all) + final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) { sizediff_t ret; () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); @@ -402,22 +440,45 @@ abstract class PosixEventDriver : EventDriver { final override EventID createEvent() { - assert(false); + auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); + initFD(id); + registerFD(id, EventMask.read); + startNotify!(EventType.read)(id, &onEvent); + return id; } final override void triggerEvent(EventID event, bool notify_all = true) { + foreach (w; m_fds[event].waiters.consume) + w(event); + } + + final override void triggerEvent(EventID event, bool notify_all = true) + shared { + /*int one = 1; + if (notify_all) atomicStore(m_fds[event].triggerAll, true); + () @trusted { write(event, &one, one.sizeof); } ();*/ assert(false); } final override EventWaitID waitForEvent(EventID event, EventCallback on_event) { + //return m_fds[event].waiters.put(on_event); assert(false); } final override void stopWaitingForEvent(EventID event, EventWaitID wait_id) { assert(false); + //m_fds[event].waiters.remove(wait_id); + } + + private void onEvent(FD event) + { + assert(false); + /*auto all = atomicLoad(m_fds[event].triggerAll); + atomicStore(m_fds[event].triggerAll, false); + triggerEvent(cast(EventID)event, all);*/ } final override void addRef(SocketFD fd) @@ -434,7 +495,9 @@ abstract class PosixEventDriver : EventDriver { final override void addRef(EventID descriptor) { - assert(false); + auto pfd = &m_fds[descriptor]; + assert(pfd.refCount > 0); + m_fds[descriptor].refCount++; } final override void releaseRef(SocketFD fd) @@ -460,7 +523,13 @@ abstract class PosixEventDriver : EventDriver { final override void releaseRef(EventID descriptor) { - assert(false); + auto pfd = &m_fds[descriptor]; + assert(pfd.refCount > 0); + if (--m_fds[descriptor].refCount == 0) { + unregisterFD(descriptor); + clearFD(descriptor); + close(descriptor); + } } @@ -490,7 +559,6 @@ abstract class PosixEventDriver : EventDriver { { //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); m_fds[fd].callback[evt] = callback; - assert(m_fds[0].callback[evt] is null); m_waiterCount++; updateFD(fd, m_fds[fd].eventMask); } @@ -518,7 +586,7 @@ abstract class PosixEventDriver : EventDriver { return cast(SocketFD)sock; } - private void addFD(FD fd) + private void initFD(FD fd) { m_fds[fd].refCount = 1; } @@ -534,6 +602,8 @@ alias FDEnumerateCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD); private struct FDSlot { + import eventcore.internal.consumablequeue; + FDSlotCallback[EventType.max+1] callback; uint refCount; @@ -550,6 +620,7 @@ private struct FDSlot { ConnectCallback connectCallback; AcceptCallback acceptCallback; + ConsumableQueue!EventCallback waiters; @property EventMask eventMask() const nothrow { EventMask ret = cast(EventMask)0; diff --git a/source/eventcore/drivers/select.d b/source/eventcore/drivers/select.d index 8d6d64c..a6a1589 100644 --- a/source/eventcore/drivers/select.d +++ b/source/eventcore/drivers/select.d @@ -24,7 +24,7 @@ version (Windows) { final class SelectEventDriver : PosixEventDriver { - override void doProcessEvents(Duration timeout) + override bool doProcessEvents(Duration timeout) { //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); //scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs"); @@ -61,7 +61,8 @@ final class SelectEventDriver : PosixEventDriver { if (FD_ISSET(fd, &statusfds)) notify!(EventType.status)(fd); }); - } + return true; + } else return false; } override void dispose() diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d index 1f21c75..45481cd 100644 --- a/source/eventcore/internal/utils.d +++ b/source/eventcore/internal/utils.d @@ -34,7 +34,7 @@ struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { import core.stdc.stdlib : calloc, free, malloc, realloc; import std.traits : hasElaborateDestructor; - static assert(!hasElaborateDestructor!T); + static assert(!hasElaborateDestructor!T, "Cannot store element with elaborate destructor in ChoppedVector."); alias chunkSize = CHUNK_SIZE;