Move drivers to an own package.

This commit is contained in:
Sönke Ludwig 2016-01-16 15:50:47 +01:00
parent be99b48216
commit b92be4a29d
5 changed files with 9 additions and 9 deletions

View file

@ -0,0 +1,90 @@
module eventcore.drivers.epoll;
@safe: /*@nogc:*/ nothrow:
version (linux):
public import eventcore.drivers.posix;
import eventcore.internal.utils;
import core.time : Duration;
import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll;
final class EpollEventDriver : PosixEventDriver {
private {
int m_epoll;
epoll_event[] m_events;
}
this()
{
m_epoll = () @trusted { return epoll_create1(0); } ();
m_events.length = 100;
}
override void doProcessEvents(Duration timeout)
@trusted {
import std.algorithm : min;
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
auto ts = timeout.toTimeVal;
//print("wait %s", m_events.length);
auto ret = epoll_wait(m_epoll, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? -1 : cast(int)min(timeout.total!"msecs", int.max));
//print("wait done %s", ret);
if (ret > 0) {
foreach (ref evt; m_events[0 .. ret]) {
//print("event %s %s", evt.data.fd, evt.events);
auto fd = cast(FD)evt.data.fd;
if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
if (evt.events & EPOLLERR) notify!(EventType.status)(fd);
else if (evt.events & EPOLLHUP) notify!(EventType.status)(fd);
}
}
}
override void dispose()
{
close(m_epoll);
}
override void registerFD(FD fd, EventMask mask)
{
//print("register %s %s", fd, mask);
epoll_event ev;
ev.events |= EPOLLET;
if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
ev.data.fd = fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev); } ();
}
override void unregisterFD(FD fd)
{
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_DEL, fd, null); } ();
}
override void updateFD(FD fd, EventMask mask)
{
//print("update %s %s", fd, mask);
epoll_event ev;
ev.events |= EPOLLET;
//ev.events = EPOLLONESHOT;
if (mask & EventMask.read) ev.events |= EPOLLIN;
if (mask & EventMask.write) ev.events |= EPOLLOUT;
if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
ev.data.fd = fd;
() @trusted { epoll_ctl(m_epoll, EPOLL_CTL_MOD, fd, &ev); } ();
}
}
private timeval toTimeVal(Duration dur)
{
timeval tvdur;
dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
return tvdur;
}

View file

@ -0,0 +1,634 @@
module eventcore.drivers.posix;
@safe: /*@nogc:*/ nothrow:
public import eventcore.driver;
import eventcore.drivers.timer;
import eventcore.internal.utils;
import std.socket : Address, AddressFamily, UnknownAddress;
version (Posix) {
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.unistd : close;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl;
}
version (Windows) {
import core.sys.windows.winsock2;
alias EAGAIN = WSAEWOULDBLOCK;
}
private long currStdTime()
{
import std.datetime : Clock;
scope (failure) assert(false);
return Clock.currStdTime;
}
abstract class PosixEventDriver : EventDriver {
@safe: /*@nogc:*/ nothrow:
private {
ChoppedVector!FDSlot m_fds;
size_t m_waiterCount = 0;
}
mixin DefaultTimerImpl!();
protected int maxFD() const { return cast(int)m_fds.length; }
@property size_t waiterCount() const { return m_waiterCount; }
final override void processEvents(Duration timeout)
{
import std.algorithm : min;
import core.time : seconds;
if (timeout <= 0.seconds) {
doProcessEvents(0.seconds);
processTimers(currStdTime);
} else {
long now = currStdTime;
do {
auto nextto = min(getNextTimeout(now), timeout);
doProcessEvents(nextto);
long prev_step = now;
now = currStdTime;
processTimers(now);
if (timeout != Duration.max)
timeout -= (now - prev_step).hnsecs;
} while (timeout > 0.seconds);
}
}
protected abstract void doProcessEvents(Duration dur);
abstract void dispose();
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
{
auto sock = cast(StreamSocketFD)createSocket(address.addressFamily);
if (sock == -1) return StreamSocketFD.invalid;
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; }
scope bind_addr = new UnknownAddress;
bind_addr.name.sa_family = cast(ushort)address.addressFamily;
bind_addr.name.sa_data[] = 0;
if (() @trusted { return bind(sock, bind_addr.name, bind_addr.nameLen); } () != 0) {
invalidateSocket();
on_connect(sock, ConnectStatus.bindFailure);
return sock;
}
registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } ();
if (ret == 0) {
on_connect(sock, ConnectStatus.connected);
} else {
auto err = getSocketError();
if (err == EINPROGRESS) {
with (m_fds[sock]) {
connectCallback = on_connect;
}
startNotify!(EventType.write)(sock, &onConnect);
} else {
unregisterFD(sock);
invalidateSocket();
on_connect(sock, ConnectStatus.unknownError);
}
}
addFD(sock);
return sock;
}
private void onConnect(FD sock)
{
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
}
private void onConnectError(FD sock)
{
// FIXME: determine the correct kind of error!
m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
}
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
{
auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily);
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; }
() @trusted {
int tmp_reuse = 1;
// FIXME: error handling!
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
invalidateSocket();
} else if (bind(sock, address.name, address.nameLen) != 0) {
invalidateSocket();
} else if (listen(sock, 128) != 0) {
invalidateSocket();
} else { scope (failure) assert(false); import std.stdio; writeln("Success!"); }
} ();
if (on_accept && sock != StreamListenSocketFD.invalid)
waitForConnections(sock, on_accept);
return sock;
}
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{
registerFD(sock, EventMask.read);
addFD(sock);
m_fds[sock].acceptCallback = on_accept;
startNotify!(EventType.read)(sock, &onAccept);
}
private void onAccept(FD listenfd)
{
scope addr = new UnknownAddress;
foreach (i; 0 .. 20) {
int sockfd;
version (Windows) int addr_len = addr.nameLen;
else uint addr_len = addr.nameLen;
() @trusted { sockfd = accept(listenfd, addr.name, &addr_len); } ();
if (sockfd == -1) break;
setSocketNonBlocking(cast(SocketFD)sockfd);
auto fd = cast(StreamSocketFD)sockfd;
registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
addFD(fd);
//print("accept %d", sockfd);
m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd);
}
}
final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{
int opt = enable;
() @trusted { setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
}
final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOCallback on_read_finish, IOMode mode = IOMode.all)
{
sizediff_t ret;
() @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) {
print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_read = 0;
if (ret == 0) {
on_read_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret > 0) {
bytes_read += ret;
buffer = buffer[bytes_read .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, bytes_read);
return;
}
}
with (m_fds[socket]) {
readCallback = on_read_finish;
readMode = mode;
bytesRead = ret > 0 ? ret : 0;
readBuffer = buffer;
}
setNotifyCallback!(EventType.read)(socket, &onSocketRead);
}
private void onSocketRead(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead);
}
sizediff_t ret;
() @trusted { ret = recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) {
finalize(IOStatus.error);
return;
}
}
if (ret == 0) {
finalize(IOStatus.disconnected);
return;
}
if (ret > 0) {
slot.bytesRead += ret;
slot.readBuffer = slot.readBuffer[ret .. $];
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
finalize(IOStatus.ok);
return;
}
}
}
final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOCallback on_write_finish, IOMode mode = IOMode.all)
{
sizediff_t ret;
() @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) {
on_write_finish(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_written = 0;
if (ret == 0) {
on_write_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
on_write_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret > 0) {
bytes_written += ret;
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_write_finish(socket, IOStatus.ok, bytes_written);
return;
}
}
with (m_fds[socket]) {
writeCallback = on_write_finish;
writeMode = mode;
bytesWritten = ret > 0 ? ret : 0;
writeBuffer = buffer;
}
setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
}
private void onSocketWrite(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
sizediff_t ret;
() @trusted { ret = send(socket, slot.writeBuffer.ptr, slot.writeBuffer.length, 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) {
setNotifyCallback!(EventType.write)(socket, null);
slot.readCallback(socket, IOStatus.error, slot.bytesRead);
return;
}
}
if (ret == 0) {
setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten);
return;
}
if (ret > 0) {
slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
return;
}
}
}
final override void waitSocketData(StreamSocketFD socket, IOCallback on_data_available)
{
sizediff_t ret;
ubyte dummy;
() @trusted { ret = recv(socket, &dummy, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) {
on_data_available(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_read = 0;
if (ret == 0) {
on_data_available(socket, IOStatus.disconnected, 0);
return;
}
if (ret > 0) {
on_data_available(socket, IOStatus.ok, 0);
return;
}
with (m_fds[socket]) {
readCallback = on_data_available;
readMode = IOMode.once;
bytesRead = 0;
readBuffer = null;
}
setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
}
private void onSocketDataAvailable(FD fd)
{
auto slot = &m_fds[fd];
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, 0);
}
sizediff_t ret;
ubyte tmp;
() @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = getSocketError();
if (err != EAGAIN) finalize(IOStatus.error);
} else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
}
final override void shutdownSocket(StreamSocketFD socket, bool shut_read, bool shut_write)
{
// TODO!
}
final override EventID createEvent()
{
assert(false);
}
final override void triggerEvent(EventID event, bool notify_all = true)
{
assert(false);
}
final override EventWaitID waitForEvent(EventID event, EventCallback on_event)
{
assert(false);
}
final override void stopWaitingForEvent(EventID event, EventWaitID wait_id)
{
assert(false);
}
final override void addRef(SocketFD fd)
{
auto pfd = &m_fds[fd];
assert(pfd.refCount > 0);
m_fds[fd].refCount++;
}
final override void addRef(FileFD descriptor)
{
assert(false);
}
final override void addRef(EventID descriptor)
{
assert(false);
}
final override void releaseRef(SocketFD fd)
{
auto pfd = &m_fds[fd];
assert(pfd.refCount > 0);
if (--m_fds[fd].refCount == 0) {
unregisterFD(fd);
clearFD(fd);
closeSocket(fd);
}
}
final override void releaseRef(FileFD descriptor)
{
assert(false);
}
final override void releaseRef(TimerID descriptor)
{
assert(false);
}
final override void releaseRef(EventID descriptor)
{
assert(false);
}
/// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask);
/// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd);
/// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask mask);
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
{
// TODO: optimize!
foreach (i; 0 .. cast(int)m_fds.length)
if (m_fds[i].callback[evt])
del(cast(FD)i);
}
final protected void notify(EventType evt)(FD fd)
{
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd].callback[evt])
m_fds[fd].callback[evt](fd);
}
private void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
m_fds[fd].callback[evt] = callback;
assert(m_fds[0].callback[evt] is null);
m_waiterCount++;
updateFD(fd, m_fds[fd].eventMask);
}
private void stopNotify(EventType evt)(FD fd)
{
//ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
m_fds[fd].callback[evt] = null;
m_waiterCount--;
updateFD(fd, m_fds[fd].eventMask);
}
private void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{
assert((callback !is null) != (m_fds[fd].callback[evt] !is null));
m_fds[fd].callback[evt] = callback;
}
private SocketFD createSocket(AddressFamily family)
{
int sock;
() @trusted { sock = socket(family, SOCK_STREAM, 0); } ();
if (sock == -1) return SocketFD.invalid;
setSocketNonBlocking(cast(SocketFD)sock);
return cast(SocketFD)sock;
}
private void addFD(FD fd)
{
m_fds[fd].refCount = 1;
}
private void clearFD(FD fd)
{
m_fds[fd] = FDSlot.init;
}
}
alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD);
private struct FDSlot {
FDSlotCallback[EventType.max+1] callback;
uint refCount;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
IOCallback readCallback;
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
IOCallback writeCallback;
ConnectCallback connectCallback;
AcceptCallback acceptCallback;
@property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0;
if (callback[EventType.read] !is null) ret |= EventMask.read;
if (callback[EventType.write] !is null) ret |= EventMask.write;
if (callback[EventType.status] !is null) ret |= EventMask.status;
return ret;
}
}
enum EventType {
read,
write,
status
}
enum EventMask {
read = 1<<0,
write = 1<<1,
status = 1<<2
}
private void closeSocket(SocketFD sockfd)
@nogc {
version (Windows) () @trusted { closesocket(sockfd); } ();
else close(sockfd);
}
private void setSocketNonBlocking(SocketFD sockfd)
{
version (Windows) {
size_t enable = 1;
() @trusted { ioctlsocket(sockfd, FIONBIO, &enable); } ();
} else {
() @trusted { fcntl(sockfd, F_SETFL, O_NONBLOCK, 1); } ();
}
}
private int getSocketError()
@nogc {
version (Windows) return WSAGetLastError();
else return errno;
}
/*version (Windows) {
import std.c.windows.windows;
import std.c.windows.winsock;
alias EWOULDBLOCK = WSAEWOULDBLOCK;
extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments);
class WSAErrorException : Exception {
int error;
this(string message, string file = __FILE__, size_t line = __LINE__)
{
error = WSAGetLastError();
this(message, error, file, line);
}
this(string message, int error, string file = __FILE__, size_t line = __LINE__)
{
import std.string : format;
ushort* errmsg;
FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS,
null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null);
size_t len = 0;
while (errmsg[len]) len++;
auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup;
LocalFree(errmsg);
super(format("%s: %s (%s)", message, errmsgd, error), file, line);
}
}
alias SystemSocketException = WSAErrorException;
} else {
import std.exception : ErrnoException;
alias SystemSocketException = ErrnoException;
}
T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__)
{
import std.exception : enforceEx;
return enforceEx!SystemSocketException(value, msg, file, line);
}*/

View file

@ -0,0 +1,83 @@
module eventcore.drivers.select;
@safe: /*@nogc:*/ nothrow:
public import eventcore.drivers.posix;
import eventcore.internal.utils;
import core.time : Duration;
version (Posix) {
import core.sys.posix.sys.time : timeval;
import core.sys.posix.sys.select;
}
version (Windows) {
import core.sys.windows.winsock2;
}
final class SelectEventDriver : PosixEventDriver {
override void doProcessEvents(Duration timeout)
{
//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");
//scope (failure) assert(false); import std.stdio; writefln("%.3f: process %s ms", Clock.currAppTick.usecs * 1e-3, timeout.total!"msecs");
//scope (success) writefln("%.3f: process out", Clock.currAppTick.usecs * 1e-3);
auto ts = timeout.toTimeVal;
fd_set readfds, writefds, statusfds;
() @trusted {
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&statusfds);
} ();
enumerateFDs!(EventType.read)((fd) @trusted { FD_SET(fd, &readfds); });
enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); });
enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); });
//print("Wait for event...");
//writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3);
auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } ();
//writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3);
//print("Done wait for event...");
if (ret > 0) {
enumerateFDs!(EventType.read)((fd) @trusted {
if (FD_ISSET(fd, &readfds))
notify!(EventType.read)(fd);
});
enumerateFDs!(EventType.write)((fd) @trusted {
if (FD_ISSET(fd, &writefds))
notify!(EventType.write)(fd);
});
enumerateFDs!(EventType.status)((fd) @trusted {
if (FD_ISSET(fd, &statusfds))
notify!(EventType.status)(fd);
});
}
}
override void dispose()
{
}
override void registerFD(FD fd, EventMask mask)
{
}
override void unregisterFD(FD fd)
{
}
override void updateFD(FD fd, EventMask mask)
{
}
}
private timeval toTimeVal(Duration dur)
{
timeval tvdur;
dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
return tvdur;
}

View file

@ -0,0 +1,147 @@
module eventcore.drivers.timer;
import eventcore.driver;
mixin template DefaultTimerImpl() {
import std.experimental.allocator.building_blocks.free_list;
import std.experimental.allocator.building_blocks.region;
import std.experimental.allocator.mallocator;
import std.experimental.allocator : dispose, make;
import std.container.array;
import std.datetime : Clock;
import std.range : SortedRange, assumeSorted, take;
import core.time : hnsecs;
private {
static FreeList!(Mallocator, TimerSlot.sizeof) ms_allocator;
TimerSlot*[TimerID] m_timers;
Array!(TimerSlot*) m_timerQueue;
TimerID m_lastTimerID;
TimerSlot*[] m_firedTimers;
}
static this()
{
ms_allocator.parent = Mallocator.instance;
}
final protected Duration getNextTimeout(long stdtime)
{
return m_timerQueue.length ? (m_timerQueue.front.timeout - stdtime).hnsecs : Duration.max;
}
final protected void processTimers(long stdtime)
@trusted {
assert(m_firedTimers.length == 0);
if (m_timerQueue.empty) return;
TimerSlot ts = void;
ts.timeout = stdtime+1;
auto fired = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).lowerBound(&ts);
foreach (tm; fired) {
if (tm.repeatDuration > 0) {
do tm.timeout += tm.repeatDuration;
while (tm.timeout <= stdtime);
auto tail = m_timerQueue[fired.length .. $].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(tail.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); }
} else tm.pending = false;
m_firedTimers ~= tm;
}
// NOTE: this isn't yet verified to work under all circumstances
auto elems = m_timerQueue[0 .. fired.length];
scope (failure) assert(false);
m_timerQueue.linearRemove(elems);
foreach (tm; m_firedTimers)
tm.callback(tm.id);
m_firedTimers.length = 0;
m_firedTimers.assumeSafeAppend();
}
final override TimerID createTimer(TimerCallback callback)
@trusted {
auto id = cast(TimerID)(m_lastTimerID + 1);
TimerSlot* tm;
try tm = ms_allocator.make!TimerSlot;
catch (Exception e) return TimerID.invalid;
assert(tm !is null);
tm.id = id;
tm.refCount = 1;
tm.callback = callback;
m_timers[id] = tm;
return id;
}
final override void setTimer(TimerID timer, Duration timeout, Duration repeat)
@trusted {
scope (failure) assert(false);
auto tm = m_timers[timer];
if (tm.pending) stopTimer(timer);
tm.timeout = Clock.currStdTime + timeout.total!"hnsecs";
tm.repeatDuration = repeat.total!"hnsecs";
tm.pending = true;
auto largerRange = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(tm);
try m_timerQueue.insertBefore(largerRange.release, tm);
catch (Exception e) { print("Failed to insert timer: %s", e.msg); }
}
final override void stopTimer(TimerID timer)
@trusted {
auto tm = m_timers[timer];
if (!tm.pending) return;
tm.pending = false;
tm.callback = null;
TimerSlot cmp = void;
cmp.timeout = tm.timeout-1;
auto upper = m_timerQueue[].assumeSorted!((a, b) => a.timeout < b.timeout).upperBound(&cmp);
assert(!upper.empty);
while (!upper.empty) {
assert(upper.front.timeout == tm.timeout);
if (upper.front is tm) {
scope (failure) assert(false);
m_timerQueue.linearRemove(upper.release.take(1));
break;
}
}
}
final override bool isTimerPending(TimerID descriptor)
{
return m_timers[descriptor].pending;
}
final override bool isTimerPeriodic(TimerID descriptor)
{
return m_timers[descriptor].repeatDuration > 0;
}
final override void addRef(TimerID descriptor)
{
m_timers[descriptor].refCount++;
}
final override void releaseRef(TimerID descriptor)
{
auto tm = m_timers[descriptor];
if (!--tm.refCount) {
if (tm.pending) stopTimer(tm.id);
m_timers.remove(descriptor);
() @trusted { scope (failure) assert(false); ms_allocator.dispose(tm); } ();
}
}
}
struct TimerSlot {
TimerID id;
uint refCount;
bool pending;
long timeout; // stdtime
long repeatDuration;
TimerCallback callback;
}