Add EventDriver.exit and move all event callback parameters to the end.

This commit is contained in:
Sönke Ludwig 2016-01-27 11:02:54 +01:00
parent 3d8183248c
commit 844e955cdb
7 changed files with 125 additions and 36 deletions

View file

@ -98,7 +98,7 @@ struct StreamConnectionImpl {
{ {
reader.start(); reader.start();
if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0);
else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
reader.wait(); reader.wait();
auto ln = m_line; auto ln = m_line;
m_line = null; m_line = null;
@ -108,7 +108,7 @@ struct StreamConnectionImpl {
void write(const(ubyte)[] data) void write(const(ubyte)[] data)
{ {
writer.start(); writer.start();
eventDriver.writeSocket(m_socket, data, &onWrite, IOMode.all); eventDriver.writeSocket(m_socket, data, IOMode.all, &onWrite);
writer.wait(); writer.wait();
} }
@ -159,7 +159,7 @@ struct StreamConnectionImpl {
reader.finish(); reader.finish();
} else if (m_readBuffer.length - m_readBufferFill > 0) { } else if (m_readBuffer.length - m_readBufferFill > 0) {
eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], &onReadLineData, IOMode.once); eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
} else { } else {
reader.finish(exh); reader.finish(exh);
} }
@ -174,9 +174,9 @@ void main()
auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect));
enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
import core.time : msecs; /*import core.time : msecs;
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs);
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs); eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/
print("Listening for requests on port 8080..."); print("Listening for requests on port 8080...");
while (eventDriver.waiterCount) while (eventDriver.waiterCount)
@ -209,7 +209,7 @@ struct ClientHandler {
auto conn = StreamConnection(client, linebuf); auto conn = StreamConnection(client, linebuf);
try { try {
while (!conn.empty) { while (true) {
conn.readLine(); conn.readLine();
ubyte[] ln; ubyte[] ln;

View file

@ -49,7 +49,7 @@ struct ClientHandler {
{ {
onLine = on_line; onLine = on_line;
if (linefill >= 2) onReadData(client, IOStatus.ok, 0); if (linefill >= 2) onReadData(client, IOStatus.ok, 0);
else eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); else eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData);
} }
void onRequestLine(ubyte[] ln) void onRequestLine(ubyte[] ln)
@ -68,7 +68,7 @@ struct ClientHandler {
{ {
if (ln.length == 0) { if (ln.length == 0) {
auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!";
eventDriver.writeSocket(client, reply, &onWriteFinished, IOMode.all); eventDriver.writeSocket(client, reply, IOMode.all, &onWriteFinished);
} else readLine(&onHeaderLine); } else readLine(&onHeaderLine);
} }
@ -101,7 +101,7 @@ struct ClientHandler {
onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]); onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]);
} else if (linebuf.length - linefill > 0) { } else if (linebuf.length - linefill > 0) {
eventDriver.readSocket(client, linebuf[linefill .. $], &onReadData, IOMode.once); eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData);
} else { } else {
// ERROR: header line too long // ERROR: header line too long
print("Header line too long"); print("Header line too long");

View file

@ -36,7 +36,17 @@ interface EventDriver {
the default duration of `Duration.max`, if necessary, will wait the default duration of `Duration.max`, if necessary, will wait
indefinitely until an event arrives. indefinitely until an event arrives.
*/ */
void processEvents(Duration timeout = Duration.max); ExitReason processEvents(Duration timeout = Duration.max);
/**
Causes `processEvents` to return with `ExitReason.exited` as soon as
possible.
A call to `processEvents` that is currently in progress will be notfied
so that it returns immediately. If no call is in progress, the next call
to `processEvents` will immediately return with `ExitReason.exited`.
*/
void exit();
// //
// TCP // TCP
@ -46,8 +56,8 @@ interface EventDriver {
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
void setTCPNoDelay(StreamSocketFD socket, bool enable); void setTCPNoDelay(StreamSocketFD socket, bool enable);
void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.once); void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.once); void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);
void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); void waitSocketData(StreamSocketFD socket, IOCallback on_data_available);
void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true);
@ -56,6 +66,7 @@ interface EventDriver {
// //
EventID createEvent(); EventID createEvent();
void triggerEvent(EventID event, bool notify_all = true); void triggerEvent(EventID event, bool notify_all = true);
void triggerEvent(EventID event, bool notify_all = true) shared;
EventWaitID waitForEvent(EventID event, EventCallback on_event); EventWaitID waitForEvent(EventID event, EventCallback on_event);
void stopWaitingForEvent(EventID event, EventWaitID wait_id); void stopWaitingForEvent(EventID event, EventWaitID wait_id);
@ -105,6 +116,13 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
enum ExitReason {
timeout,
idle,
outOfWaiters,
exited
}
enum ConnectStatus { enum ConnectStatus {
connected, connected,
refused, refused,
@ -155,8 +173,6 @@ alias SocketFD = Handle!FD;
alias StreamSocketFD = Handle!SocketFD; alias StreamSocketFD = Handle!SocketFD;
alias StreamListenSocketFD = Handle!SocketFD; alias StreamListenSocketFD = Handle!SocketFD;
alias FileFD = Handle!FD; alias FileFD = Handle!FD;
alias EventID = Handle!FD;
alias TimerID = Handle!int; alias TimerID = Handle!int;
alias EventID = Handle!int;
alias EventWaitID = Handle!int; alias EventWaitID = Handle!int;

View file

@ -29,7 +29,7 @@ final class EpollEventDriver : PosixEventDriver {
m_events.length = 100; m_events.length = 100;
} }
override void doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)
@trusted { @trusted {
import std.algorithm : min; import std.algorithm : min;
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
@ -49,7 +49,8 @@ final class EpollEventDriver : PosixEventDriver {
if (evt.events & EPOLLERR) notify!(EventType.status)(fd); if (evt.events & EPOLLERR) notify!(EventType.status)(fd);
else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd); else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd);
} }
} return true;
} else return false;
} }
override void dispose() override void dispose()

View file

@ -14,7 +14,7 @@ import std.socket : Address, AddressFamily, UnknownAddress;
version (Posix) { version (Posix) {
import core.sys.posix.netinet.in_; import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp; import core.sys.posix.netinet.tcp;
import core.sys.posix.unistd : close; import core.sys.posix.unistd : close, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
} }
@ -22,6 +22,10 @@ version (Windows) {
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
alias EAGAIN = WSAEWOULDBLOCK; alias EAGAIN = WSAEWOULDBLOCK;
} }
version (linux) {
extern (C) int eventfd(uint initval, int flags);
enum EFD_NONBLOCK = 0x800;
}
private long currStdTime() private long currStdTime()
@ -37,6 +41,16 @@ abstract class PosixEventDriver : EventDriver {
private { private {
ChoppedVector!FDSlot m_fds; ChoppedVector!FDSlot m_fds;
size_t m_waiterCount = 0; size_t m_waiterCount = 0;
bool m_exit = false;
FD m_wakeupEvent;
}
protected this()
{
m_wakeupEvent = eventfd(0, EFD_NONBLOCK);
initFD(m_wakeupEvent);
registerFD(m_wakeupEvent, EventMask.read);
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
} }
mixin DefaultTimerImpl!(); mixin DefaultTimerImpl!();
@ -45,29 +59,53 @@ abstract class PosixEventDriver : EventDriver {
@property size_t waiterCount() const { return m_waiterCount; } @property size_t waiterCount() const { return m_waiterCount; }
final override void processEvents(Duration timeout) final override ExitReason processEvents(Duration timeout)
{ {
import std.algorithm : min; import std.algorithm : min;
import core.time : seconds; import core.time : seconds;
if (m_exit) {
m_exit = false;
return ExitReason.exited;
}
if (!waiterCount) return ExitReason.outOfWaiters;
bool got_events;
if (timeout <= 0.seconds) { if (timeout <= 0.seconds) {
doProcessEvents(0.seconds); got_events = doProcessEvents(0.seconds);
processTimers(currStdTime); processTimers(currStdTime);
} else { } else {
long now = currStdTime; long now = currStdTime;
do { do {
auto nextto = min(getNextTimeout(now), timeout); auto nextto = min(getNextTimeout(now), timeout);
doProcessEvents(nextto); got_events = doProcessEvents(nextto);
long prev_step = now; long prev_step = now;
now = currStdTime; now = currStdTime;
processTimers(now); processTimers(now);
if (timeout != Duration.max) if (timeout != Duration.max)
timeout -= (now - prev_step).hnsecs; timeout -= (now - prev_step).hnsecs;
} while (timeout > 0.seconds); } while (timeout > 0.seconds && !m_exit && !got_events);
}
} }
protected abstract void doProcessEvents(Duration dur); if (m_exit) {
m_exit = false;
return ExitReason.exited;
}
if (!waiterCount) return ExitReason.outOfWaiters;
if (got_events) return ExitReason.idle;
return ExitReason.timeout;
}
final override void exit()
{
m_exit = true;
int one = 1;
() @trusted { write(m_wakeupEvent, &one, one.sizeof); } ();
}
protected abstract bool doProcessEvents(Duration dur);
abstract void dispose(); abstract void dispose();
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
@ -105,7 +143,7 @@ abstract class PosixEventDriver : EventDriver {
} }
} }
addFD(sock); initFD(sock);
return sock; return sock;
} }
@ -148,7 +186,7 @@ abstract class PosixEventDriver : EventDriver {
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{ {
registerFD(sock, EventMask.read); registerFD(sock, EventMask.read);
addFD(sock); initFD(sock);
m_fds[sock].acceptCallback = on_accept; m_fds[sock].acceptCallback = on_accept;
startNotify!(EventType.read)(sock, &onAccept); startNotify!(EventType.read)(sock, &onAccept);
} }
@ -166,7 +204,7 @@ abstract class PosixEventDriver : EventDriver {
setSocketNonBlocking(cast(SocketFD)sockfd); setSocketNonBlocking(cast(SocketFD)sockfd);
auto fd = cast(StreamSocketFD)sockfd; auto fd = cast(StreamSocketFD)sockfd;
registerFD(fd, EventMask.read|EventMask.write|EventMask.status); registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
addFD(fd); initFD(fd);
//print("accept %d", sockfd); //print("accept %d", sockfd);
m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd);
} }
@ -178,7 +216,7 @@ abstract class PosixEventDriver : EventDriver {
() @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } (); () @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
} }
final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.all) final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
{ {
sizediff_t ret; sizediff_t ret;
() @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } ();
@ -260,7 +298,7 @@ abstract class PosixEventDriver : EventDriver {
} }
} }
final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.all) final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
sizediff_t ret; sizediff_t ret;
() @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } ();
@ -402,22 +440,45 @@ abstract class PosixEventDriver : EventDriver {
final override EventID createEvent() final override EventID createEvent()
{ {
assert(false); auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
initFD(id);
registerFD(id, EventMask.read);
startNotify!(EventType.read)(id, &onEvent);
return id;
} }
final override void triggerEvent(EventID event, bool notify_all = true) final override void triggerEvent(EventID event, bool notify_all = true)
{ {
foreach (w; m_fds[event].waiters.consume)
w(event);
}
final override void triggerEvent(EventID event, bool notify_all = true)
shared {
/*int one = 1;
if (notify_all) atomicStore(m_fds[event].triggerAll, true);
() @trusted { write(event, &one, one.sizeof); } ();*/
assert(false); assert(false);
} }
final override EventWaitID waitForEvent(EventID event, EventCallback on_event) final override EventWaitID waitForEvent(EventID event, EventCallback on_event)
{ {
//return m_fds[event].waiters.put(on_event);
assert(false); assert(false);
} }
final override void stopWaitingForEvent(EventID event, EventWaitID wait_id) final override void stopWaitingForEvent(EventID event, EventWaitID wait_id)
{ {
assert(false); assert(false);
//m_fds[event].waiters.remove(wait_id);
}
private void onEvent(FD event)
{
assert(false);
/*auto all = atomicLoad(m_fds[event].triggerAll);
atomicStore(m_fds[event].triggerAll, false);
triggerEvent(cast(EventID)event, all);*/
} }
final override void addRef(SocketFD fd) final override void addRef(SocketFD fd)
@ -434,7 +495,9 @@ abstract class PosixEventDriver : EventDriver {
final override void addRef(EventID descriptor) final override void addRef(EventID descriptor)
{ {
assert(false); auto pfd = &m_fds[descriptor];
assert(pfd.refCount > 0);
m_fds[descriptor].refCount++;
} }
final override void releaseRef(SocketFD fd) final override void releaseRef(SocketFD fd)
@ -460,7 +523,13 @@ abstract class PosixEventDriver : EventDriver {
final override void releaseRef(EventID descriptor) final override void releaseRef(EventID descriptor)
{ {
assert(false); auto pfd = &m_fds[descriptor];
assert(pfd.refCount > 0);
if (--m_fds[descriptor].refCount == 0) {
unregisterFD(descriptor);
clearFD(descriptor);
close(descriptor);
}
} }
@ -490,7 +559,6 @@ abstract class PosixEventDriver : EventDriver {
{ {
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
m_fds[fd].callback[evt] = callback; m_fds[fd].callback[evt] = callback;
assert(m_fds[0].callback[evt] is null);
m_waiterCount++; m_waiterCount++;
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd].eventMask);
} }
@ -518,7 +586,7 @@ abstract class PosixEventDriver : EventDriver {
return cast(SocketFD)sock; return cast(SocketFD)sock;
} }
private void addFD(FD fd) private void initFD(FD fd)
{ {
m_fds[fd].refCount = 1; m_fds[fd].refCount = 1;
} }
@ -534,6 +602,8 @@ alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD);
private struct FDSlot { private struct FDSlot {
import eventcore.internal.consumablequeue;
FDSlotCallback[EventType.max+1] callback; FDSlotCallback[EventType.max+1] callback;
uint refCount; uint refCount;
@ -550,6 +620,7 @@ private struct FDSlot {
ConnectCallback connectCallback; ConnectCallback connectCallback;
AcceptCallback acceptCallback; AcceptCallback acceptCallback;
ConsumableQueue!EventCallback waiters;
@property EventMask eventMask() const nothrow { @property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0; EventMask ret = cast(EventMask)0;

View file

@ -24,7 +24,7 @@ version (Windows) {
final class SelectEventDriver : PosixEventDriver { final class SelectEventDriver : PosixEventDriver {
override void doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)
{ {
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
//scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs"); //scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs");
@ -61,7 +61,8 @@ final class SelectEventDriver : PosixEventDriver {
if (FD_ISSET(fd, &statusfds)) if (FD_ISSET(fd, &statusfds))
notify!(EventType.status)(fd); notify!(EventType.status)(fd);
}); });
} return true;
} else return false;
} }
override void dispose() override void dispose()

View file

@ -34,7 +34,7 @@ struct ChoppedVector(T, size_t CHUNK_SIZE = 16*64*1024/nextPOT(T.sizeof)) {
import core.stdc.stdlib : calloc, free, malloc, realloc; import core.stdc.stdlib : calloc, free, malloc, realloc;
import std.traits : hasElaborateDestructor; import std.traits : hasElaborateDestructor;
static assert(!hasElaborateDestructor!T); static assert(!hasElaborateDestructor!T, "Cannot store element with elaborate destructor in ChoppedVector.");
alias chunkSize = CHUNK_SIZE; alias chunkSize = CHUNK_SIZE;