Make the FD map in the Posix event driver type safe.

This commit is contained in:
Sönke Ludwig 2016-10-24 00:15:42 +02:00
parent 2846637f95
commit 5450cda724

View file

@ -180,7 +180,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
FDSlot* fds = &m_loop.m_fds[descriptor]; FDSlot* fds = &m_loop.m_fds[descriptor].common;
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor)."); "Requesting user data with differing type (destructor).");
assert(size <= FDSlot.userData.length, "Requested user data is too large."); assert(size <= FDSlot.userData.length, "Requested user data is too large.");
@ -189,7 +189,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
initialize(fds.userData.ptr); initialize(fds.userData.ptr);
fds.userDataDestructor = destroy; fds.userDataDestructor = destroy;
} }
return m_loop.m_fds[descriptor].userData.ptr; return m_loop.m_fds[descriptor].common.userData.ptr;
} }
} }
@ -222,6 +222,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.initFD(sock); m_loop.initFD(sock);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } (); auto ret = () @trusted { return connect(sock, address.name, address.nameLen); } ();
if (ret == 0) { if (ret == 0) {
@ -229,7 +230,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} else { } else {
auto err = getSocketError(); auto err = getSocketError();
if (err == EINPROGRESS) { if (err == EINPROGRESS) {
with (m_loop.m_fds[sock]) { with (m_loop.m_fds[sock].streamSocket) {
connectCallback = on_connect; connectCallback = on_connect;
} }
m_loop.startNotify!(EventType.write)(sock, &onConnect); m_loop.startNotify!(EventType.write)(sock, &onConnect);
@ -248,13 +249,13 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
private void onConnect(FD sock) private void onConnect(FD sock)
{ {
m_loop.setNotifyCallback!(EventType.write)(sock, null); m_loop.setNotifyCallback!(EventType.write)(sock, null);
m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected); m_loop.m_fds[sock].streamSocket.connectCallback(cast(StreamSocketFD)sock, ConnectStatus.connected);
} }
private void onConnectError(FD sock) private void onConnectError(FD sock)
{ {
// FIXME: determine the correct kind of error! // FIXME: determine the correct kind of error!
m_loop.m_fds[sock].connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused); m_loop.m_fds[sock].streamSocket.connectCallback(cast(StreamSocketFD)sock, ConnectStatus.refused);
} }
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
@ -283,6 +284,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return sock; return sock;
m_loop.initFD(sock); m_loop.initFD(sock);
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
if (on_accept) waitForConnections(sock, on_accept); if (on_accept) waitForConnections(sock, on_accept);
@ -293,7 +295,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
{ {
log("wait for conn"); log("wait for conn");
m_loop.registerFD(sock, EventMask.read); m_loop.registerFD(sock, EventMask.read);
m_loop.m_fds[sock].acceptCallback = on_accept; m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
m_loop.startNotify!(EventType.read)(sock, &onAccept); m_loop.startNotify!(EventType.read)(sock, &onAccept);
onAccept(sock); onAccept(sock);
} }
@ -313,7 +315,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
m_loop.initFD(fd); m_loop.initFD(fd);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
//print("accept %d", sockfd); //print("accept %d", sockfd);
m_loop.m_fds[listenfd].acceptCallback(cast(StreamListenSocketFD)listenfd, fd); m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd);
} }
} }
@ -365,7 +367,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].streamSocket) {
readCallback = on_read_finish; readCallback = on_read_finish;
readMode = mode; readMode = mode;
bytesRead = ret > 0 ? ret : 0; bytesRead = ret > 0 ? ret : 0;
@ -377,16 +379,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
override void cancelRead(StreamSocketFD socket) override void cancelRead(StreamSocketFD socket)
{ {
assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].streamSocket) {
readBuffer = null; readBuffer = null;
} }
} }
private void onSocketRead(FD fd) private void onSocketRead(FD fd)
{ {
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status) void finalize()(IOStatus status)
@ -460,7 +462,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].streamSocket) {
writeCallback = on_write_finish; writeCallback = on_write_finish;
writeMode = mode; writeMode = mode;
bytesWritten = ret > 0 ? ret : 0; bytesWritten = ret > 0 ? ret : 0;
@ -472,14 +474,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
override void cancelWrite(StreamSocketFD socket) override void cancelWrite(StreamSocketFD socket)
{ {
assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_loop.m_fds[socket].writeBuffer = null; m_loop.m_fds[socket].streamSocket.writeBuffer = null;
} }
private void onSocketWrite(FD fd) private void onSocketWrite(FD fd)
{ {
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -537,7 +539,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return; return;
} }
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].streamSocket) {
readCallback = on_data_available; readCallback = on_data_available;
readMode = IOMode.once; readMode = IOMode.once;
bytesRead = 0; bytesRead = 0;
@ -549,7 +551,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
private void onSocketDataAvailable(FD fd) private void onSocketDataAvailable(FD fd)
{ {
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
auto socket = cast(StreamSocketFD)fd; auto socket = cast(StreamSocketFD)fd;
void finalize()(IOStatus status) void finalize()(IOStatus status)
@ -589,6 +591,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
m_loop.initFD(sock); m_loop.initFD(sock);
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
return sock; return sock;
@ -616,8 +619,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_receive_finish(socket, IOStatus.wouldBlock, 0, null); on_receive_finish(socket, IOStatus.wouldBlock, 0, null);
} else { } else {
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].datagramSocket) {
readCallback = () @trusted { return cast(IOCallback)on_receive_finish; } (); readCallback = on_receive_finish;
readMode = mode; readMode = mode;
bytesRead = 0; bytesRead = 0;
readBuffer = buffer; readBuffer = buffer;
@ -633,14 +636,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
void cancelReceive(DatagramSocketFD socket) void cancelReceive(DatagramSocketFD socket)
{ {
assert(m_loop.m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress."); assert(m_loop.m_fds[socket].datagramSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
m_loop.m_fds[socket].readBuffer = null; m_loop.m_fds[socket].datagramSocket.readBuffer = null;
} }
private void onDgramRead(FD fd) private void onDgramRead(FD fd)
@trusted { // DMD 2.072.0-b2: scope considered unsafe @trusted { // DMD 2.072.0-b2: scope considered unsafe
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
auto socket = cast(DatagramSocketFD)fd; auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -652,7 +655,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (err != EAGAIN) {
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.error, 0, null); slot.readCallback(socket, IOStatus.error, 0, null);
return; return;
} }
} }
@ -668,7 +671,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
sizediff_t ret; sizediff_t ret;
if (target_address) { if (target_address) {
() @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } (); () @trusted { ret = .sendto(socket, buffer.ptr, buffer.length, 0, target_address.name, target_address.nameLen); } ();
m_loop.m_fds[socket].targetAddr = target_address; m_loop.m_fds[socket].datagramSocket.targetAddr = target_address;
} else { } else {
() @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = .send(socket, buffer.ptr, buffer.length, 0); } ();
} }
@ -684,8 +687,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (mode == IOMode.immediate) { if (mode == IOMode.immediate) {
on_send_finish(socket, IOStatus.wouldBlock, 0, null); on_send_finish(socket, IOStatus.wouldBlock, 0, null);
} else { } else {
with (m_loop.m_fds[socket]) { with (m_loop.m_fds[socket].datagramSocket) {
writeCallback = () @trusted { return cast(IOCallback)on_send_finish; } (); writeCallback = on_send_finish;
writeMode = mode; writeMode = mode;
bytesWritten = 0; bytesWritten = 0;
writeBuffer = buffer; writeBuffer = buffer;
@ -701,14 +704,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
void cancelSend(DatagramSocketFD socket) void cancelSend(DatagramSocketFD socket)
{ {
assert(m_loop.m_fds[socket].writeCallback !is null, "Cancelling write when there is no write in progress."); assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
m_loop.m_fds[socket].writeBuffer = null; m_loop.m_fds[socket].datagramSocket.writeBuffer = null;
} }
private void onDgramWrite(FD fd) private void onDgramWrite(FD fd)
{ {
auto slot = () @trusted { return &m_loop.m_fds[fd]; } (); auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
auto socket = cast(DatagramSocketFD)fd; auto socket = cast(DatagramSocketFD)fd;
sizediff_t ret; sizediff_t ret;
@ -733,16 +736,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
final override void addRef(SocketFD fd) final override void addRef(SocketFD fd)
{ {
auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); assert(m_loop.m_fds[fd].common.refCount > 0, "Adding reference to unreferenced socket FD.");
assert(pfd.refCount > 0, "Adding reference to unreferenced socket FD."); m_loop.m_fds[fd].common.refCount++;
m_loop.m_fds[fd].refCount++;
} }
final override bool releaseRef(SocketFD fd) final override bool releaseRef(SocketFD fd)
{ {
auto pfd = () @trusted { return &m_loop.m_fds[fd]; } (); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced socket FD.");
assert(pfd.refCount > 0, "Releasing reference to unreferenced socket FD."); if (--m_loop.m_fds[fd].common.refCount == 0) {
if (--m_loop.m_fds[fd].refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd);
m_loop.clearFD(fd); m_loop.clearFD(fd);
closeSocket(fd); closeSocket(fd);
@ -1015,7 +1016,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
version (linux) { version (linux) {
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK); auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
m_loop.initFD(id); m_loop.initFD(id);
m_loop.m_fds[id].waiters = new ConsumableQueue!EventCallback; // FIXME: avoid dynamic memory allocation m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback); // FIXME: avoid dynamic memory allocation
m_loop.registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
m_loop.startNotify!(EventType.read)(id, &onEvent); m_loop.startNotify!(EventType.read)(id, &onEvent);
return id; return id;
@ -1027,13 +1028,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent."); assert(event < m_loop.m_fds.length, "Invalid event ID passed to triggerEvent.");
if (notify_all) { if (notify_all) {
//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length); //log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
foreach (w; m_loop.m_fds[event].waiters.consume) { foreach (w; m_loop.m_fds[event].event.waiters.consume) {
//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr); //log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
w(event); w(event);
} }
} else { } else {
if (!m_loop.m_fds[event].waiters.empty) if (!m_loop.m_fds[event].event.waiters.empty)
m_loop.m_fds[event].waiters.consumeOne(); m_loop.m_fds[event].event.waiters.consumeOne();
} }
} }
@ -1044,14 +1045,14 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
long one = 1; long one = 1;
//log("emitting for all threads"); //log("emitting for all threads");
if (notify_all) atomicStore(thisus.m_loop.m_fds[event].triggerAll, true); if (notify_all) atomicStore(thisus.m_loop.m_fds[event].event.triggerAll, true);
() @trusted { .write(event, &one, one.sizeof); } (); () @trusted { .write(event, &one, one.sizeof); } ();
} }
final override void wait(EventID event, EventCallback on_event) final override void wait(EventID event, EventCallback on_event)
{ {
assert(event < m_loop.m_fds.length, "Invalid event ID passed to waitForEvent."); assert(event < m_loop.m_fds.length, "Invalid event ID passed to waitForEvent.");
return m_loop.m_fds[event].waiters.put(on_event); return m_loop.m_fds[event].event.waiters.put(on_event);
} }
final override void cancelWait(EventID event, EventCallback on_event) final override void cancelWait(EventID event, EventCallback on_event)
@ -1059,28 +1060,34 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop) : EventDriverEvents {
import std.algorithm.searching : countUntil; import std.algorithm.searching : countUntil;
import std.algorithm.mutation : remove; import std.algorithm.mutation : remove;
m_loop.m_fds[event].waiters.removePending(on_event); m_loop.m_fds[event].event.waiters.removePending(on_event);
} }
private void onEvent(FD event) private void onEvent(FD fd)
@trusted { @trusted {
ulong cnt; ulong cnt;
EventID event = cast(EventID)fd;
() @trusted { .read(event, &cnt, cnt.sizeof); } (); () @trusted { .read(event, &cnt, cnt.sizeof); } ();
import core.atomic : cas; import core.atomic : cas;
auto all = cas(&m_loop.m_fds[event].triggerAll, true, false); auto all = cas(&m_loop.m_fds[event].event.triggerAll, true, false);
trigger(cast(EventID)event, all); trigger(event, all);
} }
final override void addRef(EventID descriptor) final override void addRef(EventID descriptor)
{ {
assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor.value].common.refCount > 0, "Adding reference to unreferenced event FD.");
m_loop.m_fds[descriptor].refCount++; m_loop.m_fds[descriptor.value].common.refCount++;
} }
final override bool releaseRef(EventID descriptor) final override bool releaseRef(EventID descriptor)
{ {
assert(m_loop.m_fds[descriptor].refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[descriptor].refCount == 0) { if (--m_loop.m_fds[descriptor].common.refCount == 0) {
() @trusted nothrow {
scope (failure) assert(false);
destroy(m_loop.m_fds[descriptor].event.waiters);
assert(m_loop.m_fds[descriptor].event.waiters is null);
} ();
m_loop.unregisterFD(descriptor); m_loop.unregisterFD(descriptor);
m_loop.clearFD(descriptor); m_loop.clearFD(descriptor);
close(descriptor); close(descriptor);
@ -1114,7 +1121,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
m_loop.initFD(cast(FD)fd); m_loop.initFD(cast(FD)fd);
m_loop.m_fds[fd].readCallback = () @trusted { return cast(IOCallback)on_signal; } (); // FIXME: avoid unsafe cast m_loop.m_fds[fd].specific = SignalSlot(on_signal);
m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.registerFD(cast(FD)fd, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
@ -1125,15 +1132,15 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
override void addRef(SignalListenID descriptor) override void addRef(SignalListenID descriptor)
{ {
assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
m_loop.m_fds[descriptor].refCount++; m_loop.m_fds[descriptor].common.refCount++;
} }
override bool releaseRef(SignalListenID descriptor) override bool releaseRef(SignalListenID descriptor)
{ {
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].refCount == 0) { if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd);
m_loop.clearFD(fd); m_loop.clearFD(fd);
close(fd); close(fd);
@ -1145,7 +1152,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
private void onSignal(FD fd) private void onSignal(FD fd)
{ {
SignalListenID lid = cast(SignalListenID)fd; SignalListenID lid = cast(SignalListenID)fd;
auto cb = () @trusted { return cast(SignalCallback)m_loop.m_fds[fd].readCallback; } (); auto cb = m_loop.m_fds[fd].signal.callback;
signalfd_siginfo nfo; signalfd_siginfo nfo;
do { do {
auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } (); auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } ();
@ -1158,7 +1165,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
addRef(lid); addRef(lid);
cb(lid, SignalStatus.ok, nfo.ssi_signo); cb(lid, SignalStatus.ok, nfo.ssi_signo);
releaseRef(lid); releaseRef(lid);
} while (m_loop.m_fds[fd].refCount > 0); } while (m_loop.m_fds[fd].common.refCount > 0);
} }
} }
@ -1218,7 +1225,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
m_loop.initFD(FD(handle)); m_loop.initFD(FD(handle));
m_loop.registerFD(FD(handle), EventMask.read); m_loop.registerFD(FD(handle), EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
m_loop.m_fds[handle].readCallback = () @trusted { return cast(IOCallback)callback; } (); m_loop.m_fds[handle].specific = WatcherSlot(callback);
processEvents(WatcherID(handle)); processEvents(WatcherID(handle));
@ -1227,15 +1234,15 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
final override void addRef(WatcherID descriptor) final override void addRef(WatcherID descriptor)
{ {
assert(m_loop.m_fds[descriptor].refCount > 0, "Adding reference to unreferenced event FD."); assert(m_loop.m_fds[descriptor].common.refCount > 0, "Adding reference to unreferenced event FD.");
m_loop.m_fds[descriptor].refCount++; m_loop.m_fds[descriptor].common.refCount++;
} }
final override bool releaseRef(WatcherID descriptor) final override bool releaseRef(WatcherID descriptor)
{ {
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
assert(m_loop.m_fds[fd].refCount > 0, "Releasing reference to unreferenced event FD."); assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].refCount == 0) { if (--m_loop.m_fds[fd].common.refCount == 0) {
m_loop.unregisterFD(fd); m_loop.unregisterFD(fd);
m_loop.clearFD(fd); m_loop.clearFD(fd);
m_watches.remove(fd); m_watches.remove(fd);
@ -1280,7 +1287,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
ch.isDirectory = (ev.mask & IN_ISDIR) != 0; ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
ch.name = name; ch.name = name;
addRef(id); addRef(id);
auto cb = () @trusted { return cast(FileChangesCallback)m_loop.m_fds[id].readCallback; } (); auto cb = m_loop.m_fds[ id].watcher.callback;
cb(id, ch); cb(id, ch);
if (!releaseRef(id)) break; if (!releaseRef(id)) break;
@ -1329,7 +1336,7 @@ package class PosixEventLoop {
import core.time : Duration; import core.time : Duration;
package { package {
ChoppedVector!FDSlot m_fds; AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot) m_fds;
size_t m_waiterCount = 0; size_t m_waiterCount = 0;
} }
@ -1349,15 +1356,15 @@ package class PosixEventLoop {
final protected void notify(EventType evt)(FD fd) final protected void notify(EventType evt)(FD fd)
{ {
//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event."); //assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
if (m_fds[fd].callback[evt]) if (m_fds[fd.value].common.callback[evt])
m_fds[fd].callback[evt](fd); m_fds[fd.value].common.callback[evt](fd);
} }
final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del) final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
{ {
// TODO: optimize! // TODO: optimize!
foreach (i; 0 .. cast(int)m_fds.length) foreach (i; 0 .. cast(int)m_fds.length)
if (m_fds[i].callback[evt]) if (m_fds[i].common.callback[evt])
del(cast(FD)i); del(cast(FD)i);
} }
@ -1366,7 +1373,7 @@ package class PosixEventLoop {
//log("start notify %s %s", evt, fd); //log("start notify %s %s", evt, fd);
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
if (callback) setNotifyCallback!evt(fd, callback); if (callback) setNotifyCallback!evt(fd, callback);
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd.value].common.eventMask);
} }
package void stopNotify(EventType evt)(FD fd) package void stopNotify(EventType evt)(FD fd)
@ -1374,38 +1381,34 @@ package class PosixEventLoop {
//log("stop notify %s %s", evt, fd); //log("stop notify %s %s", evt, fd);
//ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
if (m_fds[fd].callback) setNotifyCallback!evt(fd, null); if (m_fds[fd].callback) setNotifyCallback!evt(fd, null);
updateFD(fd, m_fds[fd].eventMask); updateFD(fd, m_fds[fd.value].common.eventMask);
} }
package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert((callback !is null) != (m_fds[fd].callback[evt] !is null), assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
"Overwriting notification callback."); "Overwriting notification callback.");
// ensure that the FD doesn't get closed before the callback gets called. // ensure that the FD doesn't get closed before the callback gets called.
if (callback !is null) { if (callback !is null) {
m_waiterCount++; m_waiterCount++;
m_fds[fd].refCount++; m_fds[fd.value].common.refCount++;
} else { } else {
m_fds[fd].refCount--; m_fds[fd.value].common.refCount--;
m_waiterCount--; m_waiterCount--;
} }
m_fds[fd].callback[evt] = callback; m_fds[fd.value].common.callback[evt] = callback;
} }
package void initFD(FD fd) package void initFD(FD fd)
{ {
m_fds[fd].refCount = 1; m_fds[fd.value].common.refCount = 1;
} }
package void clearFD(FD fd) package void clearFD(FD fd)
{ {
if (m_fds[fd].userDataDestructor) if (m_fds[fd.value].common.userDataDestructor)
() @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } (); () @trusted { m_fds[fd.value].common.userDataDestructor(m_fds[fd.value].common.userData.ptr); } ();
() @trusted nothrow { m_fds[fd.value].common = FDSlot.init;
scope (failure) assert(false);
destroy(m_fds[fd].waiters);
} ();
m_fds[fd] = FDSlot.init;
} }
} }
@ -1416,9 +1419,23 @@ alias FDSlotCallback = void delegate(FD);
private struct FDSlot { private struct FDSlot {
FDSlotCallback[EventType.max+1] callback; FDSlotCallback[EventType.max+1] callback;
uint refCount; uint refCount;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
@property EventMask eventMask() const nothrow {
EventMask ret = cast(EventMask)0;
if (callback[EventType.read] !is null) ret |= EventMask.read;
if (callback[EventType.write] !is null) ret |= EventMask.write;
if (callback[EventType.status] !is null) ret |= EventMask.status;
return ret;
}
}
private struct StreamSocketSlot {
alias Handle = StreamSocketFD;
size_t bytesRead; size_t bytesRead;
ubyte[] readBuffer; ubyte[] readBuffer;
IOMode readMode; IOMode readMode;
@ -1428,24 +1445,49 @@ private struct FDSlot {
const(ubyte)[] writeBuffer; const(ubyte)[] writeBuffer;
IOMode writeMode; IOMode writeMode;
IOCallback writeCallback; // FIXME: this type only works for stream sockets IOCallback writeCallback; // FIXME: this type only works for stream sockets
Address targetAddr;
ConnectCallback connectCallback; ConnectCallback connectCallback;
}
private struct StreamListenSocketSlot {
alias Handle = StreamListenSocketFD;
AcceptCallback acceptCallback; AcceptCallback acceptCallback;
}
private struct DgramSocketSlot {
alias Handle = DatagramSocketFD;
size_t bytesRead;
ubyte[] readBuffer;
IOMode readMode;
DatagramIOCallback readCallback; // FIXME: this type only works for stream sockets
size_t bytesWritten;
const(ubyte)[] writeBuffer;
IOMode writeMode;
DatagramIOCallback writeCallback; // FIXME: this type only works for stream sockets
Address targetAddr;
}
private struct DNSSlot {
alias Handle = DNSLookupID;
DNSLookupCallback callback;
}
private struct WatcherSlot {
alias Handle = WatcherID;
FileChangesCallback callback;
}
private struct EventSlot {
alias Handle = EventID;
ConsumableQueue!EventCallback waiters; ConsumableQueue!EventCallback waiters;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
shared bool triggerAll; shared bool triggerAll;
}
@property EventMask eventMask() const nothrow { private struct SignalSlot {
EventMask ret = cast(EventMask)0; alias Handle = SignalListenID;
if (callback[EventType.read] !is null) ret |= EventMask.read; SignalCallback callback;
if (callback[EventType.write] !is null) ret |= EventMask.write;
if (callback[EventType.status] !is null) ret |= EventMask.status;
return ret;
}
} }
enum EventType { enum EventType {
@ -1483,10 +1525,13 @@ private int getSocketError()
} }
void log(ARGS...)(string fmt, ARGS args) void log(ARGS...)(string fmt, ARGS args)
{ @trusted {
import std.stdio; import std.stdio : writef, writefln;
try writefln(fmt, args); import core.thread : Thread;
catch (Exception) {} try {
writef("[%s]: ", Thread.getThis().name);
writefln(fmt, args);
} catch (Exception) {}
} }