Split up EventDriver interface into individual sub interfaces.

This commit is contained in:
Sönke Ludwig 2016-10-05 04:21:24 +02:00
parent 021f918236
commit e3c4af8433
4 changed files with 134 additions and 98 deletions

View file

@ -83,13 +83,13 @@ struct StreamConnectionImpl {
~this() ~this()
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != StreamSocketFD.invalid)
eventDriver.releaseRef(m_socket); eventDriver.sockets.releaseRef(m_socket);
} }
@property bool empty() @property bool empty()
{ {
reader.start(); reader.start();
eventDriver.waitSocketData(m_socket, &onData); eventDriver.sockets.waitSocketData(m_socket, &onData);
reader.wait(); reader.wait();
return m_empty; return m_empty;
} }
@ -98,7 +98,7 @@ struct StreamConnectionImpl {
{ {
reader.start(); reader.start();
if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0); if (m_readBufferFill >= 2) onReadLineData(m_socket, IOStatus.ok, 0);
else eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); else eventDriver.sockets.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
reader.wait(); reader.wait();
auto ln = m_line; auto ln = m_line;
m_line = null; m_line = null;
@ -108,13 +108,13 @@ struct StreamConnectionImpl {
void write(const(ubyte)[] data) void write(const(ubyte)[] data)
{ {
writer.start(); writer.start();
eventDriver.writeSocket(m_socket, data, IOMode.all, &onWrite); eventDriver.sockets.writeSocket(m_socket, data, IOMode.all, &onWrite);
writer.wait(); writer.wait();
} }
void close() void close()
nothrow { nothrow {
eventDriver.releaseRef(m_socket); eventDriver.sockets.releaseRef(m_socket);
m_socket = StreamSocketFD.invalid; m_socket = StreamSocketFD.invalid;
m_readBuffer = null; m_readBuffer = null;
} }
@ -159,7 +159,7 @@ struct StreamConnectionImpl {
reader.finish(); reader.finish();
} else if (m_readBuffer.length - m_readBufferFill > 0) { } else if (m_readBuffer.length - m_readBufferFill > 0) {
eventDriver.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData); eventDriver.sockets.readSocket(m_socket, m_readBuffer[m_readBufferFill .. $], IOMode.once, &onReadLineData);
} else { } else {
reader.finish(exh); reader.finish(exh);
} }
@ -171,16 +171,16 @@ void main()
{ {
print("Starting up..."); print("Starting up...");
auto addr = new InternetAddress("127.0.0.1", 8080); auto addr = new InternetAddress("127.0.0.1", 8080);
auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect));
enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
/*import core.time : msecs; /*import core.time : msecs;
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs); eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 1"); }), 1000.msecs, 1000.msecs);
eventDriver.setTimer(eventDriver.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/ eventDriver.setTimer(eventDriver.timers.createTimer((tm) { print("timer 2"); }), 250.msecs, 500.msecs);*/
print("Listening for requests on port 8080..."); print("Listening for requests on port 8080...");
while (eventDriver.waiterCount) while (eventDriver.core.waiterCount)
eventDriver.processEvents(); eventDriver.core.processEvents();
} }
void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client)

View file

@ -9,12 +9,12 @@ void main()
{ {
print("Starting up..."); print("Starting up...");
auto addr = new InternetAddress("127.0.0.1", 8080); auto addr = new InternetAddress("127.0.0.1", 8080);
auto listener = eventDriver.listenStream(addr, toDelegate(&onClientConnect)); auto listener = eventDriver.sockets.listenStream(addr, toDelegate(&onClientConnect));
enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections."); enforce(listener != StreamListenSocketFD.invalid, "Failed to listen for connections.");
print("Listening for requests on port 8080..."); print("Listening for requests on port 8080...");
while (eventDriver.waiterCount) while (eventDriver.core.waiterCount)
eventDriver.processEvents(); eventDriver.core.processEvents();
} }
void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client) void onClientConnect(StreamListenSocketFD listener, StreamSocketFD client)
@ -49,7 +49,7 @@ struct ClientHandler {
{ {
onLine = on_line; onLine = on_line;
if (linefill >= 2) onReadData(client, IOStatus.ok, 0); if (linefill >= 2) onReadData(client, IOStatus.ok, 0);
else eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); else eventDriver.sockets.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData);
} }
void onRequestLine(ubyte[] ln) void onRequestLine(ubyte[] ln)
@ -57,8 +57,8 @@ struct ClientHandler {
//print("Request: %s", cast(char[])ln); //print("Request: %s", cast(char[])ln);
if (ln.length == 0) { if (ln.length == 0) {
//print("Error: empty request line"); //print("Error: empty request line");
eventDriver.shutdownSocket(client); eventDriver.sockets.shutdownSocket(client);
eventDriver.releaseRef(client); eventDriver.sockets.releaseRef(client);
} }
readLine(&onHeaderLine); readLine(&onHeaderLine);
@ -68,7 +68,7 @@ struct ClientHandler {
{ {
if (ln.length == 0) { if (ln.length == 0) {
auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!"; auto reply = cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\nKeep-Alive: timeout=10\r\n\r\nHello, World!";
eventDriver.writeSocket(client, reply, IOMode.all, &onWriteFinished); eventDriver.sockets.writeSocket(client, reply, IOMode.all, &onWriteFinished);
} else readLine(&onHeaderLine); } else readLine(&onHeaderLine);
} }
@ -83,8 +83,8 @@ struct ClientHandler {
if (status != IOStatus.ok) { if (status != IOStatus.ok) {
print("Client disconnect"); print("Client disconnect");
eventDriver.shutdownSocket(client); eventDriver.sockets.shutdownSocket(client);
eventDriver.releaseRef(client); eventDriver.sockets.releaseRef(client);
return; return;
} }
@ -101,12 +101,12 @@ struct ClientHandler {
onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]); onLine(linebuf[linefill + idx + 2 .. linefill + idx + 2 + idx]);
} else if (linebuf.length - linefill > 0) { } else if (linebuf.length - linefill > 0) {
eventDriver.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData); eventDriver.sockets.readSocket(client, linebuf[linefill .. $], IOMode.once, &onReadData);
} else { } else {
// ERROR: header line too long // ERROR: header line too long
print("Header line too long"); print("Header line too long");
eventDriver.shutdownSocket(client); eventDriver.sockets.shutdownSocket(client);
eventDriver.releaseRef(client); eventDriver.sockets.releaseRef(client);
} }
} }
} }

View file

@ -7,10 +7,16 @@ import std.socket : Address;
interface EventDriver { interface EventDriver {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
// @property EventDriverCore core();
// General functionality @property EventDriverFiles files();
// @property EventDriverSockets sockets();
@property EventDriverTimers udp();
@property EventDriverEvents events();
@property EventDriverWatchers watchers();
}
interface EventDriverCore {
@safe: /*@nogc:*/ nothrow:
/// Releases all resources associated with the driver /// Releases all resources associated with the driver
void dispose(); void dispose();
@ -57,77 +63,6 @@ interface EventDriver {
*/ */
void clearExitFlag(); void clearExitFlag();
//
// TCP
//
StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect);
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
ConnectionState getConnectionState(StreamSocketFD sock);
void setTCPNoDelay(StreamSocketFD socket, bool enable);
void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);
void waitSocketData(StreamSocketFD socket, IOCallback on_data_available);
void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true);
void cancelRead(StreamSocketFD socket);
void cancelWrite(StreamSocketFD socket);
//
// Files
//
//FileFD openFile(string path, FileOpenMode mode);
//FileFD createTempFile();
//
// Manual events
//
EventID createEvent();
void triggerEvent(EventID event, bool notify_all = true);
void triggerEvent(EventID event, bool notify_all = true) shared;
void waitForEvent(EventID event, EventCallback on_event);
void cancelWaitForEvent(EventID event, EventCallback on_event);
//
// Timers
//
TimerID createTimer();
void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero);
void stopTimer(TimerID timer);
bool isTimerPending(TimerID timer);
bool isTimerPeriodic(TimerID timer);
void waitTimer(TimerID timer, TimerCallback callback);
void cancelTimerWait(TimerID timer, TimerCallback callback);
//
// Resource ownership
//
/**
Increments the reference count of the given resource.
*/
void addRef(SocketFD descriptor);
/// ditto
void addRef(FileFD descriptor);
/// ditto
void addRef(TimerID descriptor);
/// ditto
void addRef(EventID descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
*/
void releaseRef(SocketFD descriptor);
/// ditto
void releaseRef(FileFD descriptor);
/// ditto
void releaseRef(TimerID descriptor);
/// ditto
void releaseRef(EventID descriptor);
/// Low-level user data access. Use `getUserData` instead. /// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
@ -142,6 +77,100 @@ interface EventDriver {
} }
} }
interface EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
/**
Increments the reference count of the given resource.
*/
void addRef(SocketFD descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
*/
void releaseRef(SocketFD descriptor);
StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect);
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
ConnectionState getConnectionState(StreamSocketFD sock);
void setTCPNoDelay(StreamSocketFD socket, bool enable);
void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);
void waitSocketData(StreamSocketFD socket, IOCallback on_data_available);
void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true);
void cancelRead(StreamSocketFD socket);
void cancelWrite(StreamSocketFD socket);
}
interface EventDriverFiles {
@safe: /*@nogc:*/ nothrow:
/**
Increments the reference count of the given resource.
*/
void addRef(FileFD descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
*/
void releaseRef(FileFD descriptor);
//FileFD openFile(string path, FileOpenMode mode);
//FileFD createTempFile();
}
interface EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
/**
Increments the reference count of the given resource.
*/
void addRef(EventID descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
*/
void releaseRef(EventID descriptor);
EventID createEvent();
void triggerEvent(EventID event, bool notify_all = true);
void triggerEvent(EventID event, bool notify_all = true) shared;
void waitForEvent(EventID event, EventCallback on_event);
void cancelWaitForEvent(EventID event, EventCallback on_event);
}
interface EventDriverTimers {
@safe: /*@nogc:*/ nothrow:
/**
Increments the reference count of the given resource.
*/
void addRef(TimerID descriptor);
/**
Decrements the reference count of the given resource.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
*/
void releaseRef(TimerID descriptor);
TimerID createTimer();
void setTimer(TimerID timer, Duration timeout, Duration repeat = Duration.zero);
void stopTimer(TimerID timer);
bool isTimerPending(TimerID timer);
bool isTimerPeriodic(TimerID timer);
void waitTimer(TimerID timer, TimerCallback callback);
void cancelTimerWait(TimerID timer, TimerCallback callback);
}
interface EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
}
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD);

View file

@ -36,7 +36,7 @@ private long currStdTime()
return Clock.currStdTime; return Clock.currStdTime;
} }
abstract class PosixEventDriver : EventDriver { abstract class PosixEventDriver : EventDriver, EventDriverCore, EventDriverFiles, EventDriverSockets, EventDriverTimers, EventDriverEvents, EventDriverWatchers {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private { private {
@ -54,6 +54,13 @@ abstract class PosixEventDriver : EventDriver {
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
} }
@property PosixEventDriver core() { return this; }
@property PosixEventDriver files() { return this; }
@property PosixEventDriver sockets() { return this; }
@property PosixEventDriver udp() { return this; }
@property PosixEventDriver events() { return this; }
@property PosixEventDriver watchers() { return this; }
mixin DefaultTimerImpl!(); mixin DefaultTimerImpl!();
protected int maxFD() const { return cast(int)m_fds.length; } protected int maxFD() const { return cast(int)m_fds.length; }