Make the API robust against using invalid handles. Fixes #105.
Introduces a "validationCounter" field for all handle types that gets incremented (at least) whenever an OS file descriptor/handle gets invalidated or re-allocated. This way, an old eventcore handle to a reused OS handle can always be distinguished from the current one to avoid interference.
This commit is contained in:
parent
768c6cf4c8
commit
496e99c3b4
24 changed files with 1243 additions and 586 deletions
|
@ -35,6 +35,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
|||
static struct Lookup {
|
||||
shared(bool) done;
|
||||
DNSLookupCallback callback;
|
||||
uint validationCounter;
|
||||
addrinfo* result;
|
||||
int retcode;
|
||||
string name;
|
||||
|
@ -44,6 +45,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
|||
Events m_events;
|
||||
EventID m_event = EventID.invalid;
|
||||
size_t m_maxHandle;
|
||||
uint m_validationCounter;
|
||||
}
|
||||
|
||||
this(Events events, Signals signals)
|
||||
|
@ -64,7 +66,7 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
|||
override DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished)
|
||||
{
|
||||
debug (EventCoreLogDNS) print("lookup %s", name);
|
||||
auto handle = getFreeHandle();
|
||||
auto handle = allocateHandle();
|
||||
if (handle > m_maxHandle) m_maxHandle = handle;
|
||||
|
||||
assert(on_lookup_finished !is null, "Null callback passed to lookupHost");
|
||||
|
@ -123,15 +125,22 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
|||
|
||||
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
|
||||
m_events.loop.m_waiterCount++;
|
||||
return handle;
|
||||
return DNSLookupID(handle, l.validationCounter);
|
||||
}
|
||||
|
||||
override void cancelLookup(DNSLookupID handle)
|
||||
{
|
||||
if (!isValid(handle)) return;
|
||||
m_lookups[handle].callback = null;
|
||||
m_events.loop.m_waiterCount--;
|
||||
}
|
||||
|
||||
override bool isValid(DNSLookupID handle)
|
||||
const {
|
||||
if (handle.value >= m_lookups.length) return false;
|
||||
return m_lookups[handle.value].validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
private void onDNSSignal(EventID event)
|
||||
@trusted nothrow
|
||||
{
|
||||
|
@ -171,20 +180,26 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
|
|||
l.done = false;
|
||||
if (i == m_maxHandle) m_maxHandle = lastmax;
|
||||
m_events.loop.m_waiterCount--;
|
||||
passToDNSCallback(cast(DNSLookupID)cast(int)i, cb, status, ai);
|
||||
passToDNSCallback(DNSLookupID(i, l.validationCounter), cb, status, ai);
|
||||
} else lastmax = i;
|
||||
}
|
||||
}
|
||||
debug (EventCoreLogDNS) print("Max active DNS handle: %s", m_maxHandle);
|
||||
}
|
||||
|
||||
private DNSLookupID getFreeHandle()
|
||||
private DNSLookupID allocateHandle()
|
||||
@safe nothrow {
|
||||
assert(m_lookups.length <= int.max);
|
||||
int id = cast(int)m_lookups.length;
|
||||
foreach (i, ref l; m_lookups)
|
||||
if (!l.callback)
|
||||
return cast(DNSLookupID)cast(int)i;
|
||||
return cast(DNSLookupID)cast(int)m_lookups.length;
|
||||
if (!l.callback) {
|
||||
id = cast(int)i;
|
||||
break;
|
||||
}
|
||||
|
||||
auto vc = ++m_validationCounter;
|
||||
m_lookups[id].validationCounter = vc;
|
||||
return DNSLookupID(cast(int)id, vc);
|
||||
}
|
||||
|
||||
private void setupEvent()
|
||||
|
@ -204,12 +219,14 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
|||
private {
|
||||
static struct Lookup {
|
||||
gaicb ctx;
|
||||
uint validationCounter;
|
||||
DNSLookupCallback callback;
|
||||
}
|
||||
ChoppedVector!Lookup m_lookups;
|
||||
Events m_events;
|
||||
Signals m_signals;
|
||||
int m_dnsSignal;
|
||||
uint m_validationCounter;
|
||||
SignalListenID m_sighandle;
|
||||
}
|
||||
|
||||
|
@ -232,7 +249,7 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
|||
{
|
||||
import std.string : toStringz;
|
||||
|
||||
auto handle = getFreeHandle();
|
||||
auto handle = allocateHandle();
|
||||
|
||||
sigevent evt;
|
||||
evt.sigev_notify = SIGEV_SIGNAL;
|
||||
|
@ -260,6 +277,13 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
|||
m_events.loop.m_waiterCount--;
|
||||
}
|
||||
|
||||
override bool isValid(DNSLookupID handle)
|
||||
{
|
||||
if (handle.value >= m_lookups.length)
|
||||
return false;
|
||||
return m_lookups[handle.value].validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
private void onDNSSignal(SignalListenID, SignalStatus status, int signal)
|
||||
@safe nothrow
|
||||
{
|
||||
|
@ -284,11 +308,14 @@ final class EventDriverDNS_GAIA(Events : EventDriverEvents, Signals : EventDrive
|
|||
}
|
||||
}
|
||||
|
||||
private DNSLookupID getFreeHandle()
|
||||
private DNSLookupID allocateHandle()
|
||||
{
|
||||
foreach (i, ref l; m_lookups)
|
||||
if (!l.callback)
|
||||
if (!l.callback) {
|
||||
m_lookups[i].validationCounter = ++m_validationCounter;
|
||||
return cast(DNSLookupID)cast(int)i;
|
||||
}
|
||||
m_lookups[m_lookups.length].validationCounter = ++m_validationCounter;
|
||||
return cast(DNSLookupID)cast(int)m_lookups.length;
|
||||
}
|
||||
}
|
||||
|
@ -344,7 +371,7 @@ final class EventDriverDNS_GHBN(Events : EventDriverEvents, Signals : EventDrive
|
|||
{
|
||||
import std.string : toStringz;
|
||||
|
||||
auto handle = DNSLookupID(m_maxHandle++);
|
||||
auto handle = DNSLookupID(m_maxHandle++, 0);
|
||||
|
||||
auto he = () @trusted { return gethostbyname(name.toStringz); } ();
|
||||
if (he is null) {
|
||||
|
@ -377,6 +404,11 @@ final class EventDriverDNS_GHBN(Events : EventDriverEvents, Signals : EventDrive
|
|||
}
|
||||
|
||||
override void cancelLookup(DNSLookupID) {}
|
||||
|
||||
override bool isValid(DNSLookupID)
|
||||
const {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
package struct DNSSlot {
|
||||
|
|
|
@ -160,6 +160,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
|
||||
protected alias ExtraEventsCallback = bool delegate(long);
|
||||
|
||||
private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t);
|
||||
|
||||
private {
|
||||
Loop m_loop;
|
||||
Timers m_timers;
|
||||
|
@ -169,7 +171,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
EventID m_wakeupEvent;
|
||||
|
||||
shared Mutex m_threadCallbackMutex;
|
||||
ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)) m_threadCallbacks;
|
||||
ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
|
||||
}
|
||||
|
||||
this(Loop loop, Timers timers, Events events, Processes processes)
|
||||
|
@ -186,7 +188,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
() @trusted { m_threadCallbackMutex = cast(shared)mallocT!Mutex; } ();
|
||||
}
|
||||
|
||||
m_threadCallbacks = mallocT!(ConsumableQueue!(Tuple!(ThreadCallback, intptr_t)));
|
||||
m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
|
||||
m_threadCallbacks.reserve(1000);
|
||||
}
|
||||
|
||||
|
@ -260,7 +262,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
m_exit = false;
|
||||
}
|
||||
|
||||
final override void runInOwnerThread(ThreadCallback del, intptr_t param)
|
||||
final override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2)
|
||||
shared {
|
||||
auto m = atomicLoad(m_threadCallbackMutex);
|
||||
auto evt = atomicLoad(m_wakeupEvent);
|
||||
|
@ -273,12 +275,14 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
try {
|
||||
synchronized (m)
|
||||
() @trusted { return (cast()this).m_threadCallbacks; } ()
|
||||
.put(tuple(del, param));
|
||||
.put(ThreadCallbackEntry(del, param1, param2));
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
|
||||
m_events.trigger(m_wakeupEvent, false);
|
||||
}
|
||||
|
||||
alias runInOwnerThread = EventDriverCore.runInOwnerThread;
|
||||
|
||||
|
||||
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
|
@ -300,14 +304,14 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
|
|||
import std.stdint : intptr_t;
|
||||
|
||||
while (true) {
|
||||
Tuple!(ThreadCallback, intptr_t) del;
|
||||
ThreadCallbackEntry del;
|
||||
try {
|
||||
synchronized (m_threadCallbackMutex) {
|
||||
if (m_threadCallbacks.empty) break;
|
||||
del = m_threadCallbacks.consumeOne;
|
||||
}
|
||||
} catch (Exception e) assert(false, e.msg);
|
||||
del[0](del[1]);
|
||||
del[0](del[1], del[2]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -336,11 +340,13 @@ package class PosixEventLoop {
|
|||
/// Updates the event mask to use for listening for notifications.
|
||||
protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc;
|
||||
|
||||
final protected void notify(EventType evt)(FD fd)
|
||||
final protected void notify(EventType evt)(size_t fd)
|
||||
{
|
||||
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
|
||||
if (m_fds[fd.value].common.callback[evt])
|
||||
m_fds[fd.value].common.callback[evt](fd);
|
||||
if (m_fds[fd].common.callback[evt]) {
|
||||
auto vc = m_fds[fd].common.validationCounter;
|
||||
m_fds[fd].common.callback[evt](FD(fd, vc));
|
||||
}
|
||||
}
|
||||
|
||||
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
|
||||
|
@ -348,7 +354,7 @@ package class PosixEventLoop {
|
|||
// TODO: optimize!
|
||||
foreach (i; 0 .. cast(int)m_fds.length)
|
||||
if (m_fds[i].common.callback[evt])
|
||||
del(cast(FD)i);
|
||||
del(FD(i, m_fds[i].common.validationCounter));
|
||||
}
|
||||
|
||||
package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
|
||||
|
@ -370,17 +376,23 @@ package class PosixEventLoop {
|
|||
}
|
||||
}
|
||||
|
||||
package void initFD(T)(FD fd, FDFlags flags, auto ref T slot_init)
|
||||
package FDType initFD(FDType, T)(size_t fd, FDFlags flags, auto ref T slot_init)
|
||||
{
|
||||
with (m_fds[fd.value]) {
|
||||
uint vc;
|
||||
|
||||
with (m_fds[fd]) {
|
||||
assert(common.refCount == 0, "Initializing referenced file descriptor slot.");
|
||||
assert(specific.kind == typeof(specific).Kind.none, "Initializing slot that has not been cleared.");
|
||||
common.refCount = 1;
|
||||
common.flags = flags;
|
||||
specific = slot_init;
|
||||
vc = common.validationCounter;
|
||||
}
|
||||
|
||||
if (!(flags & FDFlags.internal))
|
||||
m_handleCount++;
|
||||
|
||||
return FDType(fd, vc);
|
||||
}
|
||||
|
||||
package void clearFD(T)(FD fd)
|
||||
|
@ -388,6 +400,7 @@ package class PosixEventLoop {
|
|||
import taggedalgebraic : hasType;
|
||||
|
||||
auto slot = () @trusted { return &m_fds[fd.value]; } ();
|
||||
assert(slot.common.validationCounter == fd.validationCounter, "Clearing FD slot for invalid FD");
|
||||
assert(slot.common.refCount == 0, "Clearing referenced file descriptor slot.");
|
||||
assert(slot.specific.hasType!T, "Clearing file descriptor slot with unmatched type.");
|
||||
|
||||
|
@ -400,7 +413,10 @@ package class PosixEventLoop {
|
|||
foreach (cb; slot.common.callback)
|
||||
if (cb !is null)
|
||||
m_waiterCount--;
|
||||
|
||||
auto vc = slot.common.validationCounter;
|
||||
*slot = m_fds.FullField.init;
|
||||
slot.common.validationCounter = vc + 1;
|
||||
}
|
||||
|
||||
package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
|
@ -426,6 +442,7 @@ alias FDSlotCallback = void delegate(FD);
|
|||
private struct FDSlot {
|
||||
FDSlotCallback[EventType.max+1] callback;
|
||||
uint refCount;
|
||||
uint validationCounter;
|
||||
FDFlags flags;
|
||||
|
||||
DataInitializer userDataDestructor;
|
||||
|
|
|
@ -51,7 +51,7 @@ final class EpollEventLoop : PosixEventLoop {
|
|||
if (ret > 0) {
|
||||
foreach (ref evt; m_events[0 .. ret]) {
|
||||
debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events);
|
||||
auto fd = cast(FD)evt.data.fd;
|
||||
auto fd = cast(size_t)evt.data.fd;
|
||||
if (evt.events & (EPOLLERR|EPOLLHUP|EPOLLRDHUP)) notify!(EventType.status)(fd);
|
||||
if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
|
||||
if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
|
||||
|
|
|
@ -45,9 +45,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
version (linux) {
|
||||
auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
|
||||
if (eid == -1) return EventID.invalid;
|
||||
auto id = cast(EventID)eid;
|
||||
// FIXME: avoid dynamic memory allocation for the queue
|
||||
m_loop.initFD(id, FDFlags.internal,
|
||||
auto id = m_loop.initFD!EventID(eid, FDFlags.internal,
|
||||
EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal));
|
||||
m_loop.registerFD(id, EventMask.read);
|
||||
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
||||
|
@ -101,19 +100,20 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
|
||||
// use the second socket as the event ID and as the sending end for
|
||||
// other threads
|
||||
auto id = cast(EventID)fd[1];
|
||||
try m_sockets.userData!EventID(s) = id;
|
||||
catch (Exception e) assert(false, e.msg);
|
||||
// FIXME: avoid dynamic memory allocation for the queue
|
||||
m_loop.initFD(id, FDFlags.internal,
|
||||
auto id = m_loop.initFD!EventID(fd[1], FDFlags.internal,
|
||||
EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s));
|
||||
assert(getRC(id) == 1);
|
||||
try m_sockets.userData!EventID(s) = id;
|
||||
catch (Exception e) assert(false, e.msg);
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
||||
final override void trigger(EventID event, bool notify_all)
|
||||
{
|
||||
if (!isValid(event)) return;
|
||||
|
||||
auto slot = getSlot(event);
|
||||
if (notify_all) {
|
||||
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
|
||||
|
@ -141,6 +141,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
|
||||
final override void wait(EventID event, EventCallback on_event)
|
||||
@nogc {
|
||||
if (!isValid(event)) return;
|
||||
|
||||
if (!isInternal(event)) m_loop.m_waiterCount++;
|
||||
getSlot(event).waiters.put(on_event);
|
||||
}
|
||||
|
@ -150,6 +152,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
import std.algorithm.searching : countUntil;
|
||||
import std.algorithm.mutation : remove;
|
||||
|
||||
if (!isValid(event)) return;
|
||||
|
||||
if (!isInternal(event)) m_loop.m_waiterCount--;
|
||||
getSlot(event).waiters.removePending(on_event);
|
||||
}
|
||||
|
@ -176,14 +180,24 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
}
|
||||
}
|
||||
|
||||
override bool isValid(EventID handle)
|
||||
const {
|
||||
if (handle.value >= m_loop.m_fds.length) return false;
|
||||
return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
final override void addRef(EventID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return;
|
||||
|
||||
assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
|
||||
getRC(descriptor)++;
|
||||
}
|
||||
|
||||
final override bool releaseRef(EventID descriptor)
|
||||
@nogc {
|
||||
if (!isValid(descriptor)) return true;
|
||||
|
||||
nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
||||
if (--getRC(descriptor) == 0) {
|
||||
if (!isInternal(descriptor))
|
||||
|
@ -209,6 +223,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
|||
|
||||
final protected override void* rawUserData(EventID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ final class KqueueEventLoop : PosixEventLoop {
|
|||
foreach (ref evt; m_events[0 .. ret]) {
|
||||
//print("event %s %s", evt.ident, evt.filter, evt.flags);
|
||||
assert(evt.ident <= uint.max);
|
||||
auto fd = cast(FD)cast(int)evt.ident;
|
||||
auto fd = cast(size_t)evt.ident;
|
||||
if (evt.flags & (EV_EOF|EV_ERROR))
|
||||
notify!(EventType.status)(fd);
|
||||
switch (evt.filter) {
|
||||
|
|
|
@ -10,383 +10,420 @@ import std.algorithm : min, max;
|
|||
|
||||
final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
|
||||
@safe: /*@nogc:*/ nothrow:
|
||||
import core.stdc.errno : errno, EAGAIN;
|
||||
import core.sys.posix.unistd : close, read, write;
|
||||
import core.sys.posix.fcntl;
|
||||
import core.sys.posix.poll;
|
||||
import core.stdc.errno : errno, EAGAIN;
|
||||
import core.sys.posix.unistd : close, read, write;
|
||||
import core.sys.posix.fcntl;
|
||||
import core.sys.posix.poll;
|
||||
|
||||
private Loop m_loop;
|
||||
private Loop m_loop;
|
||||
|
||||
this(Loop loop)
|
||||
@nogc {
|
||||
m_loop = loop;
|
||||
}
|
||||
this(Loop loop)
|
||||
@nogc {
|
||||
m_loop = loop;
|
||||
}
|
||||
|
||||
final override PipeFD adopt(int system_fd)
|
||||
{
|
||||
auto fd = PipeFD(system_fd);
|
||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||
return PipeFD.invalid;
|
||||
final override PipeFD adopt(int system_fd)
|
||||
{
|
||||
if (m_loop.m_fds[system_fd].common.refCount) // FD already in use?
|
||||
return PipeFD.invalid;
|
||||
|
||||
// Suprisingly cannot use O_CLOEXEC here, so use FD_CLOEXEC instead.
|
||||
() @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK | FD_CLOEXEC); } ();
|
||||
|
||||
m_loop.initFD(fd, FDFlags.none, PipeSlot.init);
|
||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||
return fd;
|
||||
}
|
||||
auto fd = m_loop.initFD!PipeFD(system_fd, FDFlags.none, PipeSlot.init);
|
||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||
return fd;
|
||||
}
|
||||
|
||||
final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
|
||||
{
|
||||
auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
|
||||
final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
|
||||
{
|
||||
if (!isValid(pipe)) {
|
||||
on_read_finish(pipe, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
// Read failed
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
print("Pipe error %s!", err);
|
||||
on_read_finish(pipe, IOStatus.error, 0);
|
||||
return;
|
||||
}
|
||||
}
|
||||
auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
|
||||
|
||||
// EOF
|
||||
if (ret == 0 && buffer.length > 0) {
|
||||
on_read_finish(pipe, IOStatus.disconnected, 0);
|
||||
return;
|
||||
}
|
||||
// Read failed
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
print("Pipe error %s!", err);
|
||||
on_read_finish(pipe, IOStatus.error, 0);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle immediate mode
|
||||
if (ret < 0 && mode == IOMode.immediate) {
|
||||
on_read_finish(pipe, IOStatus.wouldBlock, 0);
|
||||
return;
|
||||
}
|
||||
// EOF
|
||||
if (ret == 0 && buffer.length > 0) {
|
||||
on_read_finish(pipe, IOStatus.disconnected, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle successful read
|
||||
if (ret >= 0) {
|
||||
buffer = buffer[ret .. $];
|
||||
// Handle immediate mode
|
||||
if (ret < 0 && mode == IOMode.immediate) {
|
||||
on_read_finish(pipe, IOStatus.wouldBlock, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle completed read
|
||||
if (mode != IOMode.all || buffer.length == 0) {
|
||||
on_read_finish(pipe, IOStatus.ok, ret);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Handle successful read
|
||||
if (ret >= 0) {
|
||||
buffer = buffer[ret .. $];
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
|
||||
slot.readCallback = on_read_finish;
|
||||
slot.readMode = mode;
|
||||
slot.bytesRead = max(ret, 0);
|
||||
slot.readBuffer = buffer;
|
||||
// Handle completed read
|
||||
if (mode != IOMode.all || buffer.length == 0) {
|
||||
on_read_finish(pipe, IOStatus.ok, ret);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Need to use EventType.status as well, as pipes don't otherwise notify
|
||||
// of closes
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
|
||||
m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
|
||||
}
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
|
||||
slot.readCallback = on_read_finish;
|
||||
slot.readMode = mode;
|
||||
slot.bytesRead = max(ret, 0);
|
||||
slot.readBuffer = buffer;
|
||||
|
||||
private void onPipeRead(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
// Need to use EventType.status as well, as pipes don't otherwise notify
|
||||
// of closes
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
|
||||
m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
|
||||
}
|
||||
|
||||
void finalize(IOStatus status)
|
||||
{
|
||||
addRef(pipe);
|
||||
scope(exit) releaseRef(pipe);
|
||||
private void onPipeRead(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
m_loop.setNotifyCallback!(EventType.status)(pipe, null);
|
||||
auto cb = slot.readCallback;
|
||||
slot.readCallback = null;
|
||||
slot.readBuffer = null;
|
||||
cb(pipe, status, slot.bytesRead);
|
||||
}
|
||||
void finalize(IOStatus status)
|
||||
{
|
||||
addRef(pipe);
|
||||
scope(exit) releaseRef(pipe);
|
||||
|
||||
ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
m_loop.setNotifyCallback!(EventType.status)(pipe, null);
|
||||
auto cb = slot.readCallback;
|
||||
slot.readCallback = null;
|
||||
slot.readBuffer = null;
|
||||
cb(pipe, status, slot.bytesRead);
|
||||
}
|
||||
|
||||
// Read failed
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
print("Pipe error %s!", err);
|
||||
finalize(IOStatus.error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();
|
||||
|
||||
// EOF
|
||||
if (ret == 0 && slot.readBuffer.length > 0) {
|
||||
finalize(IOStatus.disconnected);
|
||||
return;
|
||||
}
|
||||
// Read failed
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
print("Pipe error %s!", err);
|
||||
finalize(IOStatus.error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Successful read
|
||||
if (ret > 0 || !slot.readBuffer.length) {
|
||||
slot.readBuffer = slot.readBuffer[ret .. $];
|
||||
slot.bytesRead += ret;
|
||||
// EOF
|
||||
if (ret == 0 && slot.readBuffer.length > 0) {
|
||||
finalize(IOStatus.disconnected);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle completed read
|
||||
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
|
||||
finalize(IOStatus.ok);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Successful read
|
||||
if (ret > 0 || !slot.readBuffer.length) {
|
||||
slot.readBuffer = slot.readBuffer[ret .. $];
|
||||
slot.bytesRead += ret;
|
||||
|
||||
final override void cancelRead(PipeFD pipe)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
slot.readBuffer = null;
|
||||
}
|
||||
// Handle completed read
|
||||
if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
|
||||
finalize(IOStatus.ok);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
|
||||
{
|
||||
if (buffer.length == 0) {
|
||||
on_write_finish(pipe, IOStatus.ok, 0);
|
||||
return;
|
||||
}
|
||||
final override void cancelRead(PipeFD pipe)
|
||||
{
|
||||
if (!isValid(pipe)) return;
|
||||
|
||||
ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
slot.readBuffer = null;
|
||||
}
|
||||
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
on_write_finish(pipe, IOStatus.error, 0);
|
||||
return;
|
||||
}
|
||||
final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
|
||||
{
|
||||
if (!isValid(pipe)) {
|
||||
on_write_finish(pipe, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mode == IOMode.immediate) {
|
||||
on_write_finish(pipe, IOStatus.wouldBlock, 0);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
buffer = buffer[ret .. $];
|
||||
if (buffer.length == 0) {
|
||||
on_write_finish(pipe, IOStatus.ok, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mode != IOMode.all || buffer.length == 0) {
|
||||
on_write_finish(pipe, IOStatus.ok, ret);
|
||||
return;
|
||||
}
|
||||
}
|
||||
ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.writeCallback is null, "Concurrent writes not allowed.");
|
||||
slot.writeCallback = on_write_finish;
|
||||
slot.writeMode = mode;
|
||||
slot.bytesWritten = max(ret, 0);
|
||||
slot.writeBuffer = buffer;
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
on_write_finish(pipe, IOStatus.error, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
|
||||
}
|
||||
if (mode == IOMode.immediate) {
|
||||
on_write_finish(pipe, IOStatus.wouldBlock, 0);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
buffer = buffer[ret .. $];
|
||||
|
||||
private void onPipeWrite(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
if (mode != IOMode.all || buffer.length == 0) {
|
||||
on_write_finish(pipe, IOStatus.ok, ret);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void finalize(IOStatus status)
|
||||
{
|
||||
addRef(pipe);
|
||||
scope(exit) releaseRef(pipe);
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.writeCallback is null, "Concurrent writes not allowed.");
|
||||
slot.writeCallback = on_write_finish;
|
||||
slot.writeMode = mode;
|
||||
slot.bytesWritten = max(ret, 0);
|
||||
slot.writeBuffer = buffer;
|
||||
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
|
||||
auto cb = slot.writeCallback;
|
||||
slot.writeCallback = null;
|
||||
slot.writeBuffer = null;
|
||||
cb(pipe, status, slot.bytesWritten);
|
||||
}
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
|
||||
}
|
||||
|
||||
ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();
|
||||
private void onPipeWrite(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
finalize(IOStatus.error);
|
||||
}
|
||||
} else {
|
||||
slot.bytesWritten += ret;
|
||||
slot.writeBuffer = slot.writeBuffer[ret .. $];
|
||||
void finalize(IOStatus status)
|
||||
{
|
||||
addRef(pipe);
|
||||
scope(exit) releaseRef(pipe);
|
||||
|
||||
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
|
||||
finalize(IOStatus.ok);
|
||||
}
|
||||
}
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
|
||||
auto cb = slot.writeCallback;
|
||||
slot.writeCallback = null;
|
||||
slot.writeBuffer = null;
|
||||
cb(pipe, status, slot.bytesWritten);
|
||||
}
|
||||
|
||||
}
|
||||
ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();
|
||||
|
||||
final override void cancelWrite(PipeFD pipe)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
|
||||
slot.writeCallback = null;
|
||||
slot.writeBuffer = null;
|
||||
}
|
||||
if (ret < 0) {
|
||||
auto err = errno;
|
||||
if (err != EAGAIN) {
|
||||
finalize(IOStatus.error);
|
||||
}
|
||||
} else {
|
||||
slot.bytesWritten += ret;
|
||||
slot.writeBuffer = slot.writeBuffer[ret .. $];
|
||||
|
||||
final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
|
||||
{
|
||||
if (pollPipe(pipe, on_data_available))
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
|
||||
finalize(IOStatus.ok);
|
||||
}
|
||||
}
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
}
|
||||
|
||||
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
|
||||
slot.readCallback = on_data_available;
|
||||
slot.readMode = IOMode.once; // currently meaningless
|
||||
slot.bytesRead = 0; // currently meaningless
|
||||
slot.readBuffer = null;
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
|
||||
}
|
||||
final override void cancelWrite(PipeFD pipe)
|
||||
{
|
||||
if (!isValid(pipe)) return;
|
||||
|
||||
private void onPipeDataAvailable(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.write)(pipe, null);
|
||||
slot.writeCallback = null;
|
||||
slot.writeBuffer = null;
|
||||
}
|
||||
|
||||
auto callback = (PipeFD f, IOStatus s, size_t m) {
|
||||
addRef(f);
|
||||
scope(exit) releaseRef(f);
|
||||
final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
|
||||
{
|
||||
if (!isValid(pipe)) {
|
||||
on_data_available(pipe, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
auto cb = slot.readCallback;
|
||||
slot.readCallback = null;
|
||||
slot.readBuffer = null;
|
||||
cb(f, s, m);
|
||||
};
|
||||
if (pollPipe(pipe, on_data_available))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (pollPipe(pipe, callback))
|
||||
{
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
}
|
||||
}
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
|
||||
|
||||
private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
|
||||
@trusted {
|
||||
// Use poll to check if any data is available
|
||||
pollfd pfd;
|
||||
pfd.fd = cast(int)pipe;
|
||||
pfd.events = POLLIN;
|
||||
int ret = poll(&pfd, 1, 0);
|
||||
assert(slot.readCallback is null, "Concurrent reads are not allowed.");
|
||||
slot.readCallback = on_data_available;
|
||||
slot.readMode = IOMode.once; // currently meaningless
|
||||
slot.bytesRead = 0; // currently meaningless
|
||||
slot.readBuffer = null;
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
|
||||
}
|
||||
|
||||
if (ret == -1) {
|
||||
print("Error polling pipe: %s!", errno);
|
||||
callback(pipe, IOStatus.error, 0);
|
||||
return true;
|
||||
}
|
||||
private void onPipeDataAvailable(FD fd)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
|
||||
auto pipe = cast(PipeFD)fd;
|
||||
|
||||
if (ret == 1) {
|
||||
callback(pipe, IOStatus.error, 0);
|
||||
return true;
|
||||
}
|
||||
auto callback = (PipeFD f, IOStatus s, size_t m) {
|
||||
addRef(f);
|
||||
scope(exit) releaseRef(f);
|
||||
|
||||
return false;
|
||||
}
|
||||
auto cb = slot.readCallback;
|
||||
slot.readCallback = null;
|
||||
slot.readBuffer = null;
|
||||
cb(f, s, m);
|
||||
};
|
||||
|
||||
final override void close(PipeFD pipe)
|
||||
{
|
||||
// TODO: Maybe actually close here instead of waiting for releaseRef?
|
||||
close(cast(int)pipe);
|
||||
}
|
||||
if (pollPipe(pipe, callback))
|
||||
{
|
||||
m_loop.setNotifyCallback!(EventType.read)(pipe, null);
|
||||
}
|
||||
}
|
||||
|
||||
final override void addRef(PipeFD pipe)
|
||||
{
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
|
||||
assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
|
||||
slot.common.refCount++;
|
||||
}
|
||||
private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
|
||||
@trusted {
|
||||
// Use poll to check if any data is available
|
||||
pollfd pfd;
|
||||
pfd.fd = cast(int)pipe;
|
||||
pfd.events = POLLIN;
|
||||
int ret = poll(&pfd, 1, 0);
|
||||
|
||||
final override bool releaseRef(PipeFD pipe)
|
||||
{
|
||||
import taggedalgebraic : hasType;
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
|
||||
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
|
||||
if (ret == -1) {
|
||||
print("Error polling pipe: %s!", errno);
|
||||
callback(pipe, IOStatus.error, 0);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (--slot.common.refCount == 0) {
|
||||
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
|
||||
m_loop.clearFD!PipeSlot(pipe);
|
||||
if (ret == 1) {
|
||||
callback(pipe, IOStatus.error, 0);
|
||||
return true;
|
||||
}
|
||||
|
||||
close(cast(int)pipe);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
|
||||
}
|
||||
final override void close(PipeFD pipe)
|
||||
{
|
||||
if (!isValid(pipe)) return;
|
||||
|
||||
// TODO: Maybe actually close here instead of waiting for releaseRef?
|
||||
close(cast(int)pipe);
|
||||
}
|
||||
|
||||
override bool isValid(PipeFD handle)
|
||||
const {
|
||||
if (handle.value >= m_loop.m_fds.length) return false;
|
||||
return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
final override void addRef(PipeFD pipe)
|
||||
{
|
||||
if (!isValid(pipe)) return;
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
|
||||
assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
|
||||
slot.common.refCount++;
|
||||
}
|
||||
|
||||
final override bool releaseRef(PipeFD pipe)
|
||||
{
|
||||
import taggedalgebraic : hasType;
|
||||
|
||||
if (!isValid(pipe)) return true;
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
|
||||
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
|
||||
|
||||
if (--slot.common.refCount == 0) {
|
||||
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
|
||||
m_loop.clearFD!PipeSlot(pipe);
|
||||
|
||||
close(cast(int)pipe);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
|
||||
}
|
||||
>>>>>>> 568465d... Make the API robust against using invalid handles. Fixes #105.
|
||||
}
|
||||
|
||||
final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
|
||||
@safe: /*@nogc:*/ nothrow:
|
||||
this(Loop loop) {}
|
||||
this(Loop loop) {}
|
||||
|
||||
override PipeFD adopt(int system_pipe_handle)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override PipeFD adopt(int system_pipe_handle)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void cancelRead(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void cancelRead(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void cancelWrite(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void cancelWrite(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void close(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void close(PipeFD pipe)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override void addRef(PipeFD pid)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override bool isValid(PipeFD handle)
|
||||
const {
|
||||
return false;
|
||||
}
|
||||
|
||||
override bool releaseRef(PipeFD pid)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override void addRef(PipeFD pid)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
override bool releaseRef(PipeFD pid)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
package struct PipeSlot {
|
||||
alias Handle = PipeFD;
|
||||
alias Handle = PipeFD;
|
||||
|
||||
size_t bytesRead;
|
||||
ubyte[] readBuffer;
|
||||
IOMode readMode;
|
||||
PipeIOCallback readCallback;
|
||||
size_t bytesRead;
|
||||
ubyte[] readBuffer;
|
||||
IOMode readMode;
|
||||
PipeIOCallback readCallback;
|
||||
|
||||
size_t bytesWritten;
|
||||
const(ubyte)[] writeBuffer;
|
||||
IOMode writeMode;
|
||||
PipeIOCallback writeCallback;
|
||||
size_t bytesWritten;
|
||||
const(ubyte)[] writeBuffer;
|
||||
IOMode writeMode;
|
||||
PipeIOCallback writeCallback;
|
||||
}
|
||||
|
|
|
@ -22,12 +22,13 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
private {
|
||||
static shared Mutex s_mutex;
|
||||
static __gshared ProcessInfo[ProcessID] s_processes;
|
||||
static __gshared ProcessInfo[int] s_processes;
|
||||
static __gshared Thread s_waitThread;
|
||||
|
||||
Loop m_loop;
|
||||
// FIXME: avoid virtual funciton calls and use the final type instead
|
||||
EventDriver m_driver;
|
||||
uint m_validationCounter;
|
||||
}
|
||||
|
||||
this(Loop loop, EventDriver driver)
|
||||
|
@ -42,13 +43,14 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
final override ProcessID adopt(int system_pid)
|
||||
{
|
||||
auto pid = cast(ProcessID)system_pid;
|
||||
|
||||
ProcessInfo info;
|
||||
info.exited = false;
|
||||
info.refCount = 1;
|
||||
info.validationCounter = ++m_validationCounter;
|
||||
info.driver = this;
|
||||
add(pid, info);
|
||||
|
||||
auto pid = ProcessID(system_pid, info.validationCounter);
|
||||
add(system_pid, info);
|
||||
return pid;
|
||||
}
|
||||
|
||||
|
@ -154,7 +156,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
@trusted {
|
||||
import core.sys.posix.signal : pkill = kill;
|
||||
|
||||
assert(cast(int)pid > 0, "Invalid PID passed to kill.");
|
||||
if (!isValid(pid)) return;
|
||||
|
||||
if (cast(int)pid > 0)
|
||||
pkill(cast(int)pid, signal);
|
||||
|
@ -167,7 +169,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
size_t id = size_t.max;
|
||||
lockedProcessInfo(pid, (info) {
|
||||
assert(info !is null, "Unknown process ID");
|
||||
if (!info) return;
|
||||
|
||||
if (info.exited) {
|
||||
exited = true;
|
||||
|
@ -190,7 +192,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
if (wait_id == size_t.max) return;
|
||||
|
||||
lockedProcessInfo(pid, (info) {
|
||||
assert(info !is null, "Unknown process ID");
|
||||
if (!info) return;
|
||||
|
||||
assert(!info.exited, "Cannot cancel wait when none are pending");
|
||||
assert(info.callbacks.length > wait_id, "Invalid process wait ID");
|
||||
|
||||
|
@ -205,18 +208,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
private static void onLocalProcessExit(intptr_t system_pid)
|
||||
{
|
||||
auto pid = cast(ProcessID)system_pid;
|
||||
|
||||
int exitCode;
|
||||
ProcessWaitCallback[] callbacks;
|
||||
|
||||
ProcessID pid;
|
||||
|
||||
PosixEventDriverProcesses driver;
|
||||
lockedProcessInfo(pid, (info) {
|
||||
lockedProcessInfoPlain(cast(int)system_pid, (info) {
|
||||
assert(info !is null);
|
||||
|
||||
exitCode = info.exitCode;
|
||||
|
||||
callbacks = info.callbacks;
|
||||
pid = ProcessID(cast(int)system_pid, info.validationCounter);
|
||||
info.callbacks = null;
|
||||
|
||||
driver = info.driver;
|
||||
|
@ -234,15 +237,25 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
{
|
||||
bool ret;
|
||||
lockedProcessInfo(pid, (info) {
|
||||
assert(info !is null, "Unknown process ID");
|
||||
ret = info.exited;
|
||||
if (info) ret = info.exited;
|
||||
else ret = true;
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
override bool isValid(ProcessID handle)
|
||||
const {
|
||||
s_mutex.lock_nothrow();
|
||||
scope (exit) s_mutex.unlock_nothrow();
|
||||
auto info = () @trusted { return cast(int)handle.value in s_processes; } ();
|
||||
return info && info.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
final override void addRef(ProcessID pid)
|
||||
{
|
||||
lockedProcessInfo(pid, (info) {
|
||||
if (!info) return;
|
||||
|
||||
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
|
||||
info.refCount++;
|
||||
});
|
||||
|
@ -252,13 +265,18 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
{
|
||||
bool ret;
|
||||
lockedProcessInfo(pid, (info) {
|
||||
if (!info) {
|
||||
ret = true;
|
||||
return;
|
||||
}
|
||||
|
||||
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
|
||||
if (--info.refCount == 0) {
|
||||
// Remove/deallocate process
|
||||
if (info.userDataDestructor)
|
||||
() @trusted { info.userDataDestructor(info.userData.ptr); } ();
|
||||
|
||||
() @trusted { s_processes.remove(pid); } ();
|
||||
() @trusted { s_processes.remove(cast(int)pid.value); } ();
|
||||
ret = false;
|
||||
} else ret = true;
|
||||
});
|
||||
|
@ -290,7 +308,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
s_mutex = new shared Mutex;
|
||||
}
|
||||
|
||||
private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
|
||||
private static void lockedProcessInfoPlain(int pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
|
||||
{
|
||||
s_mutex.lock_nothrow();
|
||||
scope (exit) s_mutex.unlock_nothrow();
|
||||
|
@ -298,7 +316,14 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
fn(info);
|
||||
}
|
||||
|
||||
private static void add(ProcessID pid, ProcessInfo info) @trusted {
|
||||
private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
|
||||
{
|
||||
lockedProcessInfoPlain(cast(int)pid.value, (pi) {
|
||||
fn(pi.validationCounter == pid.validationCounter ? pi : null);
|
||||
});
|
||||
}
|
||||
|
||||
private static void add(int pid, ProcessInfo info) @trusted {
|
||||
s_mutex.lock_nothrow();
|
||||
scope (exit) s_mutex.unlock_nothrow();
|
||||
|
||||
|
@ -328,7 +353,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
break;
|
||||
}
|
||||
|
||||
ProcessID[] allprocs;
|
||||
int[] allprocs;
|
||||
|
||||
{
|
||||
s_mutex.lock_nothrow();
|
||||
|
@ -345,8 +370,8 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
foreach (pid; allprocs) {
|
||||
int status;
|
||||
ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
|
||||
if (ret == cast(int)pid) {
|
||||
ret = () @trusted { return waitpid(pid, &status, WNOHANG); } ();
|
||||
if (ret == pid) {
|
||||
int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
|
||||
onProcessExitStatic(ret, exitstatus);
|
||||
}
|
||||
|
@ -356,24 +381,21 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
|
||||
private static void onProcessExitStatic(int system_pid, int exit_status)
|
||||
{
|
||||
auto pid = cast(ProcessID)system_pid;
|
||||
|
||||
PosixEventDriverProcesses driver;
|
||||
lockedProcessInfo(pid, (ProcessInfo* info) @safe {
|
||||
lockedProcessInfoPlain(system_pid, (ProcessInfo* info) @safe {
|
||||
// We get notified of any child exiting, so ignore the ones we're
|
||||
// not aware of
|
||||
if (info is null) return;
|
||||
|
||||
// Increment the ref count to make sure it doesn't get removed
|
||||
info.refCount++;
|
||||
|
||||
info.exited = true;
|
||||
info.exitCode = exit_status;
|
||||
driver = info.driver;
|
||||
});
|
||||
|
||||
if (driver)
|
||||
() @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid);
|
||||
() @trusted { return cast(shared)driver; } ().onProcessExit(system_pid);
|
||||
}
|
||||
|
||||
private static struct ProcessInfo {
|
||||
|
@ -381,6 +403,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
int exitCode;
|
||||
ProcessWaitCallback[] callbacks;
|
||||
size_t refCount = 0;
|
||||
uint validationCounter;
|
||||
PosixEventDriverProcesses driver;
|
||||
|
||||
DataInitializer userDataDestructor;
|
||||
|
@ -425,6 +448,11 @@ final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
|
|||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
override bool isValid(ProcessID handle)
|
||||
const {
|
||||
return false;
|
||||
}
|
||||
|
||||
override void addRef(ProcessID pid)
|
||||
{
|
||||
assert(false, "TODO!");
|
||||
|
|
|
@ -26,19 +26,19 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
|
|||
|
||||
package SignalListenID listenInternal(int sig, SignalCallback on_signal, bool is_internal = true)
|
||||
{
|
||||
auto fd = () @trusted {
|
||||
auto sigfd = () @trusted {
|
||||
sigset_t sset;
|
||||
sigemptyset(&sset);
|
||||
sigaddset(&sset, sig);
|
||||
|
||||
if (sigprocmask(SIG_BLOCK, &sset, null) != 0)
|
||||
return SignalListenID.invalid;
|
||||
return -1;
|
||||
|
||||
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC));
|
||||
return signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC);
|
||||
} ();
|
||||
|
||||
|
||||
m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none, SignalSlot(on_signal));
|
||||
auto fd = m_loop.initFD!SignalListenID(sigfd, is_internal ? FDFlags.internal : FDFlags.none, SignalSlot(on_signal));
|
||||
m_loop.registerFD(cast(FD)fd, EventMask.read);
|
||||
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
|
||||
|
||||
|
@ -47,14 +47,24 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
|
|||
return fd;
|
||||
}
|
||||
|
||||
override bool isValid(SignalListenID handle)
|
||||
const {
|
||||
if (handle.value >= m_loop.m_fds.length) return false;
|
||||
return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
override void addRef(SignalListenID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return;
|
||||
|
||||
assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
|
||||
m_loop.m_fds[descriptor].common.refCount++;
|
||||
}
|
||||
|
||||
override bool releaseRef(SignalListenID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return true;
|
||||
|
||||
FD fd = cast(FD)descriptor;
|
||||
nogc_assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
||||
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference
|
||||
|
@ -104,6 +114,11 @@ final class DummyEventDriverSignals(Loop : PosixEventLoop) : EventDriverSignals
|
|||
assert(false);
|
||||
}
|
||||
|
||||
override bool isValid(SignalListenID handle)
|
||||
const {
|
||||
return false;
|
||||
}
|
||||
|
||||
override void addRef(SignalListenID descriptor)
|
||||
{
|
||||
assert(false);
|
||||
|
|
|
@ -135,21 +135,17 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
return StreamSocketFD.invalid;
|
||||
}
|
||||
|
||||
auto sock = cast(StreamSocketFD)sockfd;
|
||||
|
||||
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; }
|
||||
|
||||
int bret;
|
||||
if (bind_address !is null)
|
||||
() @trusted { bret = bind(cast(sock_t)sock, bind_address.name, bind_address.nameLen); } ();
|
||||
() @trusted { bret = bind(sockfd, bind_address.name, bind_address.nameLen); } ();
|
||||
|
||||
if (bret != 0) {
|
||||
invalidateSocket();
|
||||
on_connect(sock, ConnectStatus.bindFailure);
|
||||
return sock;
|
||||
closeSocket(sockfd);
|
||||
on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure);
|
||||
return StreamSocketFD.invalid;
|
||||
}
|
||||
|
||||
m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init);
|
||||
auto sock = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
|
||||
m_loop.registerFD(sock, EventMask.read|EventMask.write);
|
||||
|
||||
auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
|
||||
|
@ -167,7 +163,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
} else {
|
||||
m_loop.unregisterFD(sock, EventMask.read|EventMask.write);
|
||||
m_loop.clearFD!StreamSocketSlot(sock);
|
||||
invalidateSocket();
|
||||
closeSocket(sockfd);
|
||||
on_connect(StreamSocketFD.invalid, determineConnectStatus(err));
|
||||
return StreamSocketFD.invalid;
|
||||
}
|
||||
|
@ -178,7 +174,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void cancelConnectStream(StreamSocketFD sock)
|
||||
{
|
||||
assert(sock != StreamSocketFD.invalid, "Invalid socket descriptor");
|
||||
if (!isValid(sock)) return;
|
||||
|
||||
with (m_loop.m_fds[sock].streamSocket)
|
||||
{
|
||||
assert(state == ConnectionState.connecting,
|
||||
|
@ -191,11 +188,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override StreamSocketFD adoptStream(int socket)
|
||||
{
|
||||
auto fd = StreamSocketFD(socket);
|
||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||
if (m_loop.m_fds[socket].common.refCount) // FD already in use?
|
||||
return StreamSocketFD.invalid;
|
||||
setSocketNonBlocking(fd);
|
||||
m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
|
||||
setSocketNonBlocking(socket);
|
||||
auto fd = m_loop.initFD!StreamSocketFD(socket, FDFlags.none, StreamSocketSlot.init);
|
||||
m_loop.registerFD(fd, EventMask.read|EventMask.write);
|
||||
return fd;
|
||||
}
|
||||
|
@ -240,39 +236,35 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
auto sockfd = createSocket(address.addressFamily, SOCK_STREAM);
|
||||
if (sockfd == -1) return StreamListenSocketFD.invalid;
|
||||
|
||||
auto sock = cast(StreamListenSocketFD)sockfd;
|
||||
|
||||
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sockfd); sock = StreamSocketFD.invalid; }
|
||||
|
||||
() @trusted {
|
||||
auto succ = () @trusted {
|
||||
int tmp_reuse = 1;
|
||||
// FIXME: error handling!
|
||||
if (options & StreamListenOptions.reuseAddress) {
|
||||
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
|
||||
invalidateSocket();
|
||||
return;
|
||||
}
|
||||
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0)
|
||||
return false;
|
||||
}
|
||||
version (Windows) {} else {
|
||||
if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0) {
|
||||
invalidateSocket();
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (bind(sockfd, address.name, address.nameLen) != 0) {
|
||||
invalidateSocket();
|
||||
return;
|
||||
}
|
||||
if (listen(sockfd, getBacklogSize()) != 0) {
|
||||
invalidateSocket();
|
||||
return;
|
||||
|
||||
version (Windows) {}
|
||||
else {
|
||||
if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (bind(sockfd, address.name, address.nameLen) != 0)
|
||||
return false;
|
||||
|
||||
if (listen(sockfd, getBacklogSize()) != 0)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
} ();
|
||||
|
||||
if (sock == StreamListenSocketFD.invalid)
|
||||
return sock;
|
||||
if (!succ) {
|
||||
closeSocket(sockfd);
|
||||
return StreamListenSocketFD.invalid;
|
||||
}
|
||||
|
||||
m_loop.initFD(sock, FDFlags.none, StreamListenSocketSlot.init);
|
||||
auto sock = m_loop.initFD!StreamListenSocketFD(sockfd, FDFlags.none, StreamListenSocketSlot.init);
|
||||
|
||||
if (on_accept) waitForConnections(sock, on_accept);
|
||||
|
||||
|
@ -281,6 +273,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
|
||||
{
|
||||
if (!isValid(sock)) return;
|
||||
|
||||
m_loop.registerFD(sock, EventMask.read, false);
|
||||
m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
|
||||
m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
|
||||
|
@ -298,10 +292,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
} else {
|
||||
() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
|
||||
if (sockfd == -1) return;
|
||||
setSocketNonBlocking(cast(SocketFD)sockfd, true);
|
||||
setSocketNonBlocking(sockfd, true);
|
||||
}
|
||||
auto fd = cast(StreamSocketFD)sockfd;
|
||||
m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
|
||||
auto fd = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
|
||||
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
|
||||
m_loop.registerFD(fd, EventMask.read|EventMask.write);
|
||||
//print("accept %d", sockfd);
|
||||
|
@ -311,11 +304,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
ConnectionState getConnectionState(StreamSocketFD sock)
|
||||
{
|
||||
if (!isValid(sock)) return ConnectionState.closed;
|
||||
return m_loop.m_fds[sock].streamSocket.state;
|
||||
}
|
||||
|
||||
final override bool getLocalAddress(SocketFD sock, scope RefAddress dst)
|
||||
{
|
||||
if (!isValid(sock)) return false;
|
||||
|
||||
socklen_t addr_len = dst.nameLen;
|
||||
if (() @trusted { return getsockname(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
|
||||
return false;
|
||||
|
@ -325,6 +321,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override bool getRemoteAddress(SocketFD sock, scope RefAddress dst)
|
||||
{
|
||||
if (!isValid(sock)) return false;
|
||||
|
||||
socklen_t addr_len = dst.nameLen;
|
||||
if (() @trusted { return getpeername(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
|
||||
return false;
|
||||
|
@ -334,12 +332,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
int opt = enable;
|
||||
() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
|
||||
}
|
||||
|
||||
override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
int opt = enable;
|
||||
int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof);
|
||||
if (err != 0)
|
||||
|
@ -348,6 +350,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
// dunnno about BSD\OSX, maybe someone should fix it for them later
|
||||
version (linux) {
|
||||
setKeepAlive(socket, true);
|
||||
|
@ -371,6 +375,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
version (linux) {
|
||||
uint tmsecs = cast(uint) timeout.total!"msecs";
|
||||
int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof);
|
||||
|
@ -381,6 +387,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
|
||||
{
|
||||
if (!isValid(socket)) {
|
||||
on_read_finish(socket, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
/*if (buffer.length == 0) {
|
||||
on_read_finish(socket, IOStatus.ok, 0);
|
||||
return;
|
||||
|
@ -435,6 +446,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
override void cancelRead(StreamSocketFD socket)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||
with (m_loop.m_fds[socket].streamSocket) {
|
||||
|
@ -511,6 +524,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
|
||||
{
|
||||
if (!isValid(socket)) {
|
||||
on_write_finish(socket, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (buffer.length == 0) {
|
||||
on_write_finish(socket, IOStatus.ok, 0);
|
||||
return;
|
||||
|
@ -560,6 +578,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
override void cancelWrite(StreamSocketFD socket)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||
m_loop.m_fds[socket].streamSocket.writeBuffer = null;
|
||||
|
@ -616,6 +636,11 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
|
||||
{
|
||||
if (!isValid(socket)) {
|
||||
on_data_available(socket, IOStatus.invalidHandle, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
sizediff_t ret;
|
||||
ubyte dummy;
|
||||
() @trusted { ret = recv(cast(sock_t)socket, &dummy, 1, MSG_PEEK); } ();
|
||||
|
@ -674,6 +699,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
auto st = m_loop.m_fds[socket].streamSocket.state;
|
||||
() @trusted { .shutdown(cast(sock_t)socket, shut_read ? shut_write ? SHUT_RDWR : SHUT_RD : shut_write ? SHUT_WR : 0); } ();
|
||||
if (st == ConnectionState.passiveClose) shut_read = true;
|
||||
|
@ -690,7 +717,6 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
{
|
||||
auto sockfd = createSocket(bind_address.addressFamily, SOCK_DGRAM);
|
||||
if (sockfd == -1) return DatagramSocketFD.invalid;
|
||||
auto sock = cast(DatagramSocketFD)sockfd;
|
||||
|
||||
if (bind_address && () @trusted { return bind(sockfd, bind_address.name, bind_address.nameLen); } () != 0) {
|
||||
closeSocket(sockfd);
|
||||
|
@ -718,9 +744,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
}
|
||||
}
|
||||
|
||||
m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
|
||||
auto flags = is_internal ? FDFlags.internal : FDFlags.none;
|
||||
auto sock = m_loop.initFD!DatagramSocketFD(sockfd, flags, DgramSocketSlot.init);
|
||||
m_loop.registerFD(sock, EventMask.read|EventMask.write);
|
||||
|
||||
return sock;
|
||||
}
|
||||
|
||||
|
@ -731,28 +757,34 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false)
|
||||
@nogc {
|
||||
auto fd = DatagramSocketFD(socket);
|
||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||
if (m_loop.m_fds[socket].common.refCount) // FD already in use?
|
||||
return DatagramSocketFD.init;
|
||||
setSocketNonBlocking(fd, close_on_exec);
|
||||
m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
|
||||
setSocketNonBlocking(socket, close_on_exec);
|
||||
auto flags = is_internal ? FDFlags.internal : FDFlags.none;
|
||||
auto fd = m_loop.initFD!DatagramSocketFD(socket, flags, DgramSocketSlot.init);
|
||||
m_loop.registerFD(fd, EventMask.read|EventMask.write);
|
||||
return fd;
|
||||
}
|
||||
|
||||
final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
() @trusted { connect(cast(sock_t)socket, target_address.name, target_address.nameLen); } ();
|
||||
}
|
||||
|
||||
final override bool setBroadcast(DatagramSocketFD socket, bool enable)
|
||||
{
|
||||
if (!isValid(socket)) return false;
|
||||
|
||||
int tmp_broad = enable;
|
||||
return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
|
||||
}
|
||||
|
||||
final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0)
|
||||
{
|
||||
if (!isValid(socket)) return false;
|
||||
|
||||
switch (multicast_address.addressFamily) {
|
||||
default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
|
||||
case AddressFamily.INET:
|
||||
|
@ -784,6 +816,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
@safe {
|
||||
assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
|
||||
|
||||
if (!isValid(socket)) {
|
||||
RefAddress addr;
|
||||
on_receive_finish(socket, IOStatus.invalidHandle, 0, addr);
|
||||
return;
|
||||
}
|
||||
|
||||
sizediff_t ret;
|
||||
sockaddr_storage src_addr;
|
||||
socklen_t src_addr_len = src_addr.sizeof;
|
||||
|
@ -826,6 +864,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
void cancelReceive(DatagramSocketFD socket)
|
||||
@nogc {
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||
m_loop.m_fds[socket].datagramSocket.readBuffer = null;
|
||||
|
@ -861,6 +901,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
{
|
||||
assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
|
||||
|
||||
if (!isValid(socket)) {
|
||||
RefAddress addr;
|
||||
on_send_finish(socket, IOStatus.invalidHandle, 0, addr);
|
||||
return;
|
||||
}
|
||||
|
||||
sizediff_t ret;
|
||||
if (target_address) {
|
||||
() @trusted { ret = .sendto(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS, target_address.name, target_address.nameLen); } ();
|
||||
|
@ -897,6 +943,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
void cancelSend(DatagramSocketFD socket)
|
||||
{
|
||||
if (!isValid(socket)) return;
|
||||
|
||||
assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
|
||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||
m_loop.m_fds[socket].datagramSocket.writeBuffer = null;
|
||||
|
@ -929,8 +977,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
|
||||
}
|
||||
|
||||
final override bool isValid(SocketFD handle)
|
||||
const {
|
||||
if (handle.value > m_loop.m_fds.length) return false;
|
||||
return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
final override void addRef(SocketFD fd)
|
||||
{
|
||||
if (!isValid(fd)) return;
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
|
||||
assert(slot.common.refCount > 0, "Adding reference to unreferenced socket FD.");
|
||||
slot.common.refCount++;
|
||||
|
@ -939,6 +995,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
final override bool releaseRef(SocketFD fd)
|
||||
@nogc {
|
||||
import taggedalgebraic : hasType;
|
||||
|
||||
if (!isValid(fd)) return true;
|
||||
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
|
||||
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
|
||||
// listening sockets have an incremented the reference count because of setNotifyCallback
|
||||
|
@ -966,6 +1025,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
|
||||
{
|
||||
if (!isValid(socket)) return false;
|
||||
|
||||
int proto, opt;
|
||||
final switch (option) {
|
||||
case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
|
||||
|
@ -977,6 +1038,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
|
||||
{
|
||||
if (!isValid(socket)) return false;
|
||||
|
||||
int proto, opt;
|
||||
final switch (option) {
|
||||
case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
|
||||
|
@ -988,16 +1051,19 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
|
||||
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
|
@ -1010,7 +1076,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
|||
} else {
|
||||
() @trusted { sock = socket(family, type, 0); } ();
|
||||
if (sock == -1) return -1;
|
||||
setSocketNonBlocking(cast(SocketFD)sock, true);
|
||||
setSocketNonBlocking(sock, true);
|
||||
|
||||
// Prevent SIGPIPE on failed send
|
||||
version (OSX) {
|
||||
|
@ -1078,7 +1144,7 @@ private void closeSocket(sock_t sockfd)
|
|||
else close(sockfd);
|
||||
}
|
||||
|
||||
private void setSocketNonBlocking(SocketFD sockfd, bool close_on_exec = false)
|
||||
private void setSocketNonBlocking(SocketFD.BaseType sockfd, bool close_on_exec = false)
|
||||
@nogc nothrow {
|
||||
version (Windows) {
|
||||
uint enable = 1;
|
||||
|
|
|
@ -38,7 +38,9 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
|
|||
auto handle = () @trusted { return inotify_init1(IN_NONBLOCK | IN_CLOEXEC); } ();
|
||||
if (handle == -1) return WatcherID.invalid;
|
||||
|
||||
auto ret = WatcherID(handle);
|
||||
auto ret = m_loop.initFD!WatcherID(handle, FDFlags.none, WatcherSlot(callback));
|
||||
m_loop.registerFD(cast(FD)ret, EventMask.read);
|
||||
m_loop.setNotifyCallback!(EventType.read)(cast(FD)ret, &onChanges);
|
||||
|
||||
m_watches[ret] = WatchState(null, path, recursive);
|
||||
|
||||
|
@ -46,23 +48,28 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
|
|||
if (recursive)
|
||||
addSubWatches(ret, path, "");
|
||||
|
||||
m_loop.initFD(FD(handle), FDFlags.none, WatcherSlot(callback));
|
||||
m_loop.registerFD(FD(handle), EventMask.read);
|
||||
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
|
||||
|
||||
processEvents(WatcherID(handle));
|
||||
processEvents(ret);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
final override bool isValid(WatcherID handle)
|
||||
const {
|
||||
if (handle.value >= m_loop.m_fds.length) return false;
|
||||
return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
|
||||
}
|
||||
|
||||
final override void addRef(WatcherID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return;
|
||||
assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
|
||||
m_loop.m_fds[descriptor].common.refCount++;
|
||||
}
|
||||
|
||||
final override bool releaseRef(WatcherID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return true;
|
||||
|
||||
FD fd = cast(FD)descriptor;
|
||||
auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
|
||||
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced event FD.");
|
||||
|
@ -80,6 +87,7 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
|
|||
|
||||
final protected override void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
|
@ -204,13 +212,22 @@ final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDrive
|
|||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
final override bool isValid(WatcherID handle)
|
||||
const {
|
||||
return false;
|
||||
}
|
||||
|
||||
final override void addRef(WatcherID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return;
|
||||
|
||||
assert(false, "TODO!");
|
||||
}
|
||||
|
||||
final override bool releaseRef(WatcherID descriptor)
|
||||
{
|
||||
if (!isValid(descriptor)) return true;
|
||||
|
||||
/*FSEventStreamStop
|
||||
FSEventStreamUnscheduleFromRunLoop
|
||||
FSEventStreamInvalidate
|
||||
|
@ -220,6 +237,8 @@ final class FSEventsEventDriverWatchers(Events : EventDriverEvents) : EventDrive
|
|||
|
||||
final protected override void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
|
||||
@system {
|
||||
if (!isValid(descriptor)) return null;
|
||||
|
||||
return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
|
||||
}
|
||||
|
||||
|
@ -283,9 +302,15 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
|||
return cast(WatcherID)evt;
|
||||
}
|
||||
|
||||
final override bool isValid(WatcherID handle)
|
||||
const {
|
||||
return m_events.isValid(cast(EventID)handle);
|
||||
}
|
||||
|
||||
final override void addRef(WatcherID descriptor)
|
||||
{
|
||||
assert(descriptor != WatcherID.invalid);
|
||||
if (!isValid(descriptor)) return;
|
||||
|
||||
auto evt = cast(EventID)descriptor;
|
||||
auto pt = evt in m_pollers;
|
||||
assert(pt !is null);
|
||||
|
@ -294,7 +319,8 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
|||
|
||||
final override bool releaseRef(WatcherID descriptor)
|
||||
{
|
||||
nogc_assert(descriptor != WatcherID.invalid, "Invalid directory watcher ID released");
|
||||
if (!isValid(descriptor)) return true;
|
||||
|
||||
auto evt = cast(EventID)descriptor;
|
||||
auto pt = evt in m_pollers;
|
||||
nogc_assert(pt !is null, "Directory watcher polling thread does not exist");
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue