commit 2a926d87aa78b36aeeff30a2eeac5e58da4b6ff8 Author: Sönke Ludwig Date: Mon Jan 11 21:33:49 2016 +0100 Initial version with sone partial Posix implementations. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..38bd39f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.dub diff --git a/dub.sdl b/dub.sdl new file mode 100644 index 0000000..132d26e --- /dev/null +++ b/dub.sdl @@ -0,0 +1,3 @@ +name "eventcore" +description "Callback based abstraction layer over operating system asynchronous I/O facilities." + diff --git a/examples/http-server-fibers/dub.sdl b/examples/http-server-fibers/dub.sdl new file mode 100644 index 0000000..774b915 --- /dev/null +++ b/examples/http-server-fibers/dub.sdl @@ -0,0 +1,4 @@ +name "http-server-example" +description "Simple pseudo HTTP server suitable for benchmarking" +dependency "eventcore" path="../.." + diff --git a/examples/http-server-fibers/source/app.d b/examples/http-server-fibers/source/app.d new file mode 100644 index 0000000..ab4e209 --- /dev/null +++ b/examples/http-server-fibers/source/app.d @@ -0,0 +1,229 @@ + +import eventcore.core; +import eventcore.internal.utils; +import std.functional : toDelegate; +import std.socket : InternetAddress; +import std.exception : enforce; +import std.typecons : Rebindable, RefCounted; +import core.thread : Fiber; + + +Fiber[] store = new Fiber[20000]; +size_t storeSize = 0; +Fiber getFiber() +nothrow { + + if (storeSize > 0) return store[--storeSize]; + return new Fiber({}); +} +void done(Fiber f) +nothrow { + if (storeSize < store.length) + store[storeSize++] = f; +} + + + +struct AsyncBlocker { + @safe: + + bool done; + Rebindable!(const(Exception)) exception; + Fiber owner; + + void start() + nothrow { + assert(owner is null); + done = false; + exception = null; + () @trusted { owner = Fiber.getThis(); } (); + } + + void wait() + { + () @trusted { while (!done) Fiber.yield(); } (); + auto ex = cast(const(Exception))exception; + owner = null; + done = false; + exception = null; + if (ex) throw ex; + } + + void finish(const(Exception) e = null) + nothrow { + assert(!done && owner !is null); + exception = e; + done = true; + () @trusted { scope (failure) assert(false); if (owner.state == Fiber.State.HOLD) owner.call(); } (); + } +} + +alias StreamConnection = RefCounted!StreamConnectionImpl; + +struct StreamConnectionImpl { + @safe: /*@nogc:*/ + private { + StreamSocketFD m_socket; + bool m_empty = false; + + AsyncBlocker writer; + AsyncBlocker reader; + ubyte[] m_readBuffer; + size_t m_readBufferFill; + + ubyte[] m_line; + } + + this(StreamSocketFD sock, ubyte[] buffer) + nothrow { + m_socket = sock; + m_readBuffer = buffer; + } + + ~this() + nothrow { + if (m_socket != StreamSocketFD.invalid) + eventDriver.releaseRef(m_socket); + } + + @property bool empty() + { + reader.start(); + eventDriver.waitSocketData(m_socket, &onData); + reader.wait(); + return m_empty; + } + + ubyte[] readLine() + { + reader.start(); + if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); + else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); + reader.wait(); + auto ln = m_line; + m_line = null; + return ln; + } + + void write(const(ubyte)[] data) + { + writer.start(); + eventDriver.writeSocket(m_socket, data, &onWrite, IOMode.all); + writer.wait(); + } + + void close() + nothrow { + eventDriver.releaseRef(m_socket); + m_socket = StreamSocketFD.invalid; + m_readBuffer = null; + } + + private void onWrite(StreamSocketFD fd, IOStatus status, size_t len) + @safe nothrow { + static const ex = new Exception("Failed to write data!"); + writer.finish(status == IOStatus.ok ? null : ex); + } + + private void onData(StreamSocketFD, IOStatus status, size_t bytes_read) + @safe nothrow { + if (status != IOStatus.ok) + m_empty = true; + reader.finish(); + } + + private void onReadLineData(StreamSocketFD, IOStatus status, size_t bytes_read) + @safe nothrow { + static const ex = new Exception("Failed to read data!"); + static const exh = new Exception("Header line too long."); + + import std.algorithm : countUntil; + + if (status != IOStatus.ok) { + reader.finish(ex); + return; + } + + m_readBufferFill += bytes_read; + + assert(m_readBufferFill <= m_readBuffer.length); + + auto idx = m_readBuffer[0 .. m_readBufferFill].countUntil(cast(const(ubyte)[])"\r\n"); + if (idx >= 0) { + m_readBuffer[m_readBufferFill .. m_readBufferFill + idx] = m_readBuffer[0 .. idx]; + foreach (i; 0 .. m_readBufferFill - idx - 2) + m_readBuffer[i] = m_readBuffer[idx+2+i]; + m_readBufferFill -= idx + 2; + + m_line = m_readBuffer[m_readBufferFill + idx + 2 .. m_readBufferFill + idx + 2 + idx]; + + reader.finish(); + } else if (m_readBuffer.length - m_readBufferFill > 0) { + eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); + } else { + reader.finish(exh); + } + } +} + + +void main() +{ + print("Starting up..."); + auto addr = new InternetAddress("127.0.0.1", 8080); + auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); + enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); + + import core.time : msecs; + eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); + eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs); + + print("Listening for requests on port 8080..."); + while (eventDriver.waiterCount) + eventDriver.processEvents(); +} + +void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) +@trusted /*@nogc*/ nothrow { + import core.stdc.stdlib; + auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof); + handler.client = client; + auto f = getFiber(); + f.reset(&handler.handleConnection); + scope (failure) assert(false); + f.call(); + +} + +struct ClientHandler { + @safe: /*@nogc:*/ nothrow: + + StreamSocketFD client; + + @disable this(this); + + void handleConnection() + @trusted { + ubyte[512] linebuf = void; + auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; + + auto conn = StreamConnection(client, linebuf); + try { + while (!conn.empty) { + conn.readLine(); + + ubyte[] ln; + do ln = conn.readLine(); + while (ln.length > 0); + + conn.write(reply); + } + //print("close %s", cast(int)client); + } catch (Exception e) { + print("close %s: %s", cast(int)client, e.msg); + } + conn.close(); + + done(Fiber.getThis()); + } +} diff --git a/examples/http-server/dub.sdl b/examples/http-server/dub.sdl new file mode 100644 index 0000000..774b915 --- /dev/null +++ b/examples/http-server/dub.sdl @@ -0,0 +1,4 @@ +name "http-server-example" +description "Simple pseudo HTTP server suitable for benchmarking" +dependency "eventcore" path="../.." + diff --git a/examples/http-server/source/app.d b/examples/http-server/source/app.d new file mode 100644 index 0000000..339b668 --- /dev/null +++ b/examples/http-server/source/app.d @@ -0,0 +1,112 @@ + +import eventcore.core; +import eventcore.internal.utils; +import std.functional : toDelegate; +import std.socket : InternetAddress; +import std.exception : enforce; + +void main() +{ + print("Starting up..."); + auto addr = new InternetAddress("127.0.0.1", 8080); + auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); + enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); + + print("Listening for requests on port 8080..."); + while (eventDriver.waiterCount) + eventDriver.processEvents(); +} + +void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) +@trusted /*@nogc*/ nothrow { + import core.stdc.stdlib; + auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof); + handler.client = client; + handler.handleConnection(); +} + +struct ClientHandler { + @safe: /*@nogc:*/ nothrow: + + alias LineCallback = void delegate(ubyte[]); + + StreamSocketFD client; + ubyte[512] linebuf = void; + size_t linefill = 0; + LineCallback onLine; + + @disable this(this); + + void handleConnection() + { + int fd = client; + //import core.thread; + //() @trusted { print("Connection %d %s", fd, cast(void*)Thread.getThis()); } (); + readLine(&onRequestLine); + } + + void readLine(LineCallback on_line) + { + onLine = on_line; + if (linefill >= 2) onReadData(client, IOStatus.ok, 0); + else eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); + } + + void onRequestLine(ubyte[] ln) + { + //print("Request: %s", cast(char[])ln); + if (ln.length == 0) { + //print("Error: empty request line"); + eventDriver.shutdownSocket(client); + eventDriver.releaseRef(client); + } + + readLine(&onHeaderLine); + } + + void onHeaderLine(ubyte[] ln) + { + if (ln.length == 0) { + auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; + eventDriver.writeSocket(client, reply, &onWriteFinished, IOMode.all); + } else readLine(&onHeaderLine); + } + + void onWriteFinished(StreamSocketFD fd, IOStatus status, size_t len) + { + readLine(&onRequestLine); + } + + void onReadData(StreamSocketFD, IOStatus status, size_t bytes_read) + { + import std.algorithm : countUntil; + + if (status != IOStatus.ok) { + print("Client disconnect"); + eventDriver.shutdownSocket(client); + eventDriver.releaseRef(client); + return; + } + + linefill += bytes_read; + + assert(linefill <= linebuf.length); + + auto idx = linebuf[0 .. linefill].countUntil(cast(const(ubyte)[])"\r\n"); + if (idx >= 0) { + linebuf[linefill .. linefill + idx] = linebuf[0 .. idx]; + foreach (i; 0 .. linefill - idx - 2) + linebuf[i] = linebuf[idx+2+i]; + linefill -= idx + 2; + + onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]); + } else if (linebuf.length - linefill > 0) { + eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); + } else { + // ERROR: header line too long + print("Header line too long"); + eventDriver.shutdownSocket(client); + eventDriver.releaseRef(client); + } + } +} diff --git a/source/eventcore/core.d b/source/eventcore/core.d new file mode 100644 index 0000000..a816f5b --- /dev/null +++ b/source/eventcore/core.d @@ -0,0 +1,26 @@ +module eventcore.core; + +public import eventcore.driver; + +import eventcore.epoll; +import eventcore.select; + +alias NativeEventDriver = SelectEventDriver; + +@property EventDriver eventDriver() +@safe @nogc nothrow { + assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); + return s_driver; +} + +static this() +{ + if (!s_driver) s_driver = new NativeEventDriver; +} + +shared static this() +{ + s_driver = new NativeEventDriver; +} + +private NativeEventDriver s_driver; diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d new file mode 100644 index 0000000..81bf98f --- /dev/null +++ b/source/eventcore/driver.d @@ -0,0 +1,162 @@ +module eventcore.driver; +@safe: /*@nogc:*/ nothrow: + +import core.time : Duration; +import std.socket : Address; + + +interface EventDriver { +@safe: /*@nogc:*/ nothrow: + // + // General functionality + // + + /// Releases all resources associated with the driver + void dispose(); + + /** + The number of pending callbacks. + + When this number drops to zero, the event loop can safely be quit. It is + guaranteed that no callbacks will be made anymore, unless new callbacks + get registered. + */ + size_t waiterCount(); + + /** + Runs the event loop to process a chunk of events. + + This method optionally waits for an event to arrive if none are present + in the event queue. The function will return after either the specified + timeout has elapsed, or once the event queue has been fully emptied. + + Params: + timeout = Maximum amount of time to wait for an event. A duration of + zero will cause the function to only process pending events. The + the default duration of `Duration.max`, if necessary, will wait + indefinitely until an event arrives. + */ + void processEvents(Duration timeout = Duration.max); + + // + // TCP + // + StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect); + StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); + void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); + + void setTCPNoDelay(StreamSocketFD socket, bool enable); + void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.once); + void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.once); + void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); + void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); + + // + // Manual events + // + EventID createEvent(); + void triggerEvent(EventID event, bool notify_all = true); + EventWaitID waitForEvent(EventID event, EventCallback on_event); + void stopWaitingForEvent(EventID event, EventWaitID wait_id); + + // + // Timers + // + TimerID createTimer(TimerCallback callback); + void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero); + void stopTimer(TimerID timer); + bool isTimerPending(TimerID timer); + bool isTimerPeriodic(TimerID timer); + + // + // Resource ownership + // + + /** + Increments the reference count of the given resource. + */ + void addRef(SocketFD descriptor); + /// ditto + void addRef(FileFD descriptor); + /// ditto + void addRef(TimerID descriptor); + /// ditto + void addRef(EventID descriptor); + + /** + Decrements the reference count of the given resource. + + Once the reference count reaches zero, all associated resources will be + freed and the descriptor gets invalidated. + */ + void releaseRef(SocketFD descriptor); + /// ditto + void releaseRef(FileFD descriptor); + /// ditto + void releaseRef(TimerID descriptor); + /// ditto + void releaseRef(EventID descriptor); +} + + +alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); +alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); +alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); +alias EventCallback = void delegate(EventID); +alias TimerCallback = void delegate(TimerID); + +enum ConnectStatus { + connected, + refused, + timeout, + bindFailure, + unknownError +} + +enum IOMode { + immediate, /// Process only as much as possible without waiting + once, /// Process as much as possible with a single call + all /// Process the full buffer +} + + +enum IOStatus { + ok, /// The data has been transferred normally + disconnected, /// The connection was closed before all data could be transterred + error, /// An error occured while transferring the data + wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable +} + +struct Handle(T, T invalid_value = T.init, int MAGIC = __LINE__) { + static if (is(T : Handle!(V, M), V, int M)) alias BaseType = T.BaseType; + else alias BaseType = T; + + enum invalid = Handle.init; + + T value = invalid_value; + + this(BaseType value) { this.value = T(value); } + + U opCast(U : Handle!(V, M), V, int M)() { + // TODO: verify that U derives from typeof(this)! + return U(value); + } + + U opCast(U : BaseType)() + { + return cast(U)value; + } + + alias value this; +} + +alias FD = Handle!(int, -1); +alias SocketFD = Handle!FD; +alias StreamSocketFD = Handle!SocketFD; +alias StreamListenSocketFD = Handle!SocketFD; +alias FileFD = Handle!FD; + +alias TimerID = Handle!int; +alias EventID = Handle!int; +alias EventWaitID = Handle!int; + diff --git a/source/eventcore/epoll.d b/source/eventcore/epoll.d new file mode 100644 index 0000000..05bf95c --- /dev/null +++ b/source/eventcore/epoll.d @@ -0,0 +1,87 @@ +module eventcore.epoll; +@safe: /*@nogc:*/ nothrow: + +public import eventcore.posix; +import eventcore.internal.utils; + +import core.time : Duration; +import core.sys.posix.sys.time : timeval; +import core.sys.linux.epoll; + + +final class EpollEventDriver : PosixEventDriver { + private { + int m_epoll; + epoll_event[] m_events; + } + + this() + { + m_epoll = () @trusted { return epoll_create1(0); } (); + m_events.length = 100; + } + + override void doProcessEvents(Duration timeout) + @trusted { + import std.algorithm : min; + //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); + + auto ts = timeout.toTimeVal; + + //print("wait %s", m_events.length); + auto ret = epoll_wait(m_epoll, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? -1 : cast(int)min(timeout.total!"msecs", int.max)); + //print("wait done %s", ret); + + if (ret > 0) { + foreach (ref evt; m_events[0 .. ret]) { + //print("event %s %s", evt.data.fd, evt.events); + auto fd = cast(FD)evt.data.fd; + if (evt.events & EPOLLIN) notify!(EventType.read)(fd); + if (evt.events & EPOLLOUT) notify!(EventType.write)(fd); + if (evt.events & EPOLLERR) notify!(EventType.status)(fd); + else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd); + } + } + } + + override void dispose() + { + close(m_epoll); + } + + override void registerFD(FD fd, EventMask mask) + { + //print("register %s %s", fd, mask); + epoll_event ev; + ev.events |= EPOLLET; + if (mask & EventMask.read) ev.events |= EPOLLIN; + if (mask & EventMask.write) ev.events |= EPOLLOUT; + if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; + ev.data.fd = fd; + () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev); } (); + } + + override void unregisterFD(FD fd) + { + () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, fd, null); } (); + } + + override void updateFD(FD fd, EventMask mask) + { + //print("update %s %s", fd, mask); + epoll_event ev; + //ev.events = EPOLLONESHOT; + if (mask & EventMask.read) ev.events |= EPOLLIN; + if (mask & EventMask.write) ev.events |= EPOLLOUT; + if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP; + ev.data.fd = fd; + () @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, fd, &ev); } (); + } +} + +private timeval toTimeVal(Duration dur) +{ + timeval tvdur; + dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec); + return tvdur; +} diff --git a/source/eventcore/internal/utils.d b/source/eventcore/internal/utils.d new file mode 100644 index 0000000..1f21c75 --- /dev/null +++ b/source/eventcore/internal/utils.d @@ -0,0 +1,112 @@ +module eventcore.internal.utils; +void print(ARGS...)(string str, ARGS args) +@trusted @nogc nothrow { + import std.format : formattedWrite; + StdoutRange r; + scope cb = () { + scope (failure) assert(false); + (&r).formattedWrite(str, args); + }; + (cast(void delegate() @nogc @safe nothrow)cb)(); + r.put('\n'); +} + +struct StdoutRange { + @safe: @nogc: nothrow: + import core.stdc.stdio; + + void put(string str) + { + () @trusted { fwrite(str.ptr, str.length, 1, stdout); } (); + } + + void put(char ch) + { + () @trusted { fputc(ch, stdout); } (); + } +} + +struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) { + static assert(nextPOT(CHUNK_SIZE) == CHUNK_SIZE, + "CHUNK_SIZE must be a power of two for performance reasons."); + + @safe: @nogc: nothrow: + import core.stdc.stdlib : calloc, free, malloc, realloc; + import std.traits : hasElaborateDestructor; + + static assert(!hasElaborateDestructor!T); + + alias chunkSize = CHUNK_SIZE; + + private { + alias Chunk = T[chunkSize]; + alias ChunkPtr = Chunk*; + ChunkPtr[] m_chunks; + size_t m_chunkCount; + size_t m_length; + } + + @disable this(this); + + ~this() + { + clear(); + } + + @property size_t length() const { return m_length; } + + void clear() + { + () @trusted { + foreach (i; 0 .. m_chunkCount) + free(m_chunks[i]); + free(m_chunks.ptr); + } (); + m_chunkCount = 0; + m_length = 0; + } + + ref T opIndex(size_t index) + { + auto chunk = index / chunkSize; + auto subidx = index % chunkSize; + if (index >= m_length) m_length = index+1; + reserveChunk(chunk); + return (*m_chunks[chunk])[subidx]; + } + + private void reserveChunk(size_t chunkidx) + { + if (m_chunks.length <= chunkidx) { + auto l = m_chunks.length == 0 ? 64 : m_chunks.length; + while (l <= chunkidx) l *= 2; + () @trusted { + auto newptr = cast(ChunkPtr*)realloc(m_chunks.ptr, l * ChunkPtr.length); + m_chunks = newptr[0 .. l]; + } (); + } + + while (m_chunkCount <= chunkidx) { + () @trusted { m_chunks[m_chunkCount++] = cast(ChunkPtr)calloc(chunkSize, T.sizeof); } (); + } + } +} + +private size_t nextPOT(size_t n) +{ + foreach_reverse (i; 0 .. size_t.sizeof*8) { + size_t ni = cast(size_t)1 << i; + if (n & ni) { + return n & (ni-1) ? ni << 1 : ni; + } + } + return 1; +} + +unittest { + assert(nextPOT(1) == 1); + assert(nextPOT(2) == 2); + assert(nextPOT(3) == 4); + assert(nextPOT(4) == 4); + assert(nextPOT(5) == 8); +} diff --git a/source/eventcore/posix.d b/source/eventcore/posix.d new file mode 100644 index 0000000..2ed7cd3 --- /dev/null +++ b/source/eventcore/posix.d @@ -0,0 +1,603 @@ +module eventcore.posix; +@safe: /*@nogc:*/ nothrow: + +public import eventcore.driver; +import eventcore.timer; +import eventcore.internal.utils; + +import std.socket : Address, AddressFamily, UnknownAddress; +import core.sys.posix.netinet.in_; +import core.sys.posix.netinet.tcp; +import core.sys.posix.unistd : close; +import core.stdc.errno : errno, EAGAIN, EINPROGRESS; +import core.sys.posix.fcntl; +version (Windows) import core.sys.windows.winsock; + + +private long currStdTime() +{ + import std.datetime : Clock; + scope (failure) assert(false); + return Clock.currStdTime; +} + +abstract class PosixEventDriver : EventDriver { + @safe: /*@nogc:*/ nothrow: + + private { + ChoppedVector!FDSlot m_fds; + size_t m_waiterCount = 0; + } + + mixin DefaultTimerImpl!(); + + protected int maxFD() const { return cast(int)m_fds.length; } + + @property size_t waiterCount() const { return m_waiterCount; } + + final override void processEvents(Duration timeout) + { + import std.algorithm : min; + import core.time : seconds; + + if (timeout <= 0.seconds) { + doProcessEvents(0.seconds); + processTimers(currStdTime); + } else { + long now = currStdTime; + do { + auto nextto = min(getNextTimeout(now), timeout); + doProcessEvents(nextto); + long prev_step = now; + now = currStdTime; + processTimers(now); + if (timeout != Duration.max) + timeout -= (now - prev_step).hnsecs; + } while (timeout > 0.seconds); + } + } + + protected abstract void doProcessEvents(Duration dur); + abstract void dispose(); + + final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) + { + auto sock = cast(StreamSocketFD)createSocket(address.addressFamily); + if (sock == -1) return StreamSocketFD.invalid; + + void invalidateSocket() @nogc @trusted nothrow { close(sock); sock = StreamSocketFD.invalid; } + + scope bind_addr = new UnknownAddress; + bind_addr.name.sa_family = cast(ushort)address.addressFamily; + bind_addr.name.sa_data[] = 0; + if (() @trusted { return bind(sock, bind_addr.name, bind_addr.nameLen); } () != 0) { + invalidateSocket(); + on_connect(sock, ConnectStatus.bindFailure); + return sock; + } + + registerFD(sock, EventMask.read|EventMask.write|EventMask.status); + + auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } (); + if (ret == 0) { + on_connect(sock, ConnectStatus.connected); + } else { + auto err = errno; + if (err == EINPROGRESS) { + with (m_fds[sock]) { + connectCallback = on_connect; + } + startNotify!(EventType.write)(sock, &onConnect); + } else { + unregisterFD(sock); + invalidateSocket(); + on_connect(sock, ConnectStatus.unknownError); + } + } + + addFD(sock); + + return sock; + } + + private void onConnect(FD sock) + { + stopNotify!(EventType.status)(sock); + stopNotify!(EventType.write)(sock); + m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); + } + + private void onConnectError(FD sock) + { + stopNotify!(EventType.status)(sock); + stopNotify!(EventType.write)(sock); + // FIXME: determine the correct kind of error! + m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); + } + + final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) + { + auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily); + + void invalidateSocket() @nogc @trusted nothrow { close(sock); sock = StreamSocketFD.invalid; } + + () @trusted { + int tmp_reuse = 1; + // FIXME: error handling! + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { + invalidateSocket(); + } else if (bind(sock, address.name, address.nameLen) != 0) { + invalidateSocket(); + } else if (listen(sock, 128) != 0) { + invalidateSocket(); + } else { scope (failure) assert(false); import std.stdio; writeln("Success!"); } + } (); + + if (on_accept && sock != StreamListenSocketFD.invalid) + waitForConnections(sock, on_accept); + + return sock; + } + + final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) + { + registerFD(sock, EventMask.read); + addFD(sock); + m_fds[sock].acceptCallback = on_accept; + startNotify!(EventType.read)(sock, &onAccept); + } + + private void onAccept(FD listenfd) + { + scope addr = new UnknownAddress; + foreach (i; 0 .. 20) { + int sockfd; + uint addr_len = addr.nameLen; + () @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } (); + if (sockfd == -1) break; + () @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } (); + registerFD(cast(FD)sockfd, EventMask.read|EventMask.write|EventMask.status); + addFD(cast(FD)sockfd); + //print("accept %d", sockfd); + m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, cast(StreamSocketFD)sockfd); + } + } + + final override void setTCPNoDelay(StreamSocketFD socket, bool enable) + { + int opt = enable; + () @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } (); + } + + final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.all) + { + sizediff_t ret; + () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + print("sock error %s!", err); + on_read_finish(socket, IOStatus.error, 0); + return; + } + } + + size_t bytes_read = 0; + + if (ret == 0) { + on_read_finish(socket, IOStatus.disconnected, 0); + return; + } + + if (ret < 0 && mode == IOMode.immediate) { + on_read_finish(socket, IOStatus.wouldBlock, 0); + return; + } + + if (ret > 0) { + bytes_read += ret; + buffer = buffer[bytes_read .. $]; + if (mode != IOMode.all || buffer.length == 0) { + on_read_finish(socket, IOStatus.ok, bytes_read); + return; + } + } + + with (m_fds[socket]) { + readCallback = on_read_finish; + readMode = mode; + bytesRead = ret > 0 ? ret : 0; + readBuffer = buffer; + } + + startNotify!(EventType.read)(socket, &onSocketRead); + } + + private void onSocketRead(FD fd) + { + auto slot = &m_fds[fd]; + auto socket = cast(StreamSocketFD)fd; + + void finalize()(IOStatus status) + { + stopNotify!(EventType.read)(socket); + //m_fds[fd].readBuffer = null; + slot.readCallback(socket, status, slot.bytesRead); + } + + sizediff_t ret; + () @trusted { ret = recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + finalize(IOStatus.error); + return; + } + } + + if (ret == 0) { + finalize(IOStatus.disconnected); + return; + } + + if (ret > 0) { + slot.bytesRead += ret; + slot.readBuffer = slot.readBuffer[ret .. $]; + if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) { + finalize(IOStatus.ok); + return; + } + } + } + + final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.all) + { + sizediff_t ret; + () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + on_write_finish(socket, IOStatus.error, 0); + return; + } + } + + size_t bytes_written = 0; + + if (ret == 0) { + on_write_finish(socket, IOStatus.disconnected, 0); + return; + } + + if (ret < 0 && mode == IOMode.immediate) { + on_write_finish(socket, IOStatus.wouldBlock, 0); + return; + } + + if (ret > 0) { + bytes_written += ret; + buffer = buffer[ret .. $]; + if (mode != IOMode.all || buffer.length == 0) { + on_write_finish(socket, IOStatus.ok, bytes_written); + return; + } + } + + with (m_fds[socket]) { + writeCallback = on_write_finish; + writeMode = mode; + bytesWritten = ret > 0 ? ret : 0; + writeBuffer = buffer; + } + + startNotify!(EventType.write)(socket, &onSocketWrite); + } + + private void onSocketWrite(FD fd) + { + auto slot = &m_fds[fd]; + auto socket = cast(StreamSocketFD)fd; + + sizediff_t ret; + () @trusted { ret = send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + stopNotify!(EventType.write)(socket); + slot.readCallback(socket, IOStatus.error, slot.bytesRead); + return; + } + } + + if (ret == 0) { + stopNotify!(EventType.write)(socket); + slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten); + return; + } + + if (ret > 0) { + slot.bytesWritten += ret; + slot.writeBuffer = slot.writeBuffer[ret .. $]; + if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { + stopNotify!(EventType.write)(socket); + slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); + return; + } + } + } + + final override void waitSocketData(StreamSocketFD socket, IOCallback on_data_available) + { + sizediff_t ret; + ubyte dummy; + () @trusted { ret = recv(socket, &dummy, 1, MSG_PEEK); } (); + + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) { + on_data_available(socket, IOStatus.error, 0); + return; + } + } + + size_t bytes_read = 0; + + if (ret == 0) { + on_data_available(socket, IOStatus.disconnected, 0); + return; + } + + if (ret > 0) { + on_data_available(socket, IOStatus.ok, 0); + return; + } + + with (m_fds[socket]) { + readCallback = on_data_available; + readMode = IOMode.once; + bytesRead = 0; + readBuffer = null; + } + + startNotify!(EventType.read)(socket, &onSocketDataAvailable); + } + + private void onSocketDataAvailable(FD fd) + { + auto slot = &m_fds[fd]; + auto socket = cast(StreamSocketFD)fd; + + void finalize()(IOStatus status) + { + stopNotify!(EventType.read)(socket); + //m_fds[fd].readBuffer = null; + slot.readCallback(socket, status, 0); + } + + sizediff_t ret; + ubyte tmp; + () @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } (); + if (ret < 0) { + auto err = errno; + if (err != EAGAIN) finalize(IOStatus.error); + } else finalize(ret ? IOStatus.ok : IOStatus.disconnected); + } + + final override void shutdownSocket(StreamSocketFD socket, bool shut_read, bool shut_write) + { + // TODO! + } + + final override EventID createEvent() + { + assert(false); + } + + final override void triggerEvent(EventID event, bool notify_all = true) + { + assert(false); + } + + final override EventWaitID waitForEvent(EventID event, EventCallback on_event) + { + assert(false); + } + + final override void stopWaitingForEvent(EventID event, EventWaitID wait_id) + { + assert(false); + } + + final override void addRef(SocketFD fd) + { + auto pfd = &m_fds[fd]; + assert(pfd.refCount > 0); + m_fds[fd].refCount++; + } + + final override void addRef(FileFD descriptor) + { + assert(false); + } + + final override void addRef(EventID descriptor) + { + assert(false); + } + + final override void releaseRef(SocketFD fd) + { + auto pfd = &m_fds[fd]; + assert(pfd.refCount > 0); + if (--m_fds[fd].refCount == 0) { + unregisterFD(fd); + clearFD(fd); + close(fd); + } + } + + final override void releaseRef(FileFD descriptor) + { + assert(false); + } + + final override void releaseRef(TimerID descriptor) + { + assert(false); + } + + final override void releaseRef(EventID descriptor) + { + assert(false); + } + + + /// Registers the FD for general notification reception. + protected abstract void registerFD(FD fd, EventMask mask); + /// Unregisters the FD for general notification reception. + protected abstract void unregisterFD(FD fd); + /// Updates the event mask to use for listening for notifications. + protected abstract void updateFD(FD fd, EventMask mask); + + final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) + { + // TODO: optimize! + foreach (i; 0 .. cast(int)m_fds.length) + if (m_fds[i].callback[evt]) + del(cast(FD)i); + } + + final protected void notify(EventType evt)(FD fd) + { + //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); + if (m_fds[fd].callback[evt]) + m_fds[fd].callback[evt](fd); + } + + private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) + { + assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); + m_fds[fd].callback[evt] = callback; + assert(m_fds[0].callback[evt] is null); + m_waiterCount++; + updateFD(fd, m_fds[fd].eventMask); + } + + private void stopNotify(EventType evt)(FD fd) + { + assert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); + m_fds[fd].callback[evt] = null; + m_waiterCount--; + updateFD(fd, m_fds[fd].eventMask); + } + + private SocketFD createSocket(AddressFamily family) + { + int sock; + () @trusted { sock = socket(family, SOCK_STREAM, 0); } (); + if (sock == -1) return SocketFD.invalid; + int on = 1; + () @trusted { fcntl(sock, F_SETFL, O_NONBLOCK, on); } (); + return cast(SocketFD)sock; + } + + private void addFD(FD fd) + { + m_fds[fd].refCount = 1; + } + + private void clearFD(FD fd) + { + m_fds[fd] = FDSlot.init; + } +} + +alias FDEnumerateCallback = void delegate(FD); + +alias FDSlotCallback = void delegate(FD); + +private struct FDSlot { + FDSlotCallback[EventType.max+1] callback; + + uint refCount; + + size_t bytesRead; + ubyte[] readBuffer; + IOMode readMode; + IOCallback readCallback; + + size_t bytesWritten; + const(ubyte)[] writeBuffer; + IOMode writeMode; + IOCallback writeCallback; + + ConnectCallback connectCallback; + AcceptCallback acceptCallback; + + @property EventMask eventMask() const nothrow { + EventMask ret = cast(EventMask)0; + if (callback[EventType.read] !is null) ret |= EventMask.read; + if (callback[EventType.write] !is null) ret |= EventMask.write; + if (callback[EventType.status] !is null) ret |= EventMask.status; + return ret; + } +} + +enum EventType { + read, + write, + status +} + +enum EventMask { + read = 1<<0, + write = 1<<1, + status = 1<<2 +} + + +/*version (Windows) { + import std.c.windows.windows; + import std.c.windows.winsock; + + alias EWOULDBLOCK = WSAEWOULDBLOCK; + + extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments); + + class WSAErrorException : Exception { + int error; + + this(string message, string file = __FILE__, size_t line = __LINE__) + { + error = WSAGetLastError(); + this(message, error, file, line); + } + + this(string message, int error, string file = __FILE__, size_t line = __LINE__) + { + import std.string : format; + ushort* errmsg; + FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS, + null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null); + size_t len = 0; + while (errmsg[len]) len++; + auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup; + LocalFree(errmsg); + super(format("%s: %s (%s)", message, errmsgd, error), file, line); + } + } + + alias SystemSocketException = WSAErrorException; +} else { + import std.exception : ErrnoException; + alias SystemSocketException = ErrnoException; +} + +T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__) +{ + import std.exception : enforceEx; + return enforceEx!SystemSocketException(value, msg, file, line); +}*/ diff --git a/source/eventcore/select.d b/source/eventcore/select.d new file mode 100644 index 0000000..1abecc7 --- /dev/null +++ b/source/eventcore/select.d @@ -0,0 +1,76 @@ +module eventcore.select; +@safe: /*@nogc:*/ nothrow: + +public import eventcore.posix; +import eventcore.internal.utils; + +import core.time : Duration; +import core.sys.posix.sys.time : timeval; +import core.sys.posix.sys.select; + + +final class SelectEventDriver : PosixEventDriver { + override void doProcessEvents(Duration timeout) + { + //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); +//scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs"); +//scope (success) writefln("%.3f: process out", Clock.currAppTick.usecs * 1e-3); + + auto ts = timeout.toTimeVal; + + fd_set readfds, writefds, statusfds; + + () @trusted { + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&statusfds); + } (); + enumerateFDs!(EventType.read)((fd) @trusted { FD_SET(fd, &readfds); }); + enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); }); + enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); }); + +//print("Wait for event..."); +//writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3); + auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } (); +//writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3); +//print("Done wait for event..."); + if (ret > 0) { + enumerateFDs!(EventType.read)((fd) @trusted { + if (FD_ISSET(fd, &readfds)) + notify!(EventType.read)(fd); + }); + enumerateFDs!(EventType.write)((fd) @trusted { + if (FD_ISSET(fd, &writefds)) + notify!(EventType.write)(fd); + }); + enumerateFDs!(EventType.status)((fd) @trusted { + if (FD_ISSET(fd, &statusfds)) + notify!(EventType.status)(fd); + }); + } + } + + override void dispose() + { + + } + + override void registerFD(FD fd, EventMask mask) + { + } + + override void unregisterFD(FD fd) + { + } + + override void updateFD(FD fd, EventMask mask) + { + } +} + +private timeval toTimeVal(Duration dur) +{ + timeval tvdur; + dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec); + return tvdur; +} diff --git a/source/eventcore/timer.d b/source/eventcore/timer.d new file mode 100644 index 0000000..d04b550 --- /dev/null +++ b/source/eventcore/timer.d @@ -0,0 +1,147 @@ +module eventcore.timer; + +import eventcore.driver; + + +mixin template DefaultTimerImpl() { + import std.experimental.allocator.building_blocks.free_list; + import std.experimental.allocator.building_blocks.region; + import std.experimental.allocator.mallocator; + import std.experimental.allocator : dispose, make; + import std.container.array; + import std.datetime : Clock; + import std.range : SortedRange, assumeSorted, take; + import core.time : hnsecs; + + private { + static FreeList!(Mallocator, TimerSlot.sizeof) ms_allocator; + TimerSlot*[TimerID] m_timers; + Array!(TimerSlot*) m_timerQueue; + TimerID m_lastTimerID; + TimerSlot*[] m_firedTimers; + } + + static this() + { + ms_allocator.parent = Mallocator.instance; + } + + final protected Duration getNextTimeout(long stdtime) + { + return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max; + } + + final protected void processTimers(long stdtime) + @trusted { + assert(m_firedTimers.length == 0); + if (m_timerQueue.empty) return; + + TimerSlot ts = void; + ts.timeout = stdtime+1; + auto fired = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).lowerBound(&ts); + foreach (tm; fired) { + if (tm.repeatDuration > 0) { + do tm.timeout += tm.repeatDuration; + while (tm.timeout <= stdtime); + auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); + try m_timerQueue.insertBefore(tail.release, tm); + catch (Exception e) { print("Failed to insert timer: %s", e.msg); } + } else tm.pending = false; + m_firedTimers ~= tm; + } + + // NOTE: this isn't yet verified to work under all circumstances + auto elems = m_timerQueue[0 .. fired.length]; + scope (failure) assert(false); + m_timerQueue.linearRemove(elems); + + foreach (tm; m_firedTimers) + tm.callback(tm.id); + + m_firedTimers.length = 0; + m_firedTimers.assumeSafeAppend(); + } + + final override TimerID createTimer(TimerCallback callback) + @trusted { + auto id = cast(TimerID)(m_lastTimerID + 1); + TimerSlot* tm; + try tm = ms_allocator.make!TimerSlot; + catch (Exception e) return TimerID.invalid; + assert(tm !is null); + tm.id = id; + tm.refCount = 1; + tm.callback = callback; + m_timers[id] = tm; + return id; + } + + final override void setTimer(TimerID timer, Duration timeout, Duration repeat) + @trusted { + scope (failure) assert(false); + auto tm = m_timers[timer]; + if (tm.pending) stopTimer(timer); + tm.timeout = Clock.currStdTime + timeout.total!"hnsecs"; + tm.repeatDuration = repeat.total!"hnsecs"; + tm.pending = true; + + auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); + try m_timerQueue.insertBefore(largerRange.release, tm); + catch (Exception e) { print("Failed to insert timer: %s", e.msg); } + } + + final override void stopTimer(TimerID timer) + @trusted { + auto tm = m_timers[timer]; + if (!tm.pending) return; + tm.pending = false; + tm.callback = null; + + TimerSlot cmp = void; + cmp.timeout = tm.timeout-1; + auto upper = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(&cmp); + assert(!upper.empty); + while (!upper.empty) { + assert(upper.front.timeout == tm.timeout); + if (upper.front is tm) { + scope (failure) assert(false); + m_timerQueue.linearRemove(upper.release.take(1)); + break; + } + } + } + + final override bool isTimerPending(TimerID descriptor) + { + return m_timers[descriptor].pending; + } + + final override bool isTimerPeriodic(TimerID descriptor) + { + return m_timers[descriptor].repeatDuration > 0; + } + + final override void addRef(TimerID descriptor) + { + m_timers[descriptor].refCount++; + } + + final override void releaseRef(TimerID descriptor) + { + auto tm = m_timers[descriptor]; + if (!--tm.refCount) { + if (tm.pending) stopTimer(tm.id); + m_timers.remove(descriptor); + () @trusted { scope (failure) assert(false); ms_allocator.dispose(tm); } (); + } + } +} + +struct TimerSlot { + TimerID id; + uint refCount; + bool pending; + long timeout; // stdtime + long repeatDuration; + TimerCallback callback; +}