diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index 85c2415..e13372e 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -180,7 +180,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system { - FDSlot* fds = &m_loop.m_fds[descriptor]; + FDSlot* fds = &m_loop.m_fds[descriptor].common; assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, "Requesting user data with differing type (destructor)."); assert(size <= FDSlot.userData.length, "Requested user data is too large."); @@ -189,7 +189,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime initialize(fds.userData.ptr); fds.userDataDestructor = destroy; } - return m_loop.m_fds[descriptor].userData.ptr; + return m_loop.m_fds[descriptor].common.userData.ptr; } } @@ -222,6 +222,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets m_loop.initFD(sock); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); + m_loop.m_fds[sock].specific = StreamSocketSlot.init; auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } (); if (ret == 0) { @@ -229,7 +230,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } else { auto err = getSocketError(); if (err == EINPROGRESS) { - with (m_loop.m_fds[sock]) { + with (m_loop.m_fds[sock].streamSocket) { connectCallback = on_connect; } m_loop.startNotify!(EventType.write)(sock, &onConnect); @@ -248,13 +249,13 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets private void onConnect(FD sock) { m_loop.setNotifyCallback!(EventType.write)(sock, null); - m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); + m_loop.m_fds[sock].streamSocket.connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); } private void onConnectError(FD sock) { // FIXME: determine the correct kind of error! - m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); + m_loop.m_fds[sock].streamSocket.connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); } final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) @@ -283,6 +284,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return sock; m_loop.initFD(sock); + m_loop.m_fds[sock].specific = StreamListenSocketSlot.init; if (on_accept) waitForConnections(sock, on_accept); @@ -293,7 +295,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets { log("wait for conn"); m_loop.registerFD(sock, EventMask.read); - m_loop.m_fds[sock].acceptCallback = on_accept; + m_loop.m_fds[sock].streamListen.acceptCallback = on_accept; m_loop.startNotify!(EventType.read)(sock, &onAccept); onAccept(sock); } @@ -313,7 +315,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets m_loop.initFD(fd); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); //print("accept %d", sockfd); - m_loop.m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); + m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd); } } @@ -365,7 +367,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - with (m_loop.m_fds[socket]) { + with (m_loop.m_fds[socket].streamSocket) { readCallback = on_read_finish; readMode = mode; bytesRead = ret > 0 ? ret : 0; @@ -377,16 +379,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void cancelRead(StreamSocketFD socket) { - assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); - with (m_loop.m_fds[socket]) { + with (m_loop.m_fds[socket].streamSocket) { readBuffer = null; } } private void onSocketRead(FD fd) { - auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } (); auto socket = cast(StreamSocketFD)fd; void finalize()(IOStatus status) @@ -460,7 +462,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - with (m_loop.m_fds[socket]) { + with (m_loop.m_fds[socket].streamSocket) { writeCallback = on_write_finish; writeMode = mode; bytesWritten = ret > 0 ? ret : 0; @@ -472,14 +474,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets override void cancelWrite(StreamSocketFD socket) { - assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); + assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress."); m_loop.setNotifyCallback!(EventType.write)(socket, null); - m_loop.m_fds[socket].writeBuffer = null; + m_loop.m_fds[socket].streamSocket.writeBuffer = null; } private void onSocketWrite(FD fd) { - auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } (); auto socket = cast(StreamSocketFD)fd; sizediff_t ret; @@ -537,7 +539,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets return; } - with (m_loop.m_fds[socket]) { + with (m_loop.m_fds[socket].streamSocket) { readCallback = on_data_available; readMode = IOMode.once; bytesRead = 0; @@ -549,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets private void onSocketDataAvailable(FD fd) { - auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } (); auto socket = cast(StreamSocketFD)fd; void finalize()(IOStatus status) @@ -589,6 +591,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } m_loop.initFD(sock); + m_loop.m_fds[sock].specific = DgramSocketSlot.init; m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); return sock; @@ -616,8 +619,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (mode == IOMode.immediate) { on_receive_finish(socket, IOStatus.wouldBlock, 0, null); } else { - with (m_loop.m_fds[socket]) { - readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } (); + with (m_loop.m_fds[socket].datagramSocket) { + readCallback = on_receive_finish; readMode = mode; bytesRead = 0; readBuffer = buffer; @@ -633,14 +636,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void cancelReceive(DatagramSocketFD socket) { - assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); + assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress."); m_loop.setNotifyCallback!(EventType.read)(socket, null); - m_loop.m_fds[socket].readBuffer = null; + m_loop.m_fds[socket].datagramSocket.readBuffer = null; } private void onDgramRead(FD fd) @trusted { // DMD 2.072.0-b2: scope considered unsafe - auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } (); auto socket = cast(DatagramSocketFD)fd; sizediff_t ret; @@ -652,7 +655,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets auto err = getSocketError(); if (err != EAGAIN) { m_loop.setNotifyCallback!(EventType.read)(socket, null); - () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null); + slot.readCallback(socket, IOStatus.error, 0, null); return; } } @@ -668,7 +671,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets sizediff_t ret; if (target_address) { () @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } (); - m_loop.m_fds[socket].targetAddr = target_address; + m_loop.m_fds[socket].datagramSocket.targetAddr = target_address; } else { () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); } @@ -684,8 +687,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets if (mode == IOMode.immediate) { on_send_finish(socket, IOStatus.wouldBlock, 0, null); } else { - with (m_loop.m_fds[socket]) { - writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } (); + with (m_loop.m_fds[socket].datagramSocket) { + writeCallback = on_send_finish; writeMode = mode; bytesWritten = 0; writeBuffer = buffer; @@ -701,14 +704,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets void cancelSend(DatagramSocketFD socket) { - assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); + assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress."); m_loop.setNotifyCallback!(EventType.write)(socket, null); - m_loop.m_fds[socket].writeBuffer = null; + m_loop.m_fds[socket].datagramSocket.writeBuffer = null; } private void onDgramWrite(FD fd) { - auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); + auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } (); auto socket = cast(DatagramSocketFD)fd; sizediff_t ret; @@ -733,16 +736,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void addRef(SocketFD fd) { - auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); - assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD."); - m_loop.m_fds[fd].refCount++; + assert(m_loop.m_fds[fd].common.refCount > 0, "Adding reference to unreferenced socket FD."); + m_loop.m_fds[fd].common.refCount++; } final override bool releaseRef(SocketFD fd) { - auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); - assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD."); - if (--m_loop.m_fds[fd].refCount == 0) { + assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD."); + if (--m_loop.m_fds[fd].common.refCount == 0) { m_loop.unregisterFD(fd); m_loop.clearFD(fd); closeSocket(fd); @@ -1015,7 +1016,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { version (linux) { auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); m_loop.initFD(id); - m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation + m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation m_loop.registerFD(id, EventMask.read); m_loop.startNotify!(EventType.read)(id, &onEvent); return id; @@ -1027,13 +1028,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent."); if (notify_all) { //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); - foreach (w; m_loop.m_fds[event].waiters.consume) { + foreach (w; m_loop.m_fds[event].event.waiters.consume) { //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); w(event); } } else { - if (!m_loop.m_fds[event].waiters.empty) - m_loop.m_fds[event].waiters.consumeOne(); + if (!m_loop.m_fds[event].event.waiters.empty) + m_loop.m_fds[event].event.waiters.consumeOne(); } } @@ -1044,14 +1045,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); long one = 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.m_loop.m_fds[event].triggerAll, true); + if (notify_all) atomicStore(thisus.m_loop.m_fds[event].event.triggerAll, true); () @trusted { .write(event, &one, one.sizeof); } (); } final override void wait(EventID event, EventCallback on_event) { assert(event < m_loop.m_fds.length, "Invalid event ID passed to waitForEvent."); - return m_loop.m_fds[event].waiters.put(on_event); + return m_loop.m_fds[event].event.waiters.put(on_event); } final override void cancelWait(EventID event, EventCallback on_event) @@ -1059,28 +1060,34 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents { import std.algorithm.searching : countUntil; import std.algorithm.mutation : remove; - m_loop.m_fds[event].waiters.removePending(on_event); + m_loop.m_fds[event].event.waiters.removePending(on_event); } - private void onEvent(FD event) + private void onEvent(FD fd) @trusted { ulong cnt; + EventID event = cast(EventID)fd; () @trusted { .read(event, &cnt, cnt.sizeof); } (); import core.atomic : cas; - auto all = cas(&m_loop.m_fds[event].triggerAll, true, false); - trigger(cast(EventID)event, all); + auto all = cas(&m_loop.m_fds[event].event.triggerAll, true, false); + trigger(event, all); } final override void addRef(EventID descriptor) { - assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); - m_loop.m_fds[descriptor].refCount++; + assert(m_loop.m_fds[descriptor.value].common.refCount > 0, "Adding reference to unreferenced event FD."); + m_loop.m_fds[descriptor.value].common.refCount++; } final override bool releaseRef(EventID descriptor) { - assert(m_loop.m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[descriptor].refCount == 0) { + assert(m_loop.m_fds[descriptor].common.refCount > 0, "Releasing reference to unreferenced event FD."); + if (--m_loop.m_fds[descriptor].common.refCount == 0) { + () @trusted nothrow { + scope (failure) assert(false); + destroy(m_loop.m_fds[descriptor].event.waiters); + assert(m_loop.m_fds[descriptor].event.waiters is null); + } (); m_loop.unregisterFD(descriptor); m_loop.clearFD(descriptor); close(descriptor); @@ -1114,7 +1121,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna m_loop.initFD(cast(FD)fd); - m_loop.m_fds[fd].readCallback = () @trusted { return cast(IOCallback)on_signal; } (); // FIXME: avoid unsafe cast + m_loop.m_fds[fd].specific = SignalSlot(on_signal); m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); @@ -1125,15 +1132,15 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna override void addRef(SignalListenID descriptor) { - assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); - m_loop.m_fds[descriptor].refCount++; + assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD."); + m_loop.m_fds[descriptor].common.refCount++; } override bool releaseRef(SignalListenID descriptor) { FD fd = cast(FD)descriptor; - assert(m_loop.m_fds[fd].refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].refCount == 0) { + assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); + if (--m_loop.m_fds[fd].common.refCount == 0) { m_loop.unregisterFD(fd); m_loop.clearFD(fd); close(fd); @@ -1145,7 +1152,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna private void onSignal(FD fd) { SignalListenID lid = cast(SignalListenID)fd; - auto cb = () @trusted { return cast(SignalCallback)m_loop.m_fds[fd].readCallback; } (); + auto cb = m_loop.m_fds[fd].signal.callback; signalfd_siginfo nfo; do { auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } (); @@ -1158,7 +1165,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna addRef(lid); cb(lid, SignalStatus.ok, nfo.ssi_signo); releaseRef(lid); - } while (m_loop.m_fds[fd].refCount > 0); + } while (m_loop.m_fds[fd].common.refCount > 0); } } @@ -1218,7 +1225,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch m_loop.initFD(FD(handle)); m_loop.registerFD(FD(handle), EventMask.read); m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); - m_loop.m_fds[handle].readCallback = () @trusted { return cast(IOCallback)callback; } (); + m_loop.m_fds[handle].specific = WatcherSlot(callback); processEvents(WatcherID(handle)); @@ -1227,15 +1234,15 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch final override void addRef(WatcherID descriptor) { - assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); - m_loop.m_fds[descriptor].refCount++; + assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD."); + m_loop.m_fds[descriptor].common.refCount++; } final override bool releaseRef(WatcherID descriptor) { FD fd = cast(FD)descriptor; - assert(m_loop.m_fds[fd].refCount > 0, "Releasing reference to unreferenced event FD."); - if (--m_loop.m_fds[fd].refCount == 0) { + assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); + if (--m_loop.m_fds[fd].common.refCount == 0) { m_loop.unregisterFD(fd); m_loop.clearFD(fd); m_watches.remove(fd); @@ -1280,7 +1287,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.name = name; addRef(id); - auto cb = () @trusted { return cast(FileChangesCallback)m_loop.m_fds[id].readCallback; } (); + auto cb = m_loop.m_fds[ id].watcher.callback; cb(id, ch); if (!releaseRef(id)) break; @@ -1329,7 +1336,7 @@ package class PosixEventLoop { import core.time : Duration; package { - ChoppedVector!FDSlot m_fds; + AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds; size_t m_waiterCount = 0; } @@ -1349,15 +1356,15 @@ package class PosixEventLoop { final protected void notify(EventType evt)(FD fd) { //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); - if (m_fds[fd].callback[evt]) - m_fds[fd].callback[evt](fd); + if (m_fds[fd.value].common.callback[evt]) + m_fds[fd.value].common.callback[evt](fd); } final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) { // TODO: optimize! foreach (i; 0 .. cast(int)m_fds.length) - if (m_fds[i].callback[evt]) + if (m_fds[i].common.callback[evt]) del(cast(FD)i); } @@ -1366,7 +1373,7 @@ package class PosixEventLoop { //log("start notify %s %s", evt, fd); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); if (callback) setNotifyCallback!evt(fd, callback); - updateFD(fd, m_fds[fd].eventMask); + updateFD(fd, m_fds[fd.value].common.eventMask); } package void stopNotify(EventType evt)(FD fd) @@ -1374,38 +1381,34 @@ package class PosixEventLoop { //log("stop notify %s %s", evt, fd); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); if (m_fds[fd].callback) setNotifyCallback!evt(fd, null); - updateFD(fd, m_fds[fd].eventMask); + updateFD(fd, m_fds[fd.value].common.eventMask); } package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) { - assert((callback !is null) != (m_fds[fd].callback[evt] !is null), + assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null), "Overwriting notification callback."); // ensure that the FD doesn't get closed before the callback gets called. if (callback !is null) { m_waiterCount++; - m_fds[fd].refCount++; + m_fds[fd.value].common.refCount++; } else { - m_fds[fd].refCount--; + m_fds[fd.value].common.refCount--; m_waiterCount--; } - m_fds[fd].callback[evt] = callback; + m_fds[fd.value].common.callback[evt] = callback; } package void initFD(FD fd) { - m_fds[fd].refCount = 1; + m_fds[fd.value].common.refCount = 1; } package void clearFD(FD fd) { - if (m_fds[fd].userDataDestructor) - () @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); - () @trusted nothrow { - scope (failure) assert(false); - destroy(m_fds[fd].waiters); - } (); - m_fds[fd] = FDSlot.init; + if (m_fds[fd.value].common.userDataDestructor) + () @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } (); + m_fds[fd.value].common = FDSlot.init; } } @@ -1416,9 +1419,23 @@ alias FDSlotCallback = void delegate(FD); private struct FDSlot { FDSlotCallback[EventType.max+1] callback; - uint refCount; + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; + + @property EventMask eventMask() const nothrow { + EventMask ret = cast(EventMask)0; + if (callback[EventType.read] !is null) ret |= EventMask.read; + if (callback[EventType.write] !is null) ret |= EventMask.write; + if (callback[EventType.status] !is null) ret |= EventMask.status; + return ret; + } +} + +private struct StreamSocketSlot { + alias Handle = StreamSocketFD; + size_t bytesRead; ubyte[] readBuffer; IOMode readMode; @@ -1428,24 +1445,49 @@ private struct FDSlot { const(ubyte)[] writeBuffer; IOMode writeMode; IOCallback writeCallback; // FIXME: this type only works for stream sockets - Address targetAddr; ConnectCallback connectCallback; +} + +private struct StreamListenSocketSlot { + alias Handle = StreamListenSocketFD; + AcceptCallback acceptCallback; +} + +private struct DgramSocketSlot { + alias Handle = DatagramSocketFD; + size_t bytesRead; + ubyte[] readBuffer; + IOMode readMode; + DatagramIOCallback readCallback; // FIXME: this type only works for stream sockets + + size_t bytesWritten; + const(ubyte)[] writeBuffer; + IOMode writeMode; + DatagramIOCallback writeCallback; // FIXME: this type only works for stream sockets + Address targetAddr; +} + +private struct DNSSlot { + alias Handle = DNSLookupID; + DNSLookupCallback callback; +} + +private struct WatcherSlot { + alias Handle = WatcherID; + FileChangesCallback callback; +} + +private struct EventSlot { + alias Handle = EventID; ConsumableQueue!EventCallback waiters; - - DataInitializer userDataDestructor; - ubyte[16*size_t.sizeof] userData; - shared bool triggerAll; +} - @property EventMask eventMask() const nothrow { - EventMask ret = cast(EventMask)0; - if (callback[EventType.read] !is null) ret |= EventMask.read; - if (callback[EventType.write] !is null) ret |= EventMask.write; - if (callback[EventType.status] !is null) ret |= EventMask.status; - return ret; - } +private struct SignalSlot { + alias Handle = SignalListenID; + SignalCallback callback; } enum EventType { @@ -1483,10 +1525,13 @@ private int getSocketError() } void log(ARGS...)(string fmt, ARGS args) -{ - import std.stdio; - try writefln(fmt, args); - catch (Exception) {} +@trusted { + import std.stdio : writef, writefln; + import core.thread : Thread; + try { + writef("[%s]: ", Thread.getThis().name); + writefln(fmt, args); + } catch (Exception) {} }