Initial version with sone partial Posix implementations.

This commit is contained in:
Sönke Ludwig 2016-01-11 21:33:49 +01:00
commit 2a926d87aa
13 changed files with 1566 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
.dub

3
dub.sdl Normal file
View file

@ -0,0 +1,3 @@
name "eventcore"
description "Callback based abstraction layer over operating system asynchronous I/O facilities."

View file

@ -0,0 +1,4 @@
name "http-server-example"
description "Simple pseudo HTTP server suitable for benchmarking"
dependency "eventcore" path="../.."

View file

@ -0,0 +1,229 @@
import eventcore.core;
import eventcore.internal.utils;
import std.functional : toDelegate;
import std.socket : InternetAddress;
import std.exception : enforce;
import std.typecons : Rebindable, RefCounted;
import core.thread : Fiber;
Fiber[] store = new Fiber[20000];
size_t storeSize = 0;
Fiber getFiber()
nothrow {
if (storeSize > 0) return store[--storeSize];
return new Fiber({});
}
void done(Fiber f)
nothrow {
if (storeSize < store.length)
store[storeSize++] = f;
}
struct AsyncBlocker {
@safe:
bool done;
Rebindable!(const(Exception)) exception;
Fiber owner;
void start()
nothrow {
assert(owner is null);
done = false;
exception = null;
() @trusted { owner = Fiber.getThis(); } ();
}
void wait()
{
() @trusted { while (!done) Fiber.yield(); } ();
auto ex = cast(const(Exception))exception;
owner = null;
done = false;
exception = null;
if (ex) throw ex;
}
void finish(const(Exception) e = null)
nothrow {
assert(!done && owner !is null);
exception = e;
done = true;
() @trusted { scope (failure) assert(false); if (owner.state == Fiber.State.HOLD) owner.call(); } ();
}
}
alias StreamConnection = RefCounted!StreamConnectionImpl;
struct StreamConnectionImpl {
@safe: /*@nogc:*/
private {
StreamSocketFD m_socket;
bool m_empty = false;
AsyncBlocker writer;
AsyncBlocker reader;
ubyte[] m_readBuffer;
size_t m_readBufferFill;
ubyte[] m_line;
}
this(StreamSocketFD sock, ubyte[] buffer)
nothrow {
m_socket = sock;
m_readBuffer = buffer;
}
~this()
nothrow {
if (m_socket != StreamSocketFD.invalid)
eventDriver.releaseRef(m_socket);
}
@property bool empty()
{
reader.start();
eventDriver.waitSocketData(m_socket, &onData);
reader.wait();
return m_empty;
}
ubyte[] readLine()
{
reader.start();
if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0);
else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once);
reader.wait();
auto ln = m_line;
m_line = null;
return ln;
}
void write(const(ubyte)[] data)
{
writer.start();
eventDriver.writeSocket(m_socket, data, &onWrite, IOMode.all);
writer.wait();
}
void close()
nothrow {
eventDriver.releaseRef(m_socket);
m_socket = StreamSocketFD.invalid;
m_readBuffer = null;
}
private void onWrite(StreamSocketFD fd, IOStatus status, size_t len)
@safe nothrow {
static const ex = new Exception("Failed to write data!");
writer.finish(status == IOStatus.ok ? null : ex);
}
private void onData(StreamSocketFD, IOStatus status, size_t bytes_read)
@safe nothrow {
if (status != IOStatus.ok)
m_empty = true;
reader.finish();
}
private void onReadLineData(StreamSocketFD, IOStatus status, size_t bytes_read)
@safe nothrow {
static const ex = new Exception("Failed to read data!");
static const exh = new Exception("Header line too long.");
import std.algorithm : countUntil;
if (status != IOStatus.ok) {
reader.finish(ex);
return;
}
m_readBufferFill += bytes_read;
assert(m_readBufferFill <= m_readBuffer.length);
auto idx = m_readBuffer[0 .. m_readBufferFill].countUntil(cast(const(ubyte)[])"\r\n");
if (idx >= 0) {
m_readBuffer[m_readBufferFill .. m_readBufferFill + idx] = m_readBuffer[0 .. idx];
foreach (i; 0 .. m_readBufferFill - idx - 2)
m_readBuffer[i] = m_readBuffer[idx+2+i];
m_readBufferFill -= idx + 2;
m_line = m_readBuffer[m_readBufferFill + idx + 2 .. m_readBufferFill + idx + 2 + idx];
reader.finish();
} else if (m_readBuffer.length - m_readBufferFill > 0) {
eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once);
} else {
reader.finish(exh);
}
}
}
void main()
{
print("Starting up...");
auto addr = new InternetAddress("127.0.0.1", 8080);
auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect));
enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
import core.time : msecs;
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs);
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);
print("Listening for requests on port 8080...");
while (eventDriver.waiterCount)
eventDriver.processEvents();
}
void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client)
@trusted /*@nogc*/ nothrow {
import core.stdc.stdlib;
auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof);
handler.client = client;
auto f = getFiber();
f.reset(&handler.handleConnection);
scope (failure) assert(false);
f.call();
}
struct ClientHandler {
@safe: /*@nogc:*/ nothrow:
StreamSocketFD client;
@disable this(this);
void handleConnection()
@trusted {
ubyte[512] linebuf = void;
auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!";
auto conn = StreamConnection(client, linebuf);
try {
while (!conn.empty) {
conn.readLine();
ubyte[] ln;
do ln = conn.readLine();
while (ln.length > 0);
conn.write(reply);
}
//print("close %s", cast(int)client);
} catch (Exception e) {
print("close %s: %s", cast(int)client, e.msg);
}
conn.close();
done(Fiber.getThis());
}
}

View file

@ -0,0 +1,4 @@
name "http-server-example"
description "Simple pseudo HTTP server suitable for benchmarking"
dependency "eventcore" path="../.."

View file

@ -0,0 +1,112 @@
import eventcore.core;
import eventcore.internal.utils;
import std.functional : toDelegate;
import std.socket : InternetAddress;
import std.exception : enforce;
void main()
{
print("Starting up...");
auto addr = new InternetAddress("127.0.0.1", 8080);
auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect));
enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
print("Listening for requests on port 8080...");
while (eventDriver.waiterCount)
eventDriver.processEvents();
}
void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client)
@trusted /*@nogc*/ nothrow {
import core.stdc.stdlib;
auto handler = cast(ClientHandler*)calloc(1, ClientHandler.sizeof);
handler.client = client;
handler.handleConnection();
}
struct ClientHandler {
@safe: /*@nogc:*/ nothrow:
alias LineCallback = void delegate(ubyte[]);
StreamSocketFD client;
ubyte[512] linebuf = void;
size_t linefill = 0;
LineCallback onLine;
@disable this(this);
void handleConnection()
{
int fd = client;
//import core.thread;
//() @trusted { print("Connection %d %s", fd, cast(void*)Thread.getThis()); } ();
readLine(&onRequestLine);
}
void readLine(LineCallback on_line)
{
onLine = on_line;
if (linefill >= 2) onReadData(client, IOStatus.ok, 0);
else eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once);
}
void onRequestLine(ubyte[] ln)
{
//print("Request: %s", cast(char[])ln);
if (ln.length == 0) {
//print("Error: empty request line");
eventDriver.shutdownSocket(client);
eventDriver.releaseRef(client);
}
readLine(&onHeaderLine);
}
void onHeaderLine(ubyte[] ln)
{
if (ln.length == 0) {
auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!";
eventDriver.writeSocket(client, reply, &onWriteFinished, IOMode.all);
} else readLine(&onHeaderLine);
}
void onWriteFinished(StreamSocketFD fd, IOStatus status, size_t len)
{
readLine(&onRequestLine);
}
void onReadData(StreamSocketFD, IOStatus status, size_t bytes_read)
{
import std.algorithm : countUntil;
if (status != IOStatus.ok) {
print("Client disconnect");
eventDriver.shutdownSocket(client);
eventDriver.releaseRef(client);
return;
}
linefill += bytes_read;
assert(linefill <= linebuf.length);
auto idx = linebuf[0 .. linefill].countUntil(cast(const(ubyte)[])"\r\n");
if (idx >= 0) {
linebuf[linefill .. linefill + idx] = linebuf[0 .. idx];
foreach (i; 0 .. linefill - idx - 2)
linebuf[i] = linebuf[idx+2+i];
linefill -= idx + 2;
onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]);
} else if (linebuf.length - linefill > 0) {
eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once);
} else {
// ERROR: header line too long
print("Header line too long");
eventDriver.shutdownSocket(client);
eventDriver.releaseRef(client);
}
}
}

26
source/eventcore/core.d Normal file
View file

@ -0,0 +1,26 @@
module eventcore.core;
public import eventcore.driver;
import eventcore.epoll;
import eventcore.select;
alias NativeEventDriver = SelectEventDriver;
@property EventDriver eventDriver()
@safe @nogc nothrow {
assert(s_driver !is null, "eventcore.core static constructor didn't run!?");
return s_driver;
}
static this()
{
if (!s_driver) s_driver = new NativeEventDriver;
}
shared static this()
{
s_driver = new NativeEventDriver;
}
private NativeEventDriver s_driver;

162
source/eventcore/driver.d Normal file
View file

@ -0,0 +1,162 @@
module eventcore.driver;
@safe: /*@nogc:*/ nothrow:
import core.time : Duration;
import std.socket : Address;
interface EventDriver {
@safe: /*@nogc:*/ nothrow:
//
// General functionality
//
/// Releases all resources associated with the driver
void dispose();
/**
The number of pending callbacks.
When this number drops to zero, the event loop can safely be quit. It is
guaranteed that no callbacks will be made anymore, unless new callbacks
get registered.
*/
size_t waiterCount();
/**
Runs the event loop to process a chunk of events.
This method optionally waits for an event to arrive if none are present
in the event queue. The function will return after either the specified
timeout has elapsed, or once the event queue has been fully emptied.
Params:
timeout = Maximum amount of time to wait for an event. A duration of
zero will cause the function to only process pending events. The
the default duration of `Duration.max`, if necessary, will wait
indefinitely until an event arrives.
*/
void processEvents(Duration timeout = Duration.max);
//
// TCP
//
StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect);
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
void setTCPNoDelay(StreamSocketFD socket, bool enable);
void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.once);
void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.once);
void waitSocketData(StreamSocketFD socket, IOCallback on_data_available);
void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true);
//
// Manual events
//
EventID createEvent();
void triggerEvent(EventID event, bool notify_all = true);
EventWaitID waitForEvent(EventID event, EventCallback on_event);
void stopWaitingForEvent(EventID event, EventWaitID wait_id);
//
// Timers
//
TimerID createTimer(TimerCallback callback);
void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero);
void stopTimer(TimerID timer);
bool isTimerPending(TimerID timer);
bool isTimerPeriodic(TimerID timer);
//
// Resource ownership
//
/**
Increments the reference count of the given resource.
*/
void addRef(SocketFD descriptor);
/// ditto
void addRef(FileFD descriptor);
/// ditto
void addRef(TimerID descriptor);
/// ditto
void addRef(EventID descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the descriptor gets invalidated.
*/
void releaseRef(SocketFD descriptor);
/// ditto
void releaseRef(FileFD descriptor);
/// ditto
void releaseRef(TimerID descriptor);
/// ditto
void releaseRef(EventID descriptor);
}
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD);
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID);
alias TimerCallback = void delegate(TimerID);
enum ConnectStatus {
connected,
refused,
timeout,
bindFailure,
unknownError
}
enum IOMode {
immediate, /// Process only as much as possible without waiting
once, /// Process as much as possible with a single call
all /// Process the full buffer
}
enum IOStatus {
ok, /// The data has been transferred normally
disconnected, /// The connection was closed before all data could be transterred
error, /// An error occured while transferring the data
wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable
}
struct Handle(T, T invalid_value = T.init, int MAGIC = __LINE__) {
static if (is(T : Handle!(V, M), V, int M)) alias BaseType = T.BaseType;
else alias BaseType = T;
enum invalid = Handle.init;
T value = invalid_value;
this(BaseType value) { this.value = T(value); }
U opCast(U : Handle!(V, M), V, int M)() {
// TODO: verify that U derives from typeof(this)!
return U(value);
}
U opCast(U : BaseType)()
{
return cast(U)value;
}
alias value this;
}
alias FD = Handle!(int, -1);
alias SocketFD = Handle!FD;
alias StreamSocketFD = Handle!SocketFD;
alias StreamListenSocketFD = Handle!SocketFD;
alias FileFD = Handle!FD;
alias TimerID = Handle!int;
alias EventID = Handle!int;
alias EventWaitID = Handle!int;

87
source/eventcore/epoll.d Normal file
View file

@ -0,0 +1,87 @@
module eventcore.epoll;
@safe: /*@nogc:*/ nothrow:
public import eventcore.posix;
import eventcore.internal.utils;
import core.time : Duration;
import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll;
final class EpollEventDriver : PosixEventDriver {
private {
int m_epoll;
epoll_event[] m_events;
}
this()
{
m_epoll = () @trusted { return epoll_create1(0); } ();
m_events.length = 100;
}
override void doProcessEvents(Duration timeout)
@trusted {
import std.algorithm : min;
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
auto ts = timeout.toTimeVal;
//print("wait %s", m_events.length);
auto ret = epoll_wait(m_epoll, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? -1 : cast(int)min(timeout.total!"msecs", int.max));
//print("wait done %s", ret);
if (ret > 0) {
foreach (ref evt; m_events[0 .. ret]) {
//print("event %s %s", evt.data.fd, evt.events);
auto fd = cast(FD)evt.data.fd;
if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
if (evt.events & EPOLLERR) notify!(EventType.status)(fd);
else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd);
}
}
}
override void dispose()
{
close(m_epoll);
}
override void registerFD(FD fd, EventMask mask)
{
//print("register %s %s", fd, mask);
epoll_event ev;
ev.events |= EPOLLET;
if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
ev.data.fd = fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev); } ();
}
override void unregisterFD(FD fd)
{
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, fd, null); } ();
}
override void updateFD(FD fd, EventMask mask)
{
//print("update %s %s", fd, mask);
epoll_event ev;
//ev.events = EPOLLONESHOT;
if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
ev.data.fd = fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, fd, &ev); } ();
}
}
private timeval toTimeVal(Duration dur)
{
timeval tvdur;
dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
return tvdur;
}

View file

@ -0,0 +1,112 @@
module eventcore.internal.utils;
void print(ARGS...)(string str, ARGS args)
@trusted @nogc nothrow {
import std.format : formattedWrite;
StdoutRange r;
scope cb = () {
scope (failure) assert(false);
(&r).formattedWrite(str, args);
};
(cast(void delegate() @nogc @safe nothrow)cb)();
r.put('\n');
}
struct StdoutRange {
@safe: @nogc: nothrow:
import core.stdc.stdio;
void put(string str)
{
() @trusted { fwrite(str.ptr, str.length, 1, stdout); } ();
}
void put(char ch)
{
() @trusted { fputc(ch, stdout); } ();
}
}
struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) {
static assert(nextPOT(CHUNK_SIZE) == CHUNK_SIZE,
"CHUNK_SIZE must be a power of two for performance reasons.");
@safe: @nogc: nothrow:
import core.stdc.stdlib : calloc, free, malloc, realloc;
import std.traits : hasElaborateDestructor;
static assert(!hasElaborateDestructor!T);
alias chunkSize = CHUNK_SIZE;
private {
alias Chunk = T[chunkSize];
alias ChunkPtr = Chunk*;
ChunkPtr[] m_chunks;
size_t m_chunkCount;
size_t m_length;
}
@disable this(this);
~this()
{
clear();
}
@property size_t length() const { return m_length; }
void clear()
{
() @trusted {
foreach (i; 0 .. m_chunkCount)
free(m_chunks[i]);
free(m_chunks.ptr);
} ();
m_chunkCount = 0;
m_length = 0;
}
ref T opIndex(size_t index)
{
auto chunk = index / chunkSize;
auto subidx = index % chunkSize;
if (index >= m_length) m_length = index+1;
reserveChunk(chunk);
return (*m_chunks[chunk])[subidx];
}
private void reserveChunk(size_t chunkidx)
{
if (m_chunks.length <= chunkidx) {
auto l = m_chunks.length == 0 ? 64 : m_chunks.length;
while (l <= chunkidx) l *= 2;
() @trusted {
auto newptr = cast(ChunkPtr*)realloc(m_chunks.ptr, l * ChunkPtr.length);
m_chunks = newptr[0 .. l];
} ();
}
while (m_chunkCount <= chunkidx) {
() @trusted { m_chunks[m_chunkCount++] = cast(ChunkPtr)calloc(chunkSize, T.sizeof); } ();
}
}
}
private size_t nextPOT(size_t n)
{
foreach_reverse (i; 0 .. size_t.sizeof*8) {
size_t ni = cast(size_t)1 << i;
if (n & ni) {
return n & (ni-1) ? ni << 1 : ni;
}
}
return 1;
}
unittest {
assert(nextPOT(1) == 1);
assert(nextPOT(2) == 2);
assert(nextPOT(3) == 4);
assert(nextPOT(4) == 4);
assert(nextPOT(5) == 8);
}

603
source/eventcore/posix.d Normal file
View file

@ -0,0 +1,603 @@
module eventcore.posix;
@safe: /*@nogc:*/ nothrow:
public import eventcore.driver;
import eventcore.timer;
import eventcore.internal.utils;
import std.socket : Address, AddressFamily, UnknownAddress;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.unistd : close;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl;
version (Windows) import core.sys.windows.winsock;
private long currStdTime()
{
import std.datetime : Clock;
scope (failure) assert(false);
return Clock.currStdTime;
}
abstract class PosixEventDriver : EventDriver {
@safe: /*@nogc:*/ nothrow:
private {
ChoppedVector!FDSlot m_fds;
size_t m_waiterCount = 0;
}
mixin DefaultTimerImpl!();
protected int maxFD() const { return cast(int)m_fds.length; }
@property size_t waiterCount() const { return m_waiterCount; }
final override void processEvents(Duration timeout)
{
import std.algorithm : min;
import core.time : seconds;
if (timeout <= 0.seconds) {
doProcessEvents(0.seconds);
processTimers(currStdTime);
} else {
long now = currStdTime;
do {
auto nextto = min(getNextTimeout(now), timeout);
doProcessEvents(nextto);
long prev_step = now;
now = currStdTime;
processTimers(now);
if (timeout != Duration.max)
timeout -= (now - prev_step).hnsecs;
} while (timeout > 0.seconds);
}
}
protected abstract void doProcessEvents(Duration dur);
abstract void dispose();
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
{
auto sock = cast(StreamSocketFD)createSocket(address.addressFamily);
if (sock == -1) return StreamSocketFD.invalid;
void invalidateSocket() @nogc @trusted nothrow { close(sock); sock = StreamSocketFD.invalid; }
scope bind_addr = new UnknownAddress;
bind_addr.name.sa_family = cast(ushort)address.addressFamily;
bind_addr.name.sa_data[] = 0;
if (() @trusted { return bind(sock, bind_addr.name, bind_addr.nameLen); } () != 0) {
invalidateSocket();
on_connect(sock, ConnectStatus.bindFailure);
return sock;
}
registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } ();
if (ret == 0) {
on_connect(sock, ConnectStatus.connected);
} else {
auto err = errno;
if (err == EINPROGRESS) {
with (m_fds[sock]) {
connectCallback = on_connect;
}
startNotify!(EventType.write)(sock, &onConnect);
} else {
unregisterFD(sock);
invalidateSocket();
on_connect(sock, ConnectStatus.unknownError);
}
}
addFD(sock);
return sock;
}
private void onConnect(FD sock)
{
stopNotify!(EventType.status)(sock);
stopNotify!(EventType.write)(sock);
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
}
private void onConnectError(FD sock)
{
stopNotify!(EventType.status)(sock);
stopNotify!(EventType.write)(sock);
// FIXME: determine the correct kind of error!
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
}
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
{
auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily);
void invalidateSocket() @nogc @trusted nothrow { close(sock); sock = StreamSocketFD.invalid; }
() @trusted {
int tmp_reuse = 1;
// FIXME: error handling!
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
invalidateSocket();
} else if (bind(sock, address.name, address.nameLen) != 0) {
invalidateSocket();
} else if (listen(sock, 128) != 0) {
invalidateSocket();
} else { scope (failure) assert(false); import std.stdio; writeln("Success!"); }
} ();
if (on_accept && sock != StreamListenSocketFD.invalid)
waitForConnections(sock, on_accept);
return sock;
}
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{
registerFD(sock, EventMask.read);
addFD(sock);
m_fds[sock].acceptCallback = on_accept;
startNotify!(EventType.read)(sock, &onAccept);
}
private void onAccept(FD listenfd)
{
scope addr = new UnknownAddress;
foreach (i; 0 .. 20) {
int sockfd;
uint addr_len = addr.nameLen;
() @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } ();
if (sockfd == -1) break;
() @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } ();
registerFD(cast(FD)sockfd, EventMask.read|EventMask.write|EventMask.status);
addFD(cast(FD)sockfd);
//print("accept %d", sockfd);
m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, cast(StreamSocketFD)sockfd);
}
}
final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{
int opt = enable;
() @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
}
final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.all)
{
sizediff_t ret;
() @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_read = 0;
if (ret == 0) {
on_read_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret > 0) {
bytes_read += ret;
buffer = buffer[bytes_read .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, bytes_read);
return;
}
}
with (m_fds[socket]) {
readCallback = on_read_finish;
readMode = mode;
bytesRead = ret > 0 ? ret : 0;
readBuffer = buffer;
}
startNotify!(EventType.read)(socket, &onSocketRead);
}
private void onSocketRead(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
stopNotify!(EventType.read)(socket);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead);
}
sizediff_t ret;
() @trusted { ret = recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
finalize(IOStatus.error);
return;
}
}
if (ret == 0) {
finalize(IOStatus.disconnected);
return;
}
if (ret > 0) {
slot.bytesRead += ret;
slot.readBuffer = slot.readBuffer[ret .. $];
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
finalize(IOStatus.ok);
return;
}
}
}
final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.all)
{
sizediff_t ret;
() @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
on_write_finish(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_written = 0;
if (ret == 0) {
on_write_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
on_write_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret > 0) {
bytes_written += ret;
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_write_finish(socket, IOStatus.ok, bytes_written);
return;
}
}
with (m_fds[socket]) {
writeCallback = on_write_finish;
writeMode = mode;
bytesWritten = ret > 0 ? ret : 0;
writeBuffer = buffer;
}
startNotify!(EventType.write)(socket, &onSocketWrite);
}
private void onSocketWrite(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
sizediff_t ret;
() @trusted { ret = send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
stopNotify!(EventType.write)(socket);
slot.readCallback(socket, IOStatus.error, slot.bytesRead);
return;
}
}
if (ret == 0) {
stopNotify!(EventType.write)(socket);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten);
return;
}
if (ret > 0) {
slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
stopNotify!(EventType.write)(socket);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
return;
}
}
}
final override void waitSocketData(StreamSocketFD socket, IOCallback on_data_available)
{
sizediff_t ret;
ubyte dummy;
() @trusted { ret = recv(socket, &dummy, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) {
on_data_available(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_read = 0;
if (ret == 0) {
on_data_available(socket, IOStatus.disconnected, 0);
return;
}
if (ret > 0) {
on_data_available(socket, IOStatus.ok, 0);
return;
}
with (m_fds[socket]) {
readCallback = on_data_available;
readMode = IOMode.once;
bytesRead = 0;
readBuffer = null;
}
startNotify!(EventType.read)(socket, &onSocketDataAvailable);
}
private void onSocketDataAvailable(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
stopNotify!(EventType.read)(socket);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, 0);
}
sizediff_t ret;
ubyte tmp;
() @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = errno;
if (err != EAGAIN) finalize(IOStatus.error);
} else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
}
final override void shutdownSocket(StreamSocketFD socket, bool shut_read, bool shut_write)
{
// TODO!
}
final override EventID createEvent()
{
assert(false);
}
final override void triggerEvent(EventID event, bool notify_all = true)
{
assert(false);
}
final override EventWaitID waitForEvent(EventID event, EventCallback on_event)
{
assert(false);
}
final override void stopWaitingForEvent(EventID event, EventWaitID wait_id)
{
assert(false);
}
final override void addRef(SocketFD fd)
{
auto pfd = &m_fds[fd];
assert(pfd.refCount > 0);
m_fds[fd].refCount++;
}
final override void addRef(FileFD descriptor)
{
assert(false);
}
final override void addRef(EventID descriptor)
{
assert(false);
}
final override void releaseRef(SocketFD fd)
{
auto pfd = &m_fds[fd];
assert(pfd.refCount > 0);
if (--m_fds[fd].refCount == 0) {
unregisterFD(fd);
clearFD(fd);
close(fd);
}
}
final override void releaseRef(FileFD descriptor)
{
assert(false);
}
final override void releaseRef(TimerID descriptor)
{
assert(false);
}
final override void releaseRef(EventID descriptor)
{
assert(false);
}
/// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask);
/// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd);
/// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask mask);
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
{
// TODO: optimize!
foreach (i; 0 .. cast(int)m_fds.length)
if (m_fds[i].callback[evt])
del(cast(FD)i);
}
final protected void notify(EventType evt)(FD fd)
{
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd].callback[evt])
m_fds[fd].callback[evt](fd);
}
private void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{
assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
m_fds[fd].callback[evt] = callback;
assert(m_fds[0].callback[evt] is null);
m_waiterCount++;
updateFD(fd, m_fds[fd].eventMask);
}
private void stopNotify(EventType evt)(FD fd)
{
assert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
m_fds[fd].callback[evt] = null;
m_waiterCount--;
updateFD(fd, m_fds[fd].eventMask);
}
private SocketFD createSocket(AddressFamily family)
{
int sock;
() @trusted { sock = socket(family, SOCK_STREAM, 0); } ();
if (sock == -1) return SocketFD.invalid;
int on = 1;
() @trusted { fcntl(sock, F_SETFL, O_NONBLOCK, on); } ();
return cast(SocketFD)sock;
}
private void addFD(FD fd)
{
m_fds[fd].refCount = 1;
}
private void clearFD(FD fd)
{
m_fds[fd] = FDSlot.init;
}
}
alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD);
private struct FDSlot {
FDSlotCallback[EventType.max+1] callback;
uint refCount;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
IOCallback readCallback;
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
IOCallback writeCallback;
ConnectCallback connectCallback;
AcceptCallback acceptCallback;
@property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0;
if (callback[EventType.read] !is null) ret |= EventMask.read;
if (callback[EventType.write] !is null) ret |= EventMask.write;
if (callback[EventType.status] !is null) ret |= EventMask.status;
return ret;
}
}
enum EventType {
read,
write,
status
}
enum EventMask {
read = 1<<0,
write = 1<<1,
status = 1<<2
}
/*version (Windows) {
import std.c.windows.windows;
import std.c.windows.winsock;
alias EWOULDBLOCK = WSAEWOULDBLOCK;
extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments);
class WSAErrorException : Exception {
int error;
this(string message, string file = __FILE__, size_t line = __LINE__)
{
error = WSAGetLastError();
this(message, error, file, line);
}
this(string message, int error, string file = __FILE__, size_t line = __LINE__)
{
import std.string : format;
ushort* errmsg;
FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS,
null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null);
size_t len = 0;
while (errmsg[len]) len++;
auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup;
LocalFree(errmsg);
super(format("%s: %s (%s)", message, errmsgd, error), file, line);
}
}
alias SystemSocketException = WSAErrorException;
} else {
import std.exception : ErrnoException;
alias SystemSocketException = ErrnoException;
}
T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__)
{
import std.exception : enforceEx;
return enforceEx!SystemSocketException(value, msg, file, line);
}*/

76
source/eventcore/select.d Normal file
View file

@ -0,0 +1,76 @@
module eventcore.select;
@safe: /*@nogc:*/ nothrow:
public import eventcore.posix;
import eventcore.internal.utils;
import core.time : Duration;
import core.sys.posix.sys.time : timeval;
import core.sys.posix.sys.select;
final class SelectEventDriver : PosixEventDriver {
override void doProcessEvents(Duration timeout)
{
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
//scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs");
//scope (success) writefln("%.3f: process out", Clock.currAppTick.usecs * 1e-3);
auto ts = timeout.toTimeVal;
fd_set readfds, writefds, statusfds;
() @trusted {
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&statusfds);
} ();
enumerateFDs!(EventType.read)((fd) @trusted { FD_SET(fd, &readfds); });
enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); });
enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); });
//print("Wait for event...");
//writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3);
auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } ();
//writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3);
//print("Done wait for event...");
if (ret > 0) {
enumerateFDs!(EventType.read)((fd) @trusted {
if (FD_ISSET(fd, &readfds))
notify!(EventType.read)(fd);
});
enumerateFDs!(EventType.write)((fd) @trusted {
if (FD_ISSET(fd, &writefds))
notify!(EventType.write)(fd);
});
enumerateFDs!(EventType.status)((fd) @trusted {
if (FD_ISSET(fd, &statusfds))
notify!(EventType.status)(fd);
});
}
}
override void dispose()
{
}
override void registerFD(FD fd, EventMask mask)
{
}
override void unregisterFD(FD fd)
{
}
override void updateFD(FD fd, EventMask mask)
{
}
}
private timeval toTimeVal(Duration dur)
{
timeval tvdur;
dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
return tvdur;
}

147
source/eventcore/timer.d Normal file
View file

@ -0,0 +1,147 @@
module eventcore.timer;
import eventcore.driver;
mixin template DefaultTimerImpl() {
import std.experimental.allocator.building_blocks.free_list;
import std.experimental.allocator.building_blocks.region;
import std.experimental.allocator.mallocator;
import std.experimental.allocator : dispose, make;
import std.container.array;
import std.datetime : Clock;
import std.range : SortedRange, assumeSorted, take;
import core.time : hnsecs;
private {
static FreeList!(Mallocator, TimerSlot.sizeof) ms_allocator;
TimerSlot*[TimerID] m_timers;
Array!(TimerSlot*) m_timerQueue;
TimerID m_lastTimerID;
TimerSlot*[] m_firedTimers;
}
static this()
{
ms_allocator.parent = Mallocator.instance;
}
final protected Duration getNextTimeout(long stdtime)
{
return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max;
}
final protected void processTimers(long stdtime)
@trusted {
assert(m_firedTimers.length == 0);
if (m_timerQueue.empty) return;
TimerSlot ts = void;
ts.timeout = stdtime+1;
auto fired = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).lowerBound(&ts);
foreach (tm; fired) {
if (tm.repeatDuration > 0) {
do tm.timeout += tm.repeatDuration;
while (tm.timeout <= stdtime);
auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(tail.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); }
} else tm.pending = false;
m_firedTimers ~= tm;
}
// NOTE: this isn't yet verified to work under all circumstances
auto elems = m_timerQueue[0 .. fired.length];
scope (failure) assert(false);
m_timerQueue.linearRemove(elems);
foreach (tm; m_firedTimers)
tm.callback(tm.id);
m_firedTimers.length = 0;
m_firedTimers.assumeSafeAppend();
}
final override TimerID createTimer(TimerCallback callback)
@trusted {
auto id = cast(TimerID)(m_lastTimerID + 1);
TimerSlot* tm;
try tm = ms_allocator.make!TimerSlot;
catch (Exception e) return TimerID.invalid;
assert(tm !is null);
tm.id = id;
tm.refCount = 1;
tm.callback = callback;
m_timers[id] = tm;
return id;
}
final override void setTimer(TimerID timer, Duration timeout, Duration repeat)
@trusted {
scope (failure) assert(false);
auto tm = m_timers[timer];
if (tm.pending) stopTimer(timer);
tm.timeout = Clock.currStdTime + timeout.total!"hnsecs";
tm.repeatDuration = repeat.total!"hnsecs";
tm.pending = true;
auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(largerRange.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); }
}
final override void stopTimer(TimerID timer)
@trusted {
auto tm = m_timers[timer];
if (!tm.pending) return;
tm.pending = false;
tm.callback = null;
TimerSlot cmp = void;
cmp.timeout = tm.timeout-1;
auto upper = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(&cmp);
assert(!upper.empty);
while (!upper.empty) {
assert(upper.front.timeout == tm.timeout);
if (upper.front is tm) {
scope (failure) assert(false);
m_timerQueue.linearRemove(upper.release.take(1));
break;
}
}
}
final override bool isTimerPending(TimerID descriptor)
{
return m_timers[descriptor].pending;
}
final override bool isTimerPeriodic(TimerID descriptor)
{
return m_timers[descriptor].repeatDuration > 0;
}
final override void addRef(TimerID descriptor)
{
m_timers[descriptor].refCount++;
}
final override void releaseRef(TimerID descriptor)
{
auto tm = m_timers[descriptor];
if (!--tm.refCount) {
if (tm.pending) stopTimer(tm.id);
m_timers.remove(descriptor);
() @trusted { scope (failure) assert(false); ms_allocator.dispose(tm); } ();
}
}
}
struct TimerSlot {
TimerID id;
uint refCount;
bool pending;
long timeout; // stdtime
long repeatDuration;
TimerCallback callback;
}