Split up PosixEventDriver into individual classes.

This commit is contained in:
Sönke Ludwig 2016-10-12 22:59:15 +02:00
parent 2a44817911
commit c6dec730d8
6 changed files with 249 additions and 186 deletions

View file

@ -5,9 +5,11 @@ public import eventcore.driver;
import eventcore.drivers.epoll; import eventcore.drivers.epoll;
import eventcore.drivers.libasync; import eventcore.drivers.libasync;
import eventcore.drivers.select; import eventcore.drivers.select;
import eventcore.drivers.posix;
version (Have_libasync) alias NativeEventDriver = LibasyncEventDriver; version (Have_libasync) alias NativeEventDriver = LibasyncEventDriver;
else alias NativeEventDriver = SelectEventDriver; else version (linux) alias NativeEventDriver = PosixEventDriver!EpollEventLoop;
else alias NativeEventDriver = PosixEventDriver!SelectEventLoop;
@property EventDriver eventDriver() @property EventDriver eventDriver()
@safe @nogc nothrow { @safe @nogc nothrow {

View file

@ -8,19 +8,19 @@ import std.socket : Address;
interface EventDriver { interface EventDriver {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
@property EventDriverCore core(); @property EventDriverCore core();
@property EventDriverFiles files();
@property EventDriverSockets sockets();
@property EventDriverTimers timers(); @property EventDriverTimers timers();
@property EventDriverEvents events(); @property EventDriverEvents events();
@property EventDriverSignals signals(); @property EventDriverSignals signals();
@property EventDriverSockets sockets();
@property EventDriverFiles files();
@property EventDriverWatchers watchers(); @property EventDriverWatchers watchers();
/// Releases all resources associated with the driver
void dispose();
} }
interface EventDriverCore { interface EventDriverCore {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
/// Releases all resources associated with the driver
void dispose();
/** The number of pending callbacks. /** The number of pending callbacks.
When this number drops to zero, the event loop can safely be quit. It is When this number drops to zero, the event loop can safely be quit. It is

View file

@ -17,7 +17,9 @@ import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll; import core.sys.linux.epoll;
final class EpollEventDriver : PosixEventDriver { final class EpollEventLoop : PosixEventLoop {
@safe: nothrow:
private { private {
int m_epoll; int m_epoll;
epoll_event[] m_events; epoll_event[] m_events;
@ -29,15 +31,6 @@ final class EpollEventDriver : PosixEventDriver {
m_events.length = 100; m_events.length = 100;
} }
nothrow @safe {
override @property EpollEventDriver core() { return this; }
override @property EpollEventDriver sockets() { return this; }
override @property EpollEventDriver timers() { return this; }
override @property EpollEventDriver events() { return this; }
override @property EpollEventDriver signals() { return this; }
override @property EpollEventDriver watchers() { return this; }
}
override bool doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)
@trusted { @trusted {
import std.algorithm : min; import std.algorithm : min;
@ -66,7 +59,6 @@ final class EpollEventDriver : PosixEventDriver {
override void dispose() override void dispose()
{ {
import core.sys.posix.unistd : close; import core.sys.posix.unistd : close;
super.dispose();
close(m_epoll); close(m_epoll);
} }

View file

@ -37,48 +37,87 @@ private long currStdTime()
return Clock.currStdTime; return Clock.currStdTime;
} }
abstract class PosixEventDriver : EventDriver, final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
EventDriverCore, EventDriverSockets, EventDriverTimers,
EventDriverEvents, EventDriverSignals, EventDriverWatchers
{
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private { private {
ChoppedVector!FDSlot m_fds; alias CoreDriver = PosixEventDriverCore!(Loop, LoopTimeoutTimerDriver);
size_t m_waiterCount = 0; alias EventsDriver = PosixEventDriverEvents!Loop;
bool m_exit = false; alias SignalsDriver = PosixEventDriverSignals!Loop;
FD m_wakeupEvent; alias TimerDriver = LoopTimeoutTimerDriver;
ThreadedFileEventDriver!PosixEventDriver m_files; alias SocketsDriver = PosixEventDriverSockets!Loop;
alias FileDriver = ThreadedFileEventDriver!EventsDriver;
alias WatcherDriver = PosixEventDriverWatchers!Loop;
Loop m_loop;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
LoopTimeoutTimerDriver m_timers;
SocketsDriver m_sockets;
FileDriver m_files;
WatcherDriver m_watchers;
} }
protected this() this()
{ {
m_wakeupEvent = eventfd(0, EFD_NONBLOCK); m_loop = new Loop;
initFD(m_wakeupEvent); m_events = new EventsDriver(m_loop);
registerFD(m_wakeupEvent, EventMask.read); m_signals = new SignalsDriver(m_loop);
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD m_timers = new TimerDriver;
m_files = new ThreadedFileEventDriver!PosixEventDriver(this); m_core = new CoreDriver(m_loop, m_timers);
m_sockets = new SocketsDriver(m_loop);
m_files = new FileDriver(m_events);
m_watchers = new WatcherDriver(m_loop);
} }
// force overriding these in the (final) sub classes to avoid virtual calls // force overriding these in the (final) sub classes to avoid virtual calls
abstract override @property PosixEventDriver core(); final override @property CoreDriver core() { return m_core; }
final override @property ThreadedFileEventDriver!PosixEventDriver files() { return m_files; } final override @property EventsDriver events() { return m_events; }
abstract override @property PosixEventDriver sockets(); final override @property SignalsDriver signals() { return m_signals; }
abstract override @property PosixEventDriver timers(); final override @property TimerDriver timers() { return m_timers; }
abstract override @property PosixEventDriver events(); final override @property SocketsDriver sockets() { return m_sockets; }
abstract override @property PosixEventDriver signals(); final override @property FileDriver files() { return m_files; }
abstract override @property PosixEventDriver watchers(); final override @property WatcherDriver watchers() { return m_watchers; }
mixin DefaultTimerImpl!(); final override void dispose()
{
m_files.dispose();
m_loop.dispose();
}
}
protected int maxFD() const { return cast(int)m_fds.length; }
@property size_t waiterCount() const { return m_waiterCount; } final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers) : EventDriverCore {
@safe: nothrow:
import core.time : Duration;
protected alias ExtraEventsCallback = bool delegate(long);
private {
Loop m_loop;
Timers m_timers;
bool m_exit = false;
FD m_wakeupEvent;
}
protected this(Loop loop, Timers timers)
{
m_loop = loop;
m_timers = timers;
m_wakeupEvent = eventfd(0, EFD_NONBLOCK);
m_loop.initFD(m_wakeupEvent);
m_loop.registerFD(m_wakeupEvent, EventMask.read);
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
}
@property size_t waiterCount() const { return m_loop.m_waiterCount; }
final override ExitReason processEvents(Duration timeout) final override ExitReason processEvents(Duration timeout)
{ {
import std.algorithm : min; import std.algorithm : min;
import core.time : seconds; import core.time : hnsecs, seconds;
if (m_exit) { if (m_exit) {
m_exit = false; m_exit = false;
@ -90,16 +129,16 @@ abstract class PosixEventDriver : EventDriver,
bool got_events; bool got_events;
if (timeout <= 0.seconds) { if (timeout <= 0.seconds) {
got_events = doProcessEvents(0.seconds); got_events = m_loop.doProcessEvents(0.seconds);
processTimers(currStdTime); m_timers.process(currStdTime);
} else { } else {
long now = currStdTime; long now = currStdTime;
do { do {
auto nextto = min(getNextTimeout(now), timeout); auto nextto = min(m_timers.getNextTimeout(now), timeout);
got_events = doProcessEvents(nextto); got_events = m_loop.doProcessEvents(nextto);
long prev_step = now; long prev_step = now;
now = currStdTime; now = currStdTime;
got_events |= processTimers(now); got_events |= m_timers.process(now);
if (timeout != Duration.max) if (timeout != Duration.max)
timeout -= (now - prev_step).hnsecs; timeout -= (now - prev_step).hnsecs;
} while (timeout > 0.seconds && !m_exit && !got_events); } while (timeout > 0.seconds && !m_exit && !got_events);
@ -126,12 +165,27 @@ abstract class PosixEventDriver : EventDriver,
m_exit = false; m_exit = false;
} }
protected abstract bool doProcessEvents(Duration dur); final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
abstract void dispose() FDSlot* fds = &m_loop.m_fds[descriptor];
{ assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
m_files.dispose(); "Requesting user data with differing type (destructor).");
assert(size <= FDSlot.userData.length, "Requested user data is too large.");
if (size > FDSlot.userData.length) assert(false);
if (!fds.userDataDestructor) {
initialize(fds.userData.ptr);
fds.userDataDestructor = destroy;
} }
return m_loop.m_fds[descriptor].userData.ptr;
}
}
final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect) final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
{ {
@ -153,9 +207,8 @@ abstract class PosixEventDriver : EventDriver,
return sock; return sock;
} }
registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.initFD(sock);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
initFD(sock);
auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } (); auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } ();
if (ret == 0) { if (ret == 0) {
@ -163,13 +216,13 @@ abstract class PosixEventDriver : EventDriver,
} else { } else {
auto err = getSocketError(); auto err = getSocketError();
if (err == EINPROGRESS) { if (err == EINPROGRESS) {
with (m_fds[sock]) { with (m_loop.m_fds[sock]) {
connectCallback = on_connect; connectCallback = on_connect;
} }
startNotify!(EventType.write)(sock, &onConnect); m_loop.startNotify!(EventType.write)(sock, &onConnect);
} else { } else {
clearFD(sock); m_loop.clearFD(sock);
unregisterFD(sock); m_loop.unregisterFD(sock);
invalidateSocket(); invalidateSocket();
on_connect(sock, ConnectStatus.unknownError); on_connect(sock, ConnectStatus.unknownError);
return sock; return sock;
@ -181,14 +234,14 @@ abstract class PosixEventDriver : EventDriver,
private void onConnect(FD sock) private void onConnect(FD sock)
{ {
setNotifyCallback!(EventType.write)(sock, null); m_loop.setNotifyCallback!(EventType.write)(sock, null);
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
} }
private void onConnectError(FD sock) private void onConnectError(FD sock)
{ {
// FIXME: determine the correct kind of error! // FIXME: determine the correct kind of error!
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
} }
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
@ -216,7 +269,7 @@ abstract class PosixEventDriver : EventDriver,
if (sock == StreamListenSocketFD.invalid) if (sock == StreamListenSocketFD.invalid)
return sock; return sock;
initFD(sock); m_loop.initFD(sock);
if (on_accept) waitForConnections(sock, on_accept); if (on_accept) waitForConnections(sock, on_accept);
@ -226,9 +279,9 @@ abstract class PosixEventDriver : EventDriver,
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{ {
log("wait for conn"); log("wait for conn");
registerFD(sock, EventMask.read); m_loop.registerFD(sock, EventMask.read);
m_fds[sock].acceptCallback = on_accept; m_loop.m_fds[sock].acceptCallback = on_accept;
startNotify!(EventType.read)(sock, &onAccept); m_loop.startNotify!(EventType.read)(sock, &onAccept);
onAccept(sock); onAccept(sock);
} }
@ -244,10 +297,10 @@ abstract class PosixEventDriver : EventDriver,
setSocketNonBlocking(cast(SocketFD)sockfd); setSocketNonBlocking(cast(SocketFD)sockfd);
auto fd = cast(StreamSocketFD)sockfd; auto fd = cast(StreamSocketFD)sockfd;
registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.initFD(fd);
initFD(fd); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
//print("accept %d", sockfd); //print("accept %d", sockfd);
m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); m_loop.m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd);
} }
} }
@ -299,33 +352,33 @@ abstract class PosixEventDriver : EventDriver,
} }
} }
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
readCallback = on_read_finish; readCallback = on_read_finish;
readMode = mode; readMode = mode;
bytesRead = ret > 0 ? ret : 0; bytesRead = ret > 0 ? ret : 0;
readBuffer = buffer; readBuffer = buffer;
} }
setNotifyCallback!(EventType.read)(socket, &onSocketRead); m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketRead);
} }
override void cancelRead(StreamSocketFD socket) override void cancelRead(StreamSocketFD socket)
{ {
assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress.");
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
readBuffer = null; readBuffer = null;
} }
} }
private void onSocketRead(FD fd) private void onSocketRead(FD fd)
{ {
auto slot = () @trusted { return &m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status) void finalize()(IOStatus status)
{ {
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null; //m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead); slot.readCallback(socket, status, slot.bytesRead);
} }
@ -394,26 +447,26 @@ abstract class PosixEventDriver : EventDriver,
} }
} }
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
writeCallback = on_write_finish; writeCallback = on_write_finish;
writeMode = mode; writeMode = mode;
bytesWritten = ret > 0 ? ret : 0; bytesWritten = ret > 0 ? ret : 0;
writeBuffer = buffer; writeBuffer = buffer;
} }
setNotifyCallback!(EventType.write)(socket, &onSocketWrite); m_loop.setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
} }
override void cancelWrite(StreamSocketFD socket) override void cancelWrite(StreamSocketFD socket)
{ {
assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress.");
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_fds[socket].writeBuffer = null; m_loop.m_fds[socket].writeBuffer = null;
} }
private void onSocketWrite(FD fd) private void onSocketWrite(FD fd)
{ {
auto slot = () @trusted { return &m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -422,14 +475,14 @@ abstract class PosixEventDriver : EventDriver,
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (err != EAGAIN) {
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(socket, IOStatus.error, slot.bytesRead); slot.writeCallback(socket, IOStatus.error, slot.bytesRead);
return; return;
} }
} }
if (ret == 0) { if (ret == 0) {
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten);
return; return;
} }
@ -438,7 +491,7 @@ abstract class PosixEventDriver : EventDriver,
slot.bytesWritten += ret; slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $]; slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) { if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten); slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
return; return;
} }
@ -471,24 +524,24 @@ abstract class PosixEventDriver : EventDriver,
return; return;
} }
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
readCallback = on_data_available; readCallback = on_data_available;
readMode = IOMode.once; readMode = IOMode.once;
bytesRead = 0; bytesRead = 0;
readBuffer = null; readBuffer = null;
} }
setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable); m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
} }
private void onSocketDataAvailable(FD fd) private void onSocketDataAvailable(FD fd)
{ {
auto slot = () @trusted { return &m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status) void finalize()(IOStatus status)
{ {
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null; //m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, 0); slot.readCallback(socket, status, 0);
} }
@ -522,9 +575,8 @@ abstract class PosixEventDriver : EventDriver,
return DatagramSocketFD.init; return DatagramSocketFD.init;
} }
registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.initFD(sock);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
initFD(sock);
return sock; return sock;
} }
@ -551,14 +603,14 @@ abstract class PosixEventDriver : EventDriver,
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_receive_finish(socket, IOStatus.wouldBlock, 0, null); on_receive_finish(socket, IOStatus.wouldBlock, 0, null);
} else { } else {
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } (); readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } ();
readMode = mode; readMode = mode;
bytesRead = 0; bytesRead = 0;
readBuffer = buffer; readBuffer = buffer;
} }
setNotifyCallback!(EventType.read)(socket, &onDgramRead); m_loop.setNotifyCallback!(EventType.read)(socket, &onDgramRead);
} }
return; return;
} }
@ -568,14 +620,14 @@ abstract class PosixEventDriver : EventDriver,
void cancelReceive(DatagramSocketFD socket) void cancelReceive(DatagramSocketFD socket)
{ {
assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress.");
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
m_fds[socket].readBuffer = null; m_loop.m_fds[socket].readBuffer = null;
} }
private void onDgramRead(FD fd) private void onDgramRead(FD fd)
@trusted { // DMD 2.072.0-b2: scope considered unsafe @trusted { // DMD 2.072.0-b2: scope considered unsafe
auto slot = () @trusted { return &m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
auto socket = cast(DatagramSocketFD)fd; auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -586,13 +638,13 @@ abstract class PosixEventDriver : EventDriver,
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (err != EAGAIN) {
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null); () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null);
return; return;
} }
} }
setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addr); () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addr);
} }
@ -603,7 +655,7 @@ abstract class PosixEventDriver : EventDriver,
sizediff_t ret; sizediff_t ret;
if (target_address) { if (target_address) {
() @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } (); () @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } ();
m_fds[socket].targetAddr = target_address; m_loop.m_fds[socket].targetAddr = target_address;
} else { } else {
() @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } ();
} }
@ -619,14 +671,14 @@ abstract class PosixEventDriver : EventDriver,
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_send_finish(socket, IOStatus.wouldBlock, 0, null); on_send_finish(socket, IOStatus.wouldBlock, 0, null);
} else { } else {
with (m_fds[socket]) { with (m_loop.m_fds[socket]) {
writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } (); writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } ();
writeMode = mode; writeMode = mode;
bytesWritten = 0; bytesWritten = 0;
writeBuffer = buffer; writeBuffer = buffer;
} }
setNotifyCallback!(EventType.write)(socket, &onDgramWrite); m_loop.setNotifyCallback!(EventType.write)(socket, &onDgramWrite);
} }
return; return;
} }
@ -636,14 +688,14 @@ abstract class PosixEventDriver : EventDriver,
void cancelSend(DatagramSocketFD socket) void cancelSend(DatagramSocketFD socket)
{ {
assert(m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress.");
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_fds[socket].writeBuffer = null; m_loop.m_fds[socket].writeBuffer = null;
} }
private void onDgramWrite(FD fd) private void onDgramWrite(FD fd)
{ {
auto slot = () @trusted { return &m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
auto socket = cast(DatagramSocketFD)fd; auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -656,74 +708,91 @@ abstract class PosixEventDriver : EventDriver,
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (err != EAGAIN) {
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
return; return;
} }
} }
setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
} }
final override void addRef(SocketFD fd) final override void addRef(SocketFD fd)
{ {
auto pfd = () @trusted { return &m_fds[fd]; } (); auto pfd = () @trusted { return &m_loop.m_fds[fd]; } ();
assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD."); assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD.");
m_fds[fd].refCount++; m_loop.m_fds[fd].refCount++;
} }
final override void releaseRef(SocketFD fd) final override void releaseRef(SocketFD fd)
{ {
auto pfd = () @trusted { return &m_fds[fd]; } (); auto pfd = () @trusted { return &m_loop.m_fds[fd]; } ();
assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD."); assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD.");
if (--m_fds[fd].refCount == 0) { if (--m_loop.m_fds[fd].refCount == 0) {
unregisterFD(fd); m_loop.unregisterFD(fd);
clearFD(fd); m_loop.clearFD(fd);
closeSocket(fd); closeSocket(fd);
} }
} }
private SocketFD createSocket(AddressFamily family, int type)
{
int sock;
() @trusted { sock = socket(family, type, 0); } ();
if (sock == -1) return SocketFD.invalid;
setSocketNonBlocking(cast(SocketFD)sock);
return cast(SocketFD)sock;
}
}
final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override EventID create() final override EventID create()
{ {
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
initFD(id); m_loop.initFD(id);
m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation
registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
startNotify!(EventType.read)(id, &onEvent); m_loop.startNotify!(EventType.read)(id, &onEvent);
return id; return id;
} }
final override void trigger(EventID event, bool notify_all = true) final override void trigger(EventID event, bool notify_all = true)
{ {
assert(event < m_fds.length, "Invalid event ID passed to triggerEvent."); assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent.");
if (notify_all) { if (notify_all) {
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
foreach (w; m_fds[event].waiters.consume) { foreach (w; m_loop.m_fds[event].waiters.consume) {
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
w(event); w(event);
} }
} else { } else {
if (!m_fds[event].waiters.empty) if (!m_loop.m_fds[event].waiters.empty)
m_fds[event].waiters.consumeOne(); m_loop.m_fds[event].waiters.consumeOne();
} }
} }
final override void trigger(EventID event, bool notify_all = true) final override void trigger(EventID event, bool notify_all = true)
shared @trusted { shared @trusted {
import core.atomic : atomicStore; import core.atomic : atomicStore;
auto thisus = cast(PosixEventDriver)this; auto thisus = cast(PosixEventDriverEvents)this;
assert(event < thisus.m_fds.length, "Invalid event ID passed to shared triggerEvent."); assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
long one = 1; long one = 1;
//log("emitting for all threads"); //log("emitting for all threads");
if (notify_all) atomicStore(thisus.m_fds[event].triggerAll, true); if (notify_all) atomicStore(thisus.m_loop.m_fds[event].triggerAll, true);
() @trusted { .write(event, &one, one.sizeof); } (); () @trusted { .write(event, &one, one.sizeof); } ();
} }
final override void wait(EventID event, EventCallback on_event) final override void wait(EventID event, EventCallback on_event)
{ {
assert(event < m_fds.length, "Invalid event ID passed to waitForEvent."); assert(event < m_loop.m_fds.length, "Invalid event ID passed to waitForEvent.");
return m_fds[event].waiters.put(on_event); return m_loop.m_fds[event].waiters.put(on_event);
} }
final override void cancelWait(EventID event, EventCallback on_event) final override void cancelWait(EventID event, EventCallback on_event)
@ -731,7 +800,7 @@ abstract class PosixEventDriver : EventDriver,
import std.algorithm.searching : countUntil; import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove; import std.algorithm.mutation : remove;
m_fds[event].waiters.removePending(on_event); m_loop.m_fds[event].waiters.removePending(on_event);
} }
private void onEvent(FD event) private void onEvent(FD event)
@ -739,26 +808,32 @@ abstract class PosixEventDriver : EventDriver,
ulong cnt; ulong cnt;
() @trusted { .read(event, &cnt, cnt.sizeof); } (); () @trusted { .read(event, &cnt, cnt.sizeof); } ();
import core.atomic : cas; import core.atomic : cas;
auto all = cas(&m_fds[event].triggerAll, true, false); auto all = cas(&m_loop.m_fds[event].triggerAll, true, false);
trigger(cast(EventID)event, all); trigger(cast(EventID)event, all);
} }
final override void addRef(EventID descriptor) final override void addRef(EventID descriptor)
{ {
assert(m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD.");
m_fds[descriptor].refCount++; m_loop.m_fds[descriptor].refCount++;
} }
final override void releaseRef(EventID descriptor) final override void releaseRef(EventID descriptor)
{ {
assert(m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_fds[descriptor].refCount == 0) { if (--m_loop.m_fds[descriptor].refCount == 0) {
unregisterFD(descriptor); m_loop.unregisterFD(descriptor);
clearFD(descriptor); m_loop.clearFD(descriptor);
close(descriptor); close(descriptor);
} }
} }
}
final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override void wait(int sig, SignalCallback on_signal) final override void wait(int sig, SignalCallback on_signal)
{ {
@ -769,6 +844,13 @@ abstract class PosixEventDriver : EventDriver,
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
}
final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive) final override WatcherID watchDirectory(string path, bool recursive)
{ {
@ -794,21 +876,23 @@ abstract class PosixEventDriver : EventDriver,
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
}
final override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { package class PosixEventLoop {
FDSlot* fds = &m_fds[descriptor]; @safe: nothrow:
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, import core.time : Duration;
"Requesting user data with differing type (destructor).");
assert(size <= FDSlot.userData.length, "Requested user data is too large."); package {
if (size > FDSlot.userData.length) assert(false); ChoppedVector!FDSlot m_fds;
if (!fds.userDataDestructor) { size_t m_waiterCount = 0;
initialize(fds.userData.ptr);
fds.userDataDestructor = destroy;
}
return m_fds[descriptor].userData.ptr;
} }
protected @property int maxFD() const { return cast(int)m_fds.length; }
protected abstract void dispose();
protected abstract bool doProcessEvents(Duration dur);
/// Registers the FD for general notification reception. /// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask); protected abstract void registerFD(FD fd, EventMask mask);
@ -817,6 +901,13 @@ abstract class PosixEventDriver : EventDriver,
/// Updates the event mask to use for listening for notifications. /// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask mask); protected abstract void updateFD(FD fd, EventMask mask);
final protected void notify(EventType evt)(FD fd)
{
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd].callback[evt])
m_fds[fd].callback[evt](fd);
}
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
{ {
// TODO: optimize! // TODO: optimize!
@ -825,14 +916,7 @@ abstract class PosixEventDriver : EventDriver,
del(cast(FD)i); del(cast(FD)i);
} }
final protected void notify(EventType evt)(FD fd) package void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd].callback[evt])
m_fds[fd].callback[evt](fd);
}
private void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{ {
//log("start notify %s %s", evt, fd); //log("start notify %s %s", evt, fd);
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
@ -840,7 +924,7 @@ abstract class PosixEventDriver : EventDriver,
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd].eventMask);
} }
private void stopNotify(EventType evt)(FD fd) package void stopNotify(EventType evt)(FD fd)
{ {
//log("stop notify %s %s", evt, fd); //log("stop notify %s %s", evt, fd);
//ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
@ -848,7 +932,7 @@ abstract class PosixEventDriver : EventDriver,
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd].eventMask);
} }
private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert((callback !is null) != (m_fds[fd].callback[evt] !is null), assert((callback !is null) != (m_fds[fd].callback[evt] !is null),
"Overwriting notification callback."); "Overwriting notification callback.");
@ -863,21 +947,12 @@ abstract class PosixEventDriver : EventDriver,
m_fds[fd].callback[evt] = callback; m_fds[fd].callback[evt] = callback;
} }
private SocketFD createSocket(AddressFamily family, int type) package void initFD(FD fd)
{
int sock;
() @trusted { sock = socket(family, type, 0); } ();
if (sock == -1) return SocketFD.invalid;
setSocketNonBlocking(cast(SocketFD)sock);
return cast(SocketFD)sock;
}
private void initFD(FD fd)
{ {
m_fds[fd].refCount = 1; m_fds[fd].refCount = 1;
} }
private void clearFD(FD fd) package void clearFD(FD fd)
{ {
if (m_fds[fd].userDataDestructor) if (m_fds[fd].userDataDestructor)
() @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } ();
@ -889,6 +964,7 @@ abstract class PosixEventDriver : EventDriver,
} }
} }
alias FDEnumerateCallback = void delegate(FD); alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD); alias FDSlotCallback = void delegate(FD);

View file

@ -23,14 +23,8 @@ version (Windows) {
} }
final class SelectEventDriver : PosixEventDriver { final class SelectEventLoop : PosixEventLoop {
override @property SelectEventDriver core() { return this; } @safe: nothrow:
override @property SelectEventDriver sockets() { return this; }
override @property SelectEventDriver timers() { return this; }
override @property SelectEventDriver events() { return this; }
override @property SelectEventDriver signals() { return this; }
override @property SelectEventDriver watchers() { return this; }
override bool doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)
{ {
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!"); //assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
@ -74,7 +68,6 @@ final class SelectEventDriver : PosixEventDriver {
override void dispose() override void dispose()
{ {
super.dispose();
} }
override void registerFD(FD fd, EventMask mask) override void registerFD(FD fd, EventMask mask)

View file

@ -6,7 +6,7 @@ module eventcore.drivers.timer;
import eventcore.driver; import eventcore.driver;
mixin template DefaultTimerImpl() { final class LoopTimeoutTimerDriver : EventDriverTimers {
import std.experimental.allocator.building_blocks.free_list; import std.experimental.allocator.building_blocks.free_list;
import std.experimental.allocator.building_blocks.region; import std.experimental.allocator.building_blocks.region;
import std.experimental.allocator.mallocator; import std.experimental.allocator.mallocator;
@ -29,12 +29,12 @@ mixin template DefaultTimerImpl() {
ms_allocator.parent = Mallocator.instance; ms_allocator.parent = Mallocator.instance;
} }
final protected Duration getNextTimeout(long stdtime) final package Duration getNextTimeout(long stdtime)
{ @safe nothrow {
return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max; return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max;
} }
final protected bool processTimers(long stdtime) final package bool process(long stdtime)
@trusted nothrow { @trusted nothrow {
assert(m_firedTimers.length == 0); assert(m_firedTimers.length == 0);
if (m_timerQueue.empty) return false; if (m_timerQueue.empty) return false;
@ -48,7 +48,7 @@ mixin template DefaultTimerImpl() {
while (tm.timeout <= stdtime); while (tm.timeout <= stdtime);
auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(tail.release, tm); try m_timerQueue.insertBefore(tail.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); } catch (Exception e) { assert(false, e.msg); }
} else tm.pending = false; } else tm.pending = false;
m_firedTimers ~= tm; m_firedTimers ~= tm;
} }
@ -94,7 +94,7 @@ mixin template DefaultTimerImpl() {
auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm); auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(largerRange.release, tm); try m_timerQueue.insertBefore(largerRange.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); } catch (Exception e) { assert(false, e.msg); }
} }
final override void stop(TimerID timer) final override void stop(TimerID timer)