Improve cross-platform organization and fix compilation on Windows (posix driver).

This commit is contained in:
Sönke Ludwig 2016-10-18 11:53:52 +02:00
parent c12fefadcf
commit 5f243cbb18
7 changed files with 118 additions and 53 deletions

26
dub.sdl
View file

@ -1,15 +1,37 @@
name "eventcore" name "eventcore"
description "Experimental callback based abstraction layer over operating system asynchronous I/O facilities." description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities."
license "MIT" license "MIT"
copyright "Copyright © 2012-2016 rejectedsoftware e.K." copyright "Copyright © 2012-2016 rejectedsoftware e.K."
targetType "library" targetType "library"
libs "anl" platform="linux" libs "anl" platform="linux"
libs "ws2_32" platform="windows"
configuration "native" { configuration "epoll" {
platforms "linux"
versions "EventcoreEpollDriver"
}
configuration "kqueue" {
platforms "osx" "freebsd"
versions "EventcoreKqueueDriver"
}
configuration "winapi" {
platforms "windows"
versions "EventcoreWinAPIDriver"
}
configuration "select" {
versions "EventcoreSelectDriver"
} }
configuration "libasync" { configuration "libasync" {
dependency "libasync" version="~>0.7.9" dependency "libasync" version="~>0.7.9"
versions "EventcoreLibasyncDriver"
}
configuration "generic" {
// Defines eventDriver as the generic EventDriver interface. Setup must be done manually.
} }

View file

@ -2,21 +2,29 @@ module eventcore.core;
public import eventcore.driver; public import eventcore.driver;
import eventcore.drivers.epoll;
import eventcore.drivers.libasync;
import eventcore.drivers.select; import eventcore.drivers.select;
import eventcore.drivers.posix; import eventcore.drivers.epoll;
import eventcore.drivers.kqueue;
import eventcore.drivers.libasync;
import eventcore.drivers.winapi;
version (Have_libasync) alias NativeEventDriver = LibasyncEventDriver; version (EventcoreEpollDriver) alias NativeEventDriver = EpollEventDriver;
else version (linux) alias NativeEventDriver = PosixEventDriver!EpollEventLoop; else version (EventcoreKqueueDriver) alias NativeEventDriver = KqueueEventDriver;
else alias NativeEventDriver = PosixEventDriver!SelectEventLoop; else version (EventcoreWinAPIDriver) alias NativeEventDriver = WinAPIEventDriver;
else version (EventcoreLibasyncDriver) alias NativeEventDriver = LibasyncEventDriver;
else version (EventcoreSelectDriver) alias NativeEventDriver = SelectEventDriver;
else alias NativeEventDriver = EventDriver;
@property EventDriver eventDriver() @property NativeEventDriver eventDriver()
@safe @nogc nothrow { @safe @nogc nothrow {
static if (is(NativeEventDriver == EventDriver))
assert(s_driver !is null, "setupEventDriver() was not called for this thread.");
else
assert(s_driver !is null, "eventcore.core static constructor didn't run!?"); assert(s_driver !is null, "eventcore.core static constructor didn't run!?");
return s_driver; return s_driver;
} }
static if (!is(NativeEventDriver == EventDriver)) {
static this() static this()
{ {
if (!s_driver) s_driver = new NativeEventDriver; if (!s_driver) s_driver = new NativeEventDriver;
@ -26,5 +34,13 @@ shared static this()
{ {
s_driver = new NativeEventDriver; s_driver = new NativeEventDriver;
} }
} else {
void setupEventDriver(EventDriver driver)
{
assert(driver !is null, "The event driver instance must be non-null.");
assert(!s_driver, "Can only set up the event driver once per thread.");
s_driver = driver;
}
}
private NativeEventDriver s_driver; private NativeEventDriver s_driver;

View file

@ -16,6 +16,7 @@ import core.time : Duration;
import core.sys.posix.sys.time : timeval; import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll; import core.sys.linux.epoll;
alias EpollEventDriver = PosixEventDriver!EpollEventLoop;
final class EpollEventLoop : PosixEventLoop { final class EpollEventLoop : PosixEventLoop {
@safe: nothrow: @safe: nothrow:

View file

@ -27,6 +27,8 @@ else static assert(false, "Kqueue not supported on this OS.");
import core.sys.linux.epoll; import core.sys.linux.epoll;
alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop;
final class KqueueEventLoop : PosixEventLoop { final class KqueueEventLoop : PosixEventLoop {
private { private {
int m_queue; int m_queue;

View file

@ -12,8 +12,9 @@ import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils; import eventcore.internal.utils;
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnixAddress, UnknownAddress; import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
version (Posix) { version (Posix) {
import std.socket : UnixAddress;
import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo; import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
import core.sys.posix.netinet.in_; import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp; import core.sys.posix.netinet.tcp;
@ -24,7 +25,11 @@ version (Posix) {
} }
version (Windows) { version (Windows) {
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK; alias EAGAIN = WSAEWOULDBLOCK;
extern (C) int read(int fd, void *buffer, uint count) nothrow;
extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
extern (C) int close(int fd) nothrow @safe;
} }
version (linux) { version (linux) {
extern (C) int eventfd(uint initval, int flags); extern (C) int eventfd(uint initval, int flags);
@ -44,9 +49,10 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
private { private {
alias CoreDriver = PosixEventDriverCore!(Loop, LoopTimeoutTimerDriver); alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver);
alias EventsDriver = PosixEventDriverEvents!Loop; alias EventsDriver = PosixEventDriverEvents!Loop;
alias SignalsDriver = PosixEventDriverSignals!Loop; version (linx) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop;
alias TimerDriver = LoopTimeoutTimerDriver; alias TimerDriver = LoopTimeoutTimerDriver;
alias SocketsDriver = PosixEventDriverSockets!Loop; alias SocketsDriver = PosixEventDriverSockets!Loop;
version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
@ -72,7 +78,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
m_events = new EventsDriver(m_loop); m_events = new EventsDriver(m_loop);
m_signals = new SignalsDriver(m_loop); m_signals = new SignalsDriver(m_loop);
m_timers = new TimerDriver; m_timers = new TimerDriver;
m_core = new CoreDriver(m_loop, m_timers); m_core = new CoreDriver(m_loop, m_timers, m_events);
m_sockets = new SocketsDriver(m_loop); m_sockets = new SocketsDriver(m_loop);
m_dns = new DNSDriver(m_events, m_signals); m_dns = new DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events); m_files = new FileDriver(m_events);
@ -98,7 +104,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
} }
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers) : EventDriverCore { final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore {
@safe: nothrow: @safe: nothrow:
import core.time : Duration; import core.time : Duration;
@ -107,19 +113,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
private { private {
Loop m_loop; Loop m_loop;
Timers m_timers; Timers m_timers;
Events m_events;
bool m_exit = false; bool m_exit = false;
FD m_wakeupEvent; FD m_wakeupEvent;
} }
protected this(Loop loop, Timers timers) protected this(Loop loop, Timers timers, Events events)
{ {
m_loop = loop; m_loop = loop;
m_timers = timers; m_timers = timers;
m_wakeupEvent = events.create();
m_wakeupEvent = eventfd(0, EFD_NONBLOCK);
m_loop.initFD(m_wakeupEvent);
m_loop.registerFD(m_wakeupEvent, EventMask.read);
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
} }
@property size_t waiterCount() const { return m_loop.m_waiterCount; } @property size_t waiterCount() const { return m_loop.m_waiterCount; }
@ -810,7 +813,8 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event) static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event)
{ {
addrinfo hints; addrinfo hints;
hints.ai_flags = AI_ADDRCONFIG|AI_V4MAPPED; hints.ai_flags = AI_ADDRCONFIG;
version (linx) hints.ai_flags |= AI_V4MAPPED;
hints.ai_family = af; hints.ai_family = af;
() @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } (); () @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } ();
events.trigger(event); events.trigger(event);
@ -1008,12 +1012,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
final override EventID create() final override EventID create()
{ {
version (linux) {
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
m_loop.initFD(id); m_loop.initFD(id);
m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation
m_loop.registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
m_loop.startNotify!(EventType.read)(id, &onEvent); m_loop.startNotify!(EventType.read)(id, &onEvent);
return id; return id;
} else assert(false, "OS not supported!");
} }
final override void trigger(EventID event, bool notify_all = true) final override void trigger(EventID event, bool notify_all = true)
@ -1084,7 +1090,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
} }
} }
final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals { final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.sys.posix.signal; import core.sys.posix.signal;
import core.sys.linux.sys.signalfd; import core.sys.linux.sys.signalfd;
@ -1156,6 +1162,29 @@ final class PosixEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals
} }
} }
final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{
assert(false);
}
override void addRef(SignalListenID descriptor)
{
assert(false);
}
override bool releaseRef(SignalListenID descriptor)
{
assert(false);
}
}
final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers
{ {
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
@ -1278,17 +1307,7 @@ final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatcher
this(Loop loop) { m_loop = loop; } this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive) final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{
assert(false, "TODO!");
}
final override void wait(WatcherID watcher, FileChangesCallback callback)
{
assert(false, "TODO!");
}
final override void cancelWait(WatcherID watcher)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }

View file

@ -23,6 +23,8 @@ version (Windows) {
} }
alias SelectEventDriver = PosixEventDriver!SelectEventLoop;
final class SelectEventLoop : PosixEventLoop { final class SelectEventLoop : PosixEventLoop {
@safe: nothrow: @safe: nothrow:
override bool doProcessEvents(Duration timeout) override bool doProcessEvents(Duration timeout)

View file

@ -20,14 +20,14 @@ version(Windows){
private { private {
// TODO: use CreateFile/HANDLE instead of the Posix API on Windows // TODO: use CreateFile/HANDLE instead of the Posix API on Windows
extern(C) { extern(C) nothrow {
alias off_t = sizediff_t; alias off_t = sizediff_t;
int open(in char* name, int mode, ...); int open(in char* name, int mode, ...);
int chmod(in char* name, int mode); int chmod(in char* name, int mode);
int close(int fd); int close(int fd) @safe;
int read(int fd, void *buffer, uint count); int read(int fd, void *buffer, uint count);
int write(int fd, in void *buffer, uint count); int write(int fd, in void *buffer, uint count);
off_t lseek(int fd, off_t offset, int whence); off_t lseek(int fd, off_t offset, int whence) @safe;
} }
enum O_RDONLY = 0; enum O_RDONLY = 0;
@ -152,7 +152,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
void close(FileFD file) void close(FileFD file)
{ {
.close(file); () @trusted { .close(file); } ();
} }
ulong getSize(FileFD file) ulong getSize(FileFD file)
@ -162,7 +162,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
return .lseek(file, 0, SEEK_END); return .lseek(file, 0, SEEK_END);
} else { } else {
stat_t st; stat_t st;
fstat(file, &st); () @trusted { fstat(file, &st); } ();
return st.st_size; return st.st_size;
} }
} }
@ -262,7 +262,10 @@ log("start processing");
assert(res, "Concurrent file "~op~"s are disallowed."); assert(res, "Concurrent file "~op~"s are disallowed.");
auto bytes = buffer; auto bytes = buffer;
.lseek(file, offset, SEEK_SET); version (Windows) {
assert(offset <= off_t.max);
.lseek(file, cast(off_t)offset, SEEK_SET);
} else .lseek(file, offset, SEEK_SET);
scope (exit) { scope (exit) {
log("trigger event"); log("trigger event");