From 5f243cbb18ee0b7b36b682953421d112e41a2110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Tue, 18 Oct 2016 11:53:52 +0200 Subject: [PATCH] Improve cross-platform organization and fix compilation on Windows (posix driver). --- dub.sdl | 26 +++++++- source/eventcore/core.d | 46 +++++++++----- source/eventcore/drivers/epoll.d | 1 + source/eventcore/drivers/kqueue.d | 2 + source/eventcore/drivers/posix.d | 79 +++++++++++++++---------- source/eventcore/drivers/select.d | 2 + source/eventcore/drivers/threadedfile.d | 15 +++-- 7 files changed, 118 insertions(+), 53 deletions(-) diff --git a/dub.sdl b/dub.sdl index d64eabb..c50b3a0 100644 --- a/dub.sdl +++ b/dub.sdl @@ -1,15 +1,37 @@ name "eventcore" -description "Experimental callback based abstraction layer over operating system asynchronous I/O facilities." +description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities." license "MIT" copyright "Copyright © 2012-2016 rejectedsoftware e.K." targetType "library" libs "anl" platform="linux" +libs "ws2_32" platform="windows" -configuration "native" { +configuration "epoll" { + platforms "linux" + versions "EventcoreEpollDriver" +} + +configuration "kqueue" { + platforms "osx" "freebsd" + versions "EventcoreKqueueDriver" +} + +configuration "winapi" { + platforms "windows" + versions "EventcoreWinAPIDriver" +} + +configuration "select" { + versions "EventcoreSelectDriver" } configuration "libasync" { dependency "libasync" version="~>0.7.9" + versions "EventcoreLibasyncDriver" +} + +configuration "generic" { + // Defines eventDriver as the generic EventDriver interface. Setup must be done manually. } diff --git a/source/eventcore/core.d b/source/eventcore/core.d index 6c96ac0..f3ed806 100644 --- a/source/eventcore/core.d +++ b/source/eventcore/core.d @@ -2,29 +2,45 @@ module eventcore.core; public import eventcore.driver; -import eventcore.drivers.epoll; -import eventcore.drivers.libasync; import eventcore.drivers.select; -import eventcore.drivers.posix; +import eventcore.drivers.epoll; +import eventcore.drivers.kqueue; +import eventcore.drivers.libasync; +import eventcore.drivers.winapi; -version (Have_libasync) alias NativeEventDriver = LibasyncEventDriver; -else version (linux) alias NativeEventDriver = PosixEventDriver!EpollEventLoop; -else alias NativeEventDriver = PosixEventDriver!SelectEventLoop; +version (EventcoreEpollDriver) alias NativeEventDriver = EpollEventDriver; +else version (EventcoreKqueueDriver) alias NativeEventDriver = KqueueEventDriver; +else version (EventcoreWinAPIDriver) alias NativeEventDriver = WinAPIEventDriver; +else version (EventcoreLibasyncDriver) alias NativeEventDriver = LibasyncEventDriver; +else version (EventcoreSelectDriver) alias NativeEventDriver = SelectEventDriver; +else alias NativeEventDriver = EventDriver; -@property EventDriver eventDriver() +@property NativeEventDriver eventDriver() @safe @nogc nothrow { - assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); + static if (is(NativeEventDriver == EventDriver)) + assert(s_driver !is null, "setupEventDriver() was not called for this thread."); + else + assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); return s_driver; } -static this() -{ - if (!s_driver) s_driver = new NativeEventDriver; -} +static if (!is(NativeEventDriver == EventDriver)) { + static this() + { + if (!s_driver) s_driver = new NativeEventDriver; + } -shared static this() -{ - s_driver = new NativeEventDriver; + shared static this() + { + s_driver = new NativeEventDriver; + } +} else { + void setupEventDriver(EventDriver driver) + { + assert(driver !is null, "The event driver instance must be non-null."); + assert(!s_driver, "Can only set up the event driver once per thread."); + s_driver = driver; + } } private NativeEventDriver s_driver; diff --git a/source/eventcore/drivers/epoll.d b/source/eventcore/drivers/epoll.d index 5babbe8..cbaf2d4 100644 --- a/source/eventcore/drivers/epoll.d +++ b/source/eventcore/drivers/epoll.d @@ -16,6 +16,7 @@ import core.time : Duration; import core.sys.posix.sys.time : timeval; import core.sys.linux.epoll; +alias EpollEventDriver = PosixEventDriver!EpollEventLoop; final class EpollEventLoop : PosixEventLoop { @safe: nothrow: diff --git a/source/eventcore/drivers/kqueue.d b/source/eventcore/drivers/kqueue.d index 4ed3316..945d64e 100644 --- a/source/eventcore/drivers/kqueue.d +++ b/source/eventcore/drivers/kqueue.d @@ -27,6 +27,8 @@ else static assert(false, "Kqueue not supported on this OS."); import core.sys.linux.epoll; +alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop; + final class KqueueEventLoop : PosixEventLoop { private { int m_queue; diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 6c39a52..17b1a9d 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -12,8 +12,9 @@ import eventcore.drivers.threadedfile; import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.utils; -import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnixAddress, UnknownAddress; +import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress; version (Posix) { + import std.socket : UnixAddress; import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo; import core.sys.posix.netinet.in_; import core.sys.posix.netinet.tcp; @@ -24,7 +25,11 @@ version (Posix) { } version (Windows) { import core.sys.windows.winsock2; + alias sockaddr_storage = SOCKADDR_STORAGE; alias EAGAIN = WSAEWOULDBLOCK; + extern (C) int read(int fd, void *buffer, uint count) nothrow; + extern (C) int write(int fd, const(void) *buffer, uint count) nothrow; + extern (C) int close(int fd) nothrow @safe; } version (linux) { extern (C) int eventfd(uint initval, int flags); @@ -44,9 +49,10 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { private { - alias CoreDriver = PosixEventDriverCore!(Loop, LoopTimeoutTimerDriver); + alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver); alias EventsDriver = PosixEventDriverEvents!Loop; - alias SignalsDriver = PosixEventDriverSignals!Loop; + version (linx) alias SignalsDriver = SignalFDEventDriverSignals!Loop; + else alias SignalsDriver = DummyEventDriverSignals!Loop; alias TimerDriver = LoopTimeoutTimerDriver; alias SocketsDriver = PosixEventDriverSockets!Loop; version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); @@ -72,7 +78,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { m_events = new EventsDriver(m_loop); m_signals = new SignalsDriver(m_loop); m_timers = new TimerDriver; - m_core = new CoreDriver(m_loop, m_timers); + m_core = new CoreDriver(m_loop, m_timers, m_events); m_sockets = new SocketsDriver(m_loop); m_dns = new DNSDriver(m_events, m_signals); m_files = new FileDriver(m_events); @@ -98,7 +104,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { } -final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers) : EventDriverCore { +final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore { @safe: nothrow: import core.time : Duration; @@ -107,19 +113,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime private { Loop m_loop; Timers m_timers; + Events m_events; bool m_exit = false; FD m_wakeupEvent; } - protected this(Loop loop, Timers timers) + protected this(Loop loop, Timers timers, Events events) { m_loop = loop; m_timers = timers; - - m_wakeupEvent = eventfd(0, EFD_NONBLOCK); - m_loop.initFD(m_wakeupEvent); - m_loop.registerFD(m_wakeupEvent, EventMask.read); - //startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD + m_wakeupEvent = events.create(); } @property size_t waiterCount() const { return m_loop.m_waiterCount; } @@ -810,7 +813,8 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event) { addrinfo hints; - hints.ai_flags = AI_ADDRCONFIG|AI_V4MAPPED; + hints.ai_flags = AI_ADDRCONFIG; + version (linx) hints.ai_flags |= AI_V4MAPPED; hints.ai_family = af; () @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } (); events.trigger(event); @@ -1008,12 +1012,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { final override EventID create() { - auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); - m_loop.initFD(id); - m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation - m_loop.registerFD(id, EventMask.read); - m_loop.startNotify!(EventType.read)(id, &onEvent); - return id; + version (linux) { + auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); + m_loop.initFD(id); + m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation + m_loop.registerFD(id, EventMask.read); + m_loop.startNotify!(EventType.read)(id, &onEvent); + return id; + } else assert(false, "OS not supported!"); } final override void trigger(EventID event, bool notify_all = true) @@ -1084,7 +1090,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { } } -final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals { +final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals { @safe: /*@nogc:*/ nothrow: import core.sys.posix.signal; import core.sys.linux.sys.signalfd; @@ -1156,6 +1162,29 @@ final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals } } +final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals { +@safe: /*@nogc:*/ nothrow: + + private Loop m_loop; + + this(Loop loop) { m_loop = loop; } + + override SignalListenID listen(int sig, SignalCallback on_signal) + { + assert(false); + } + + override void addRef(SignalListenID descriptor) + { + assert(false); + } + + override bool releaseRef(SignalListenID descriptor) + { + assert(false); + } +} + final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers { import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; @@ -1278,17 +1307,7 @@ final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatcher this(Loop loop) { m_loop = loop; } - final override WatcherID watchDirectory(string path, bool recursive) - { - assert(false, "TODO!"); - } - - final override void wait(WatcherID watcher, FileChangesCallback callback) - { - assert(false, "TODO!"); - } - - final override void cancelWait(WatcherID watcher) + final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change) { assert(false, "TODO!"); } diff --git a/source/eventcore/drivers/select.d b/source/eventcore/drivers/select.d index 13f95db..a36ba2b 100644 --- a/source/eventcore/drivers/select.d +++ b/source/eventcore/drivers/select.d @@ -23,6 +23,8 @@ version (Windows) { } +alias SelectEventDriver = PosixEventDriver!SelectEventLoop; + final class SelectEventLoop : PosixEventLoop { @safe: nothrow: override bool doProcessEvents(Duration timeout) diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 1d0a1dd..b0bf2bf 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -20,14 +20,14 @@ version(Windows){ private { // TODO: use CreateFile/HANDLE instead of the Posix API on Windows - extern(C) { + extern(C) nothrow { alias off_t = sizediff_t; int open(in char* name, int mode, ...); int chmod(in char* name, int mode); - int close(int fd); + int close(int fd) @safe; int read(int fd, void *buffer, uint count); int write(int fd, in void *buffer, uint count); - off_t lseek(int fd, off_t offset, int whence); + off_t lseek(int fd, off_t offset, int whence) @safe; } enum O_RDONLY = 0; @@ -152,7 +152,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil void close(FileFD file) { - .close(file); + () @trusted { .close(file); } (); } ulong getSize(FileFD file) @@ -162,7 +162,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil return .lseek(file, 0, SEEK_END); } else { stat_t st; - fstat(file, &st); + () @trusted { fstat(file, &st); } (); return st.st_size; } } @@ -262,7 +262,10 @@ log("start processing"); assert(res, "Concurrent file "~op~"s are disallowed."); auto bytes = buffer; - .lseek(file, offset, SEEK_SET); + version (Windows) { + assert(offset <= off_t.max); + .lseek(file, cast(off_t)offset, SEEK_SET); + } else .lseek(file, offset, SEEK_SET); scope (exit) { log("trigger event");