Split up Posix event driver into separate files.

This commit is contained in:
Sönke Ludwig 2017-01-25 00:11:57 +01:00
parent 2174b7d535
commit d1829669ec
11 changed files with 1922 additions and 1859 deletions

View file

@ -2,9 +2,9 @@ module eventcore.core;
public import eventcore.driver; public import eventcore.driver;
import eventcore.drivers.select; import eventcore.drivers.posix.select;
import eventcore.drivers.epoll; import eventcore.drivers.posix.epoll;
import eventcore.drivers.kqueue; import eventcore.drivers.posix.kqueue;
import eventcore.drivers.libasync; import eventcore.drivers.libasync;
import eventcore.drivers.winapi; import eventcore.drivers.winapi;

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,334 @@
module eventcore.drivers.posix.dns;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
version (Posix) {
import std.socket : UnixAddress;
import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.un;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl;
}
version (Windows) {
import core.sys.windows.windows;
import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK;
}
/// getaddrinfo+thread based lookup - does not support true cancellation
version (Posix)
final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS {
import std.parallelism : task, taskPool;
import std.string : toStringz;
private {
static struct Lookup {
DNSLookupCallback callback;
addrinfo* result;
int retcode;
string name;
}
ChoppedVector!Lookup m_lookups;
Events m_events;
EventID m_event;
size_t m_maxHandle;
}
this(Events events, Signals signals)
{
m_events = events;
m_event = events.create();
m_events.wait(m_event, &onDNSSignal);
}
void dispose()
{
m_events.cancelWait(m_event, &onDNSSignal);
m_events.releaseRef(m_event);
}
override DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished)
{
debug (EventCoreLogDNS) print("lookup %s", name);
auto handle = getFreeHandle();
if (handle > m_maxHandle) m_maxHandle = handle;
assert(!m_lookups[handle].result);
Lookup* l = () @trusted { return &m_lookups[handle]; } ();
l.name = name;
l.callback = on_lookup_finished;
auto events = () @trusted { return cast(shared)m_events; } ();
auto t = task!taskFun(l, AddressFamily.UNSPEC, events, m_event);
try t.executeInNewThread();//taskPool.put(t);
catch (Exception e) return DNSLookupID.invalid;
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
return handle;
}
/// public
static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event)
{
debug (EventCoreLogDNS) print("lookup %s start", lookup.name);
addrinfo hints;
hints.ai_flags = AI_ADDRCONFIG;
version (linx) hints.ai_flags |= AI_V4MAPPED;
hints.ai_family = af;
() @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } ();
events.trigger(event, true);
debug (EventCoreLogDNS) print("lookup %s finished", lookup.name);
}
override void cancelLookup(DNSLookupID handle)
{
m_lookups[handle].callback = null;
}
private void onDNSSignal(EventID event)
@trusted nothrow
{
debug (EventCoreLogDNS) print("DNS event triggered");
m_events.wait(m_event, &onDNSSignal);
size_t lastmax;
foreach (i, ref l; m_lookups) {
if (i > m_maxHandle) break;
if (l.callback) {
if (l.result || l.retcode) {
debug (EventCoreLogDNS) print("found finished lookup %s for %s", i, l.name);
auto cb = l.callback;
auto ai = l.result;
DNSStatus status;
switch (l.retcode) {
default: status = DNSStatus.error; break;
case 0: status = DNSStatus.ok; break;
}
l.callback = null;
l.result = null;
l.retcode = 0;
if (i == m_maxHandle) m_maxHandle = lastmax;
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
} else lastmax = i;
}
}
debug (EventCoreLogDNS) print("Max active DNS handle: %s", m_maxHandle);
}
private DNSLookupID getFreeHandle()
@safe nothrow {
assert(m_lookups.length <= int.max);
foreach (i, ref l; m_lookups)
if (!l.callback)
return cast(DNSLookupID)cast(int)i;
return cast(DNSLookupID)cast(int)m_lookups.length;
}
}
/// getaddrinfo_a based asynchronous lookups
final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS {
import core.sys.posix.signal : SIGEV_SIGNAL, SIGRTMIN, sigevent;
private {
static struct Lookup {
gaicb ctx;
DNSLookupCallback callback;
}
ChoppedVector!Lookup m_lookups;
Signals m_signals;
int m_dnsSignal;
SignalListenID m_sighandle;
}
@safe nothrow:
this(Events events, Signals signals)
{
m_signals = signals;
m_dnsSignal = () @trusted { return SIGRTMIN; } ();
m_sighandle = signals.listen(m_dnsSignal, &onDNSSignal);
}
void dispose()
{
m_signals.releaseRef(m_sighandle);
}
override DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished)
{
import std.string : toStringz;
auto handle = getFreeHandle();
sigevent evt;
evt.sigev_notify = SIGEV_SIGNAL;
evt.sigev_signo = m_dnsSignal;
gaicb* res = &m_lookups[handle].ctx;
res.ar_name = name.toStringz();
auto ret = () @trusted { return getaddrinfo_a(GAI_NOWAIT, &res, 1, &evt); } ();
if (ret != 0)
return DNSLookupID.invalid;
m_lookups[handle].callback = on_lookup_finished;
return handle;
}
override void cancelLookup(DNSLookupID handle)
{
gai_cancel(&m_lookups[handle].ctx);
m_lookups[handle].callback = null;
}
private void onDNSSignal(SignalListenID, SignalStatus status, int signal)
@safe nothrow
{
assert(status == SignalStatus.ok);
foreach (i, ref l; m_lookups) {
if (!l.callback) continue;
auto err = gai_error(&l.ctx);
if (err == EAI_INPROGRESS) continue;
DNSStatus status;
switch (err) {
default: status = DNSStatus.error; break;
case 0: status = DNSStatus.ok; break;
}
auto cb = l.callback;
auto ai = l.ctx.ar_result;
l.callback = null;
l.ctx.ar_result = null;
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
}
}
private DNSLookupID getFreeHandle()
{
foreach (i, ref l; m_lookups)
if (!l.callback)
return cast(DNSLookupID)cast(int)i;
return cast(DNSLookupID)cast(int)m_lookups.length;
}
}
version (linux) extern(C) {
import core.sys.posix.signal : sigevent;
struct gaicb {
const(char)* ar_name;
const(char)* ar_service;
const(addrinfo)* ar_request;
addrinfo* ar_result;
}
enum GAI_NOWAIT = 1;
enum EAI_INPROGRESS = -100;
int getaddrinfo_a(int mode, gaicb** list, int nitems, sigevent *sevp);
int gai_error(gaicb *req);
int gai_cancel(gaicb *req);
}
/// ghbn based lookup - does not support cancellation and blocks the thread!
final class EventDriverDNS_GHBN(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS {
import std.parallelism : task, taskPool;
import std.string : toStringz;
private {
static struct Lookup {
DNSLookupCallback callback;
bool success;
int retcode;
string name;
}
size_t m_maxHandle;
}
this(Events events, Signals signals)
{
}
void dispose()
{
}
override DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished)
{
import std.string : toStringz;
auto handle = DNSLookupID(m_maxHandle++);
auto he = () @trusted { return gethostbyname(name.toStringz); } ();
if (he is null) {
on_lookup_finished(handle, DNSStatus.error, null);
return handle;
}
switch (he.h_addrtype) {
default: assert(false, "Invalid address family returned from host lookup.");
case AF_INET: {
sockaddr_in sa;
sa.sin_family = AF_INET;
sa.sin_addr = () @trusted { return *cast(in_addr*)he.h_addr_list[0]; } ();
scope addr = new RefAddress(() @trusted { return cast(sockaddr*)&sa; } (), sa.sizeof);
RefAddress[1] aa;
aa[0] = addr;
on_lookup_finished(handle, DNSStatus.ok, aa);
} break;
case AF_INET6: {
sockaddr_in6 sa;
sa.sin6_family = AF_INET6;
sa.sin6_addr = () @trusted { return *cast(in6_addr*)he.h_addr_list[0]; } ();
scope addr = new RefAddress(() @trusted { return cast(sockaddr*)&sa; } (), sa.sizeof);
RefAddress[1] aa;
aa[0] = addr;
on_lookup_finished(handle, DNSStatus.ok, aa);
} break;
}
return handle;
}
override void cancelLookup(DNSLookupID) {}
}
package struct DNSSlot {
alias Handle = DNSLookupID;
DNSLookupCallback callback;
}
private void passToDNSCallback()(DNSLookupID id, scope DNSLookupCallback cb, DNSStatus status, addrinfo* ai_orig)
@trusted nothrow
{
import std.typecons : scoped;
try {
typeof(scoped!RefAddress())[16] addrs_prealloc = [
scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(),
scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress(), scoped!RefAddress()
];
//Address[16] addrs;
RefAddress[16] addrs;
auto ai = ai_orig;
size_t addr_count = 0;
while (ai !is null && addr_count < addrs.length) {
RefAddress ua = addrs_prealloc[addr_count]; // FIXME: avoid heap allocation
ua.set(ai.ai_addr, ai.ai_addrlen);
addrs[addr_count] = ua;
addr_count++;
ai = ai.ai_next;
}
cb(id, status, addrs[0 .. addr_count]);
freeaddrinfo(ai_orig);
} catch (Exception e) assert(false, e.msg);
}

View file

@ -0,0 +1,352 @@
/**
Base class for BSD socket based driver implementations.
See_also: `eventcore.drivers.select`, `eventcore.drivers.epoll`, `eventcore.drivers.kqueue`
*/
module eventcore.drivers.posix.driver;
@safe: /*@nogc:*/ nothrow:
public import eventcore.driver;
import eventcore.drivers.posix.dns;
import eventcore.drivers.posix.events;
import eventcore.drivers.posix.signals;
import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils;
import std.algorithm.comparison : among, min, max;
version (Posix) {
package alias sock_t = int;
}
version (Windows) {
package alias sock_t = size_t;
}
private long currStdTime()
{
import std.datetime : Clock;
scope (failure) assert(false);
return Clock.currStdTime;
}
final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
@safe: /*@nogc:*/ nothrow:
private {
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver);
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop;
alias TimerDriver = LoopTimeoutTimerDriver;
alias SocketsDriver = PosixEventDriverSockets!Loop;
version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
//version (linux) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!Loop;
else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!Loop;
else alias WatcherDriver = PosixEventDriverWatchers!Loop;
Loop m_loop;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
LoopTimeoutTimerDriver m_timers;
SocketsDriver m_sockets;
DNSDriver m_dns;
FileDriver m_files;
WatcherDriver m_watchers;
}
this()
{
m_loop = new Loop;
m_sockets = new SocketsDriver(m_loop);
m_events = new EventsDriver(m_loop, m_sockets);
m_signals = new SignalsDriver(m_loop);
m_timers = new TimerDriver;
m_core = new CoreDriver(m_loop, m_timers, m_events);
m_dns = new DNSDriver(m_events, m_signals);
m_files = new FileDriver(m_events);
m_watchers = new WatcherDriver(m_loop);
}
// force overriding these in the (final) sub classes to avoid virtual calls
final override @property CoreDriver core() { return m_core; }
final override @property EventsDriver events() { return m_events; }
final override @property shared(EventsDriver) events() shared { return m_events; }
final override @property SignalsDriver signals() { return m_signals; }
final override @property TimerDriver timers() { return m_timers; }
final override @property SocketsDriver sockets() { return m_sockets; }
final override @property DNSDriver dns() { return m_dns; }
final override @property FileDriver files() { return m_files; }
final override @property WatcherDriver watchers() { return m_watchers; }
final override void dispose()
{
m_files.dispose();
m_dns.dispose();
m_loop.dispose();
}
}
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents) : EventDriverCore {
@safe: nothrow:
import core.time : Duration;
protected alias ExtraEventsCallback = bool delegate(long);
private {
Loop m_loop;
Timers m_timers;
Events m_events;
bool m_exit = false;
EventID m_wakeupEvent;
}
protected this(Loop loop, Timers timers, Events events)
{
m_loop = loop;
m_timers = timers;
m_events = events;
m_wakeupEvent = events.create();
}
@property size_t waiterCount() const { return m_loop.m_waiterCount; }
final override ExitReason processEvents(Duration timeout)
{
import core.time : hnsecs, seconds;
if (m_exit) {
m_exit = false;
return ExitReason.exited;
}
if (!waiterCount) {
return ExitReason.outOfWaiters;
}
bool got_events;
if (timeout <= 0.seconds) {
got_events = m_loop.doProcessEvents(0.seconds);
m_timers.process(currStdTime);
} else {
long now = currStdTime;
do {
auto nextto = max(min(m_timers.getNextTimeout(now), timeout), 0.seconds);
got_events = m_loop.doProcessEvents(nextto);
long prev_step = now;
now = currStdTime;
got_events |= m_timers.process(now);
if (timeout != Duration.max)
timeout -= (now - prev_step).hnsecs;
} while (timeout > 0.seconds && !m_exit && !got_events);
}
if (m_exit) {
m_exit = false;
return ExitReason.exited;
}
if (!waiterCount) {
return ExitReason.outOfWaiters;
}
if (got_events) return ExitReason.idle;
return ExitReason.timeout;
}
final override void exit()
{
m_exit = true;
() @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } ();
}
final override void clearExitFlag()
{
m_exit = false;
}
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
return rawUserDataImpl(descriptor, size, initialize, destroy);
}
final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
return rawUserDataImpl(descriptor, size, initialize, destroy);
}
private void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
FDSlot* fds = &m_loop.m_fds[descriptor].common;
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");
assert(size <= FDSlot.userData.length, "Requested user data is too large.");
if (size > FDSlot.userData.length) assert(false);
if (!fds.userDataDestructor) {
initialize(fds.userData.ptr);
fds.userDataDestructor = destroy;
}
return m_loop.m_fds[descriptor].common.userData.ptr;
}
}
package class PosixEventLoop {
@safe: nothrow:
import core.time : Duration;
package {
AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds;
size_t m_waiterCount = 0;
}
protected @property int maxFD() const { return cast(int)m_fds.length; }
protected abstract void dispose();
protected abstract bool doProcessEvents(Duration dur);
/// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask);
/// Unregisters the FD for general notification reception.
protected abstract void unregisterFD(FD fd, EventMask mask);
/// Updates the event mask to use for listening for notifications.
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask);
final protected void notify(EventType evt)(FD fd)
{
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd.value].common.callback[evt])
m_fds[fd.value].common.callback[evt](fd);
}
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
{
// TODO: optimize!
foreach (i; 0 .. cast(int)m_fds.length)
if (m_fds[i].common.callback[evt])
del(cast(FD)i);
}
package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
"Overwriting notification callback.");
// ensure that the FD doesn't get closed before the callback gets called.
if (callback !is null) {
m_waiterCount++;
m_fds[fd.value].common.refCount++;
} else {
m_fds[fd.value].common.refCount--;
m_waiterCount--;
}
m_fds[fd.value].common.callback[evt] = callback;
}
package void initFD(FD fd)
{
m_fds[fd.value].common.refCount = 1;
}
package void clearFD(FD fd)
{
if (m_fds[fd.value].common.userDataDestructor)
() @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } ();
m_fds[fd.value] = m_fds.FullField.init;
}
}
alias FDEnumerateCallback = void delegate(FD);
alias FDSlotCallback = void delegate(FD);
private struct FDSlot {
FDSlotCallback[EventType.max+1] callback;
uint refCount;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
@property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0;
if (callback[EventType.read] !is null) ret |= EventMask.read;
if (callback[EventType.write] !is null) ret |= EventMask.write;
if (callback[EventType.status] !is null) ret |= EventMask.status;
return ret;
}
}
enum EventType {
read,
write,
status
}
enum EventMask {
read = 1<<0,
write = 1<<1,
status = 1<<2
}
void log(ARGS...)(string fmt, ARGS args)
@trusted {
import std.stdio : writef, writefln;
import core.thread : Thread;
try {
writef("[%s]: ", Thread.getThis().name);
writefln(fmt, args);
} catch (Exception) {}
}
/*version (Windows) {
import std.c.windows.windows;
import std.c.windows.winsock;
alias EWOULDBLOCK = WSAEWOULDBLOCK;
extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments);
class WSAErrorException : Exception {
int error;
this(string message, string file = __FILE__, size_t line = __LINE__)
{
error = WSAGetLastError();
this(message, error, file, line);
}
this(string message, int error, string file = __FILE__, size_t line = __LINE__)
{
import std.string : format;
ushort* errmsg;
FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS,
null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null);
size_t len = 0;
while (errmsg[len]) len++;
auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup;
LocalFree(errmsg);
super(format("%s: %s (%s)", message, errmsgd, error), file, line);
}
}
alias SystemSocketException = WSAErrorException;
} else {
import std.exception : ErrnoException;
alias SystemSocketException = ErrnoException;
}
T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__)
{
import std.exception : enforceEx;
return enforceEx!SystemSocketException(value, msg, file, line);
}*/

View file

@ -4,12 +4,12 @@
Epoll is an efficient API for asynchronous I/O on Linux, suitable for large Epoll is an efficient API for asynchronous I/O on Linux, suitable for large
numbers of concurrently open sockets. numbers of concurrently open sockets.
*/ */
module eventcore.drivers.epoll; module eventcore.drivers.posix.epoll;
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
version (linux): version (linux):
public import eventcore.drivers.posix; public import eventcore.drivers.posix.driver;
import eventcore.internal.utils; import eventcore.internal.utils;
import core.time : Duration; import core.time : Duration;

View file

@ -0,0 +1,177 @@
module eventcore.drivers.posix.events;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.consumablequeue : ConsumableQueue;
import std.socket : InternetAddress;
version (linux) {
extern (C) int eventfd(uint initval, int flags);
enum EFD_NONBLOCK = 0x800;
}
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
private {
Loop m_loop;
Sockets m_sockets;
version (linux) {}
else {
EventSlot[DatagramSocketFD] m_events;
ubyte[long.sizeof] m_buf;
}
}
this(Loop loop, Sockets sockets)
{
m_loop = loop;
m_sockets = sockets;
}
final override EventID create()
{
version (linux) {
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
m_loop.initFD(id);
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
m_loop.registerFD(id, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
return id;
} else {
auto addr = new InternetAddress(0x7F000001, 0);
auto s = m_sockets.createDatagramSocket(addr, addr);
if (s == DatagramSocketFD.invalid) return EventID.invalid;
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
m_events[s] = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
return cast(EventID)s;
}
}
final override void trigger(EventID event, bool notify_all)
{
auto slot = getSlot(event);
if (notify_all) {
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
foreach (w; slot.waiters.consume) {
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
m_loop.m_waiterCount--;
w(event);
}
} else {
if (!slot.waiters.empty) {
m_loop.m_waiterCount--;
slot.waiters.consumeOne()(event);
}
}
}
final override void trigger(EventID event, bool notify_all)
shared @trusted {
import core.atomic : atomicStore;
auto thisus = cast(PosixEventDriverEvents)this;
assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
long one = 1;
//log("emitting for all threads");
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
version (linux) () @trusted { .write(cast(int)event, &one, one.sizeof); } ();
else thisus.m_sockets.send(cast(DatagramSocketFD)event, thisus.m_buf, IOMode.once, null, &thisus.onSocketDataSent);
}
final override void wait(EventID event, EventCallback on_event)
{
m_loop.m_waiterCount++;
getSlot(event).waiters.put(on_event);
}
final override void cancelWait(EventID event, EventCallback on_event)
{
import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove;
m_loop.m_waiterCount--;
getSlot(event).waiters.removePending(on_event);
}
private void onEvent(FD fd)
@trusted {
EventID event = cast(EventID)fd;
version (linux) {
ulong cnt;
() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
}
import core.atomic : cas;
auto all = cas(&getSlot(event).triggerAll, true, false);
trigger(event, all);
}
version (linux) {}
else {
private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress)
{
}
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
{
onEvent(cast(EventID)s);
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
}
}
final override void addRef(EventID descriptor)
{
assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
getRC(descriptor)++;
}
final override bool releaseRef(EventID descriptor)
{
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
void destroy() {
() @trusted nothrow {
scope (failure) assert(false);
.destroy(getSlot(descriptor).waiters);
assert(getSlot(descriptor).waiters is null);
} ();
}
version (linux) {
if (--getRC(descriptor) == 0) {
destroy();
m_loop.unregisterFD(descriptor, EventMask.read);
m_loop.clearFD(descriptor);
close(cast(int)descriptor);
return false;
}
} else {
if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) {
destroy();
m_events.remove(cast(DatagramSocketFD)descriptor);
return false;
}
}
return true;
}
private EventSlot* getSlot(EventID id)
{
version (linux) {
assert(id < m_loop.m_fds.length, "Invalid event ID.");
return () @trusted { return &m_loop.m_fds[id].event(); } ();
} else {
assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID.");
return &m_events[cast(DatagramSocketFD)id];
}
}
private ref uint getRC(EventID id)
{
return m_loop.m_fds[id].common.refCount;
}
}
package struct EventSlot {
alias Handle = EventID;
ConsumableQueue!EventCallback waiters;
shared bool triggerAll;
}

View file

@ -4,7 +4,7 @@
Kqueue is an efficient API for asynchronous I/O on BSD flavors, including Kqueue is an efficient API for asynchronous I/O on BSD flavors, including
OS X/macOS, suitable for large numbers of concurrently open sockets. OS X/macOS, suitable for large numbers of concurrently open sockets.
*/ */
module eventcore.drivers.kqueue; module eventcore.drivers.posix.kqueue;
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
version (FreeBSD) enum have_kqueue = true; version (FreeBSD) enum have_kqueue = true;
@ -13,7 +13,7 @@ else enum have_kqueue = false;
static if (have_kqueue): static if (have_kqueue):
public import eventcore.drivers.posix; public import eventcore.drivers.posix.driver;
import eventcore.internal.utils; import eventcore.internal.utils;
import core.time : Duration; import core.time : Duration;

View file

@ -5,10 +5,10 @@
Windows. It has a good performance for small numbers of cuncurrently open Windows. It has a good performance for small numbers of cuncurrently open
files/sockets, but is not suited for larger amounts. files/sockets, but is not suited for larger amounts.
*/ */
module eventcore.drivers.select; module eventcore.drivers.posix.select;
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
public import eventcore.drivers.posix; public import eventcore.drivers.posix.driver;
import eventcore.internal.utils; import eventcore.internal.utils;
import core.time : Duration; import core.time : Duration;

View file

@ -0,0 +1,106 @@
module eventcore.drivers.posix.signals;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
import core.sys.posix.signal;
import core.sys.linux.sys.signalfd;
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{
auto fd = () @trusted {
sigset_t sset;
sigemptyset(&sset);
sigaddset(&sset, sig);
if (sigprocmask(SIG_BLOCK, &sset, null) != 0)
return SignalListenID.invalid;
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK));
} ();
m_loop.initFD(cast(FD)fd);
m_loop.m_fds[fd].specific = SignalSlot(on_signal);
m_loop.registerFD(cast(FD)fd, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
onSignal(cast(FD)fd);
return fd;
}
override void addRef(SignalListenID descriptor)
{
assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
m_loop.m_fds[descriptor].common.refCount++;
}
override bool releaseRef(SignalListenID descriptor)
{
FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd);
close(cast(int)fd);
return false;
}
return true;
}
private void onSignal(FD fd)
{
SignalListenID lid = cast(SignalListenID)fd;
signalfd_siginfo nfo;
do {
auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS))
break;
auto cb = m_loop.m_fds[fd].signal.callback;
if (ret != nfo.sizeof) {
cb(lid, SignalStatus.error, -1);
return;
}
addRef(lid);
cb(lid, SignalStatus.ok, nfo.ssi_signo);
releaseRef(lid);
} while (m_loop.m_fds[fd].common.refCount > 0);
}
}
final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
override SignalListenID listen(int sig, SignalCallback on_signal)
{
assert(false);
}
override void addRef(SignalListenID descriptor)
{
assert(false);
}
override bool releaseRef(SignalListenID descriptor)
{
assert(false);
}
}
package struct SignalSlot {
alias Handle = SignalListenID;
SignalCallback callback;
}

View file

@ -0,0 +1,763 @@
module eventcore.drivers.posix.sockets;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.utils;
import std.algorithm.comparison : among, min, max;
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
version (Posix) {
import std.socket : UnixAddress;
import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.un;
import core.sys.posix.unistd : close, read, write;
import core.stdc.errno : errno, EAGAIN, EINPROGRESS;
import core.sys.posix.fcntl;
}
version (Windows) {
import core.sys.windows.windows;
import core.sys.windows.winsock2;
alias sockaddr_storage = SOCKADDR_STORAGE;
alias EAGAIN = WSAEWOULDBLOCK;
enum SHUT_RDWR = SD_BOTH;
enum SHUT_RD = SD_RECEIVE;
enum SHUT_WR = SD_SEND;
extern (C) int read(int fd, void *buffer, uint count) nothrow;
extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
extern (C) int close(int fd) nothrow @safe;
}
final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override StreamSocketFD connectStream(scope Address address, scope Address bind_address, ConnectCallback on_connect)
{
auto sockfd = createSocket(address.addressFamily, SOCK_STREAM);
if (sockfd == -1) return StreamSocketFD.invalid;
auto sock = cast(StreamSocketFD)sockfd;
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; }
int bret;
() @trusted { // scope + bind()
if (bind_address !is null) {
bret = bind(cast(sock_t)sock, bind_address.name, bind_address.nameLen);
} else {
scope bind_addr = new UnknownAddress;
bind_addr.name.sa_family = cast(ubyte)address.addressFamily;
bind_addr.name.sa_data[] = 0;
bret = bind(cast(sock_t)sock, bind_addr.name, bind_addr.nameLen);
}
} ();
if (bret != 0) {
invalidateSocket();
on_connect(sock, ConnectStatus.bindFailure);
return sock;
}
m_loop.initFD(sock);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
if (ret == 0) {
m_loop.m_fds[sock].specific.state = ConnectionState.connected;
on_connect(sock, ConnectStatus.connected);
} else {
auto err = getSocketError();
if (err.among!(EAGAIN, EINPROGRESS)) {
with (m_loop.m_fds[sock].streamSocket) {
connectCallback = on_connect;
state = ConnectionState.connecting;
}
m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
} else {
m_loop.clearFD(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
invalidateSocket();
on_connect(sock, ConnectStatus.unknownError);
return sock;
}
}
return sock;
}
final override StreamSocketFD adoptStream(int socket)
{
auto fd = StreamSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return StreamSocketFD.invalid;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
return fd;
}
private void onConnect(FD sock)
{
m_loop.setNotifyCallback!(EventType.write)(sock, null);
with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.connected;
connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
connectCallback = null;
}
}
private void onConnectError(FD sock)
{
// FIXME: determine the correct kind of error!
with (m_loop.m_fds[sock].streamSocket) {
state = ConnectionState.closed;
connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
connectCallback = null;
}
}
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
{
log("Listen stream");
auto sockfd = createSocket(address.addressFamily, SOCK_STREAM);
if (sockfd == -1) return StreamListenSocketFD.invalid;
auto sock = cast(StreamListenSocketFD)sockfd;
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; }
() @trusted {
int tmp_reuse = 1;
// FIXME: error handling!
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
log("setsockopt failed.");
invalidateSocket();
} else if (bind(sockfd, address.name, address.nameLen) != 0) {
log("bind failed.");
invalidateSocket();
} else if (listen(sockfd, 128) != 0) {
log("listen failed.");
invalidateSocket();
} else log("Success!");
} ();
if (sock == StreamListenSocketFD.invalid)
return sock;
m_loop.initFD(sock);
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
if (on_accept) waitForConnections(sock, on_accept);
return sock;
}
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{
log("wait for conn");
m_loop.registerFD(sock, EventMask.read);
m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
onAccept(sock);
}
private void onAccept(FD listenfd)
{
foreach (i; 0 .. 20) {
sock_t sockfd;
sockaddr_storage addr;
socklen_t addr_len = addr.sizeof;
() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
if (sockfd == -1) break;
setSocketNonBlocking(cast(SocketFD)sockfd);
auto fd = cast(StreamSocketFD)sockfd;
m_loop.initFD(fd);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.m_fds[fd].specific.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
//print("accept %d", sockfd);
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
}
}
ConnectionState getConnectionState(StreamSocketFD sock)
{
return m_loop.m_fds[sock].streamSocket.state;
}
bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst)
{
socklen_t addr_len = dst.nameLen;
if (() @trusted { return getsockname(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
return false;
dst.cap(addr_len);
return true;
}
final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{
int opt = enable;
() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
}
final override void setKeepAlive(StreamSocketFD socket, bool enable)
{
ubyte opt = enable;
() @trusted { setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, cast(char*)&opt, opt.sizeof); } ();
}
final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
{
/*if (buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, 0);
return;
}*/
sizediff_t ret;
() @trusted { ret = .recv(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0);
return;
}
}
if (ret == 0 && buffer.length > 0) {
print("disconnect");
on_read_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
print("wouldblock");
on_read_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret >= 0) {
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, ret);
return;
}
}
// NOTE: since we know that not all data was read from the stream
// socket, the next call to recv is guaranteed to return EGAIN
// and we can avoid that call.
with (m_loop.m_fds[socket].streamSocket) {
readCallback = on_read_finish;
readMode = mode;
bytesRead = ret > 0 ? ret : 0;
readBuffer = buffer;
}
m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketRead);
}
override void cancelRead(StreamSocketFD socket)
{
assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null);
with (m_loop.m_fds[socket].streamSocket) {
readBuffer = null;
}
}
private void onSocketRead(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
m_loop.setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead);
}
sizediff_t ret = 0;
() @trusted { ret = .recv(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
finalize(IOStatus.error);
return;
}
}
if (ret == 0 && slot.readBuffer.length) {
slot.state = ConnectionState.passiveClose;
finalize(IOStatus.disconnected);
return;
}
if (ret > 0 || !slot.readBuffer.length) {
slot.bytesRead += ret;
slot.readBuffer = slot.readBuffer[ret .. $];
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
finalize(IOStatus.ok);
return;
}
}
}
final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{
if (buffer.length == 0) {
on_write_finish(socket, IOStatus.ok, 0);
return;
}
sizediff_t ret;
() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
on_write_finish(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_written = 0;
if (ret == 0) {
on_write_finish(socket, IOStatus.disconnected, 0);
return;
}
if (ret < 0 && mode == IOMode.immediate) {
on_write_finish(socket, IOStatus.wouldBlock, 0);
return;
}
if (ret > 0) {
bytes_written += ret;
buffer = buffer[ret .. $];
if (mode != IOMode.all || buffer.length == 0) {
on_write_finish(socket, IOStatus.ok, bytes_written);
return;
}
}
// NOTE: since we know that not all data was writtem to the stream
// socket, the next call to send is guaranteed to return EGAIN
// and we can avoid that call.
with (m_loop.m_fds[socket].streamSocket) {
writeCallback = on_write_finish;
writeMode = mode;
bytesWritten = ret > 0 ? ret : 0;
writeBuffer = buffer;
}
m_loop.setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
}
override void cancelWrite(StreamSocketFD socket)
{
assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_loop.m_fds[socket].streamSocket.writeBuffer = null;
}
private void onSocketWrite(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd;
sizediff_t ret;
() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), 0); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(socket, IOStatus.error, slot.bytesRead);
return;
}
}
if (ret == 0) {
m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.disconnected, slot.bytesWritten);
return;
}
if (ret > 0) {
slot.bytesWritten += ret;
slot.writeBuffer = slot.writeBuffer[ret .. $];
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
return;
}
}
}
final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
{
sizediff_t ret;
ubyte dummy;
() @trusted { ret = recv(cast(sock_t)socket, &dummy, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
on_data_available(socket, IOStatus.error, 0);
return;
}
}
size_t bytes_read = 0;
if (ret == 0) {
on_data_available(socket, IOStatus.disconnected, 0);
return;
}
if (ret > 0) {
on_data_available(socket, IOStatus.ok, 0);
return;
}
with (m_loop.m_fds[socket].streamSocket) {
readCallback = on_data_available;
readMode = IOMode.once;
bytesRead = 0;
readBuffer = null;
}
m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
}
private void onSocketDataAvailable(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status)
{
m_loop.setNotifyCallback!(EventType.read)(socket, null);
//m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, 0);
}
sizediff_t ret;
ubyte tmp;
() @trusted { ret = recv(cast(sock_t)socket, &tmp, 1, MSG_PEEK); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) finalize(IOStatus.error);
} else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
}
final override void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write)
{
auto st = m_loop.m_fds[socket].streamSocket.state;
() @trusted { .shutdown(cast(sock_t)socket, shut_read ? shut_write ? SHUT_RDWR : SHUT_RD : shut_write ? SHUT_WR : 0); } ();
if (st == ConnectionState.passiveClose) shut_read = true;
if (st == ConnectionState.activeClose) shut_write = true;
m_loop.m_fds[socket].streamSocket.state = shut_read ? shut_write ? ConnectionState.closed : ConnectionState.passiveClose : shut_write ? ConnectionState.activeClose : ConnectionState.connected;
}
final override DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address)
{
auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM);
if (sockfd == -1) return DatagramSocketFD.invalid;
auto sock = cast(DatagramSocketFD)sockfd;
if (bind_address && () @trusted { return bind(sockfd, bind_address.name, bind_address.nameLen); } () != 0) {
closeSocket(sockfd);
return DatagramSocketFD.init;
}
if (target_address) {
int ret;
if (target_address is bind_address) {
// special case of bind_address==target_address: determine the actual bind address
// in case of a zero port
sockaddr_storage sa;
socklen_t addr_len = sa.sizeof;
if (() @trusted { return getsockname(sockfd, cast(sockaddr*)&sa, &addr_len); } () != 0) {
closeSocket(sockfd);
return DatagramSocketFD.init;
}
ret = () @trusted { return connect(sockfd, cast(sockaddr*)&sa, addr_len); } ();
} else ret = () @trusted { return connect(sockfd, target_address.name, target_address.nameLen); } ();
if (ret != 0) {
closeSocket(sockfd);
return DatagramSocketFD.init;
}
}
m_loop.initFD(sock);
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
return sock;
}
final override DatagramSocketFD adoptDatagramSocket(int socket)
{
auto fd = DatagramSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
return fd;
}
final override bool setBroadcast(DatagramSocketFD socket, bool enable)
{
int tmp_broad = enable;
return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
}
void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish)
@trusted { // DMD 2.072.0-b2: scope considered unsafe
import std.typecons : scoped;
assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
sizediff_t ret;
sockaddr_storage src_addr;
socklen_t src_addr_len = src_addr.sizeof;
() @trusted { ret = .recvfrom(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s for %s!", err, socket);
on_receive_finish(socket, IOStatus.error, 0, null);
return;
}
if (mode == IOMode.immediate) {
on_receive_finish(socket, IOStatus.wouldBlock, 0, null);
} else {
with (m_loop.m_fds[socket].datagramSocket) {
readCallback = on_receive_finish;
readMode = mode;
bytesRead = 0;
readBuffer = buffer;
}
m_loop.setNotifyCallback!(EventType.read)(socket, &onDgramRead);
}
return;
}
scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr_len);
on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
}
void cancelReceive(DatagramSocketFD socket)
{
assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null);
m_loop.m_fds[socket].datagramSocket.readBuffer = null;
}
private void onDgramRead(FD fd)
@trusted { // DMD 2.072.0-b2: scope considered unsafe
auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret;
sockaddr_storage src_addr;
socklen_t src_addr_len = src_addr.sizeof;
() @trusted { ret = .recvfrom(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.read)(socket, null);
slot.readCallback(socket, IOStatus.error, 0, null);
return;
}
}
m_loop.setNotifyCallback!(EventType.read)(socket, null);
scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addrc);
}
void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish)
{
assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
sizediff_t ret;
if (target_address) {
() @trusted { ret = .sendto(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0, target_address.name, target_address.nameLen); } ();
m_loop.m_fds[socket].datagramSocket.targetAddr = target_address;
} else {
() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0); } ();
}
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s!", err);
on_send_finish(socket, IOStatus.error, 0, null);
return;
}
if (mode == IOMode.immediate) {
on_send_finish(socket, IOStatus.wouldBlock, 0, null);
} else {
with (m_loop.m_fds[socket].datagramSocket) {
writeCallback = on_send_finish;
writeMode = mode;
bytesWritten = 0;
writeBuffer = buffer;
}
m_loop.setNotifyCallback!(EventType.write)(socket, &onDgramWrite);
}
return;
}
on_send_finish(socket, IOStatus.ok, ret, null);
}
void cancelSend(DatagramSocketFD socket)
{
assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_loop.m_fds[socket].datagramSocket.writeBuffer = null;
}
private void onDgramWrite(FD fd)
{
auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret;
if (slot.targetAddr) {
() @trusted { ret = .sendto(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), 0, slot.targetAddr.name, slot.targetAddr.nameLen); } ();
} else {
() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), 0); } ();
}
if (ret < 0) {
auto err = getSocketError();
if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.write)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
return;
}
}
m_loop.setNotifyCallback!(EventType.write)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
}
final override void addRef(SocketFD fd)
{
assert(m_loop.m_fds[fd].common.refCount > 0, "Adding reference to unreferenced socket FD.");
m_loop.m_fds[fd].common.refCount++;
}
final override bool releaseRef(SocketFD fd)
{
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD(fd);
closeSocket(cast(sock_t)fd);
return false;
}
return true;
}
private sock_t createSocket(AddressFamily family, int type)
{
sock_t sock;
() @trusted { sock = socket(family, type, 0); } ();
if (sock == -1) return -1;
setSocketNonBlocking(cast(SocketFD)sock);
return sock;
}
}
package struct StreamSocketSlot {
alias Handle = StreamSocketFD;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
IOCallback readCallback; // FIXME: this type only works for stream sockets
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
IOCallback writeCallback; // FIXME: this type only works for stream sockets
ConnectCallback connectCallback;
ConnectionState state;
}
package struct StreamListenSocketSlot {
alias Handle = StreamListenSocketFD;
AcceptCallback acceptCallback;
}
package struct DgramSocketSlot {
alias Handle = DatagramSocketFD;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
DatagramIOCallback readCallback; // FIXME: this type only works for stream sockets
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
DatagramIOCallback writeCallback; // FIXME: this type only works for stream sockets
Address targetAddr;
}
private void closeSocket(sock_t sockfd)
@nogc nothrow {
version (Windows) () @trusted { closesocket(sockfd); } ();
else close(sockfd);
}
private void setSocketNonBlocking(SocketFD sockfd)
@nogc nothrow {
version (Windows) {
uint enable = 1;
() @trusted { ioctlsocket(sockfd, FIONBIO, &enable); } ();
} else {
() @trusted { fcntl(cast(int)sockfd, F_SETFL, O_NONBLOCK, 1); } ();
}
}
private int getSocketError()
@nogc nothrow {
version (Windows) return WSAGetLastError();
else return errno;
}

View file

@ -0,0 +1,181 @@
module eventcore.drivers.posix.watchers;
@safe:
import eventcore.driver;
import eventcore.drivers.posix.driver;
final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers
{
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
import std.file;
private {
Loop m_loop;
string[int][WatcherID] m_watches; // TODO: use a @nogc (allocator based) map
}
this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback)
{
enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
auto handle = () @trusted { return inotify_init1(IN_NONBLOCK); } ();
if (handle == -1) return WatcherID.invalid;
auto ret = WatcherID(handle);
addWatch(ret, path);
if (recursive) {
try {
if (path.isDir) () @trusted {
foreach (de; path.dirEntries(SpanMode.shallow))
if (de.isDir) addWatch(ret, de.name);
} ();
} catch (Exception e) {
// TODO: decide if this should be ignored or if the error should be forwarded
}
}
m_loop.initFD(FD(handle));
m_loop.registerFD(FD(handle), EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
m_loop.m_fds[handle].specific = WatcherSlot(callback);
processEvents(WatcherID(handle));
return ret;
}
final override void addRef(WatcherID descriptor)
{
assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
m_loop.m_fds[descriptor].common.refCount++;
}
final override bool releaseRef(WatcherID descriptor)
{
FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd);
m_watches.remove(descriptor);
/*errnoEnforce(*/close(cast(int)fd)/* == 0)*/;
return false;
}
return true;
}
private void onChanges(FD fd)
{
processEvents(cast(WatcherID)fd);
}
private void processEvents(WatcherID id)
{
import core.stdc.stdio : FILENAME_MAX;
import core.stdc.string : strlen;
ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
while (true) {
auto ret = () @trusted { return read(cast(int)id, &buf[0], buf.length); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS))
break;
assert(ret <= buf.length);
auto rem = buf[0 .. ret];
while (rem.length > 0) {
auto ev = () @trusted { return cast(inotify_event*)rem.ptr; } ();
FileChange ch;
if (ev.mask & (IN_CREATE|IN_MOVED_TO))
ch.kind = FileChangeKind.added;
else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM))
ch.kind = FileChangeKind.removed;
else if (ev.mask & IN_MODIFY)
ch.kind = FileChangeKind.modified;
auto name = () @trusted { return ev.name.ptr[0 .. strlen(ev.name.ptr)]; } ();
ch.directory = m_watches[id][ev.wd];
ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
ch.name = name;
addRef(id);
auto cb = m_loop.m_fds[ id].watcher.callback;
cb(id, ch);
if (!releaseRef(id)) break;
rem = rem[inotify_event.sizeof + ev.len .. $];
}
}
}
private bool addWatch(WatcherID handle, string path)
{
import std.string : toStringz;
enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
immutable wd = () @trusted { return inotify_add_watch(cast(int)handle, path.toStringz, EVENTS); } ();
if (wd == -1) return false;
m_watches[handle][wd] = path;
return true;
}
}
version (OSX)
final class FSEventsEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{
/*FSEventStreamCreate
FSEventStreamScheduleWithRunLoop
FSEventStreamStart*/
assert(false, "TODO!");
}
final override void addRef(WatcherID descriptor)
{
assert(false, "TODO!");
}
final override bool releaseRef(WatcherID descriptor)
{
/*FSEventStreamStop
FSEventStreamUnscheduleFromRunLoop
FSEventStreamInvalidate
FSEventStreamRelease*/
assert(false, "TODO!");
}
}
final class PosixEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
private Loop m_loop;
this(Loop loop) { m_loop = loop; }
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
{
assert(false, "TODO!");
}
final override void addRef(WatcherID descriptor)
{
assert(false, "TODO!");
}
final override bool releaseRef(WatcherID descriptor)
{
assert(false, "TODO!");
}
}
package struct WatcherSlot {
alias Handle = WatcherID;
FileChangesCallback callback;
}