Tighten the file descriptor slot checks.

Gives the chance to detect possible dangling file descriptors earlier.
This commit is contained in:
Sönke Ludwig 2018-03-17 11:36:58 +01:00
parent d2cac0cca1
commit 7da41301af
5 changed files with 52 additions and 30 deletions

View file

@ -297,6 +297,8 @@ package class PosixEventLoop {
package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback) package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
{ {
assert(m_fds[fd.value].common.refCount > 0,
"Setting notification callback on unreferenced file descriptor slot.");
assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null), assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
"Overwriting notification callback."); "Overwriting notification callback.");
// ensure that the FD doesn't get closed before the callback gets called. // ensure that the FD doesn't get closed before the callback gets called.
@ -312,17 +314,24 @@ package class PosixEventLoop {
} }
} }
package void initFD(FD fd, FDFlags flags) package void initFD(T)(FD fd, FDFlags flags, auto ref T slot_init)
{ {
with (m_fds[fd.value]) { with (m_fds[fd.value]) {
assert(common.refCount == 0, "Initializing referenced file descriptor slot.");
assert(specific.kind == typeof(specific).Kind.none, "Initializing slot that has not been cleared.");
common.refCount = 1; common.refCount = 1;
common.flags = flags; common.flags = flags;
specific = slot_init;
} }
} }
package void clearFD(FD fd) package void clearFD(T)(FD fd)
{ {
import taggedalgebraic : hasType;
auto slot = () @trusted { return &m_fds[fd.value]; } (); auto slot = () @trusted { return &m_fds[fd.value]; } ();
assert(slot.common.refCount == 0, "Clearing referenced file descriptor slot.");
assert(slot.specific.hasType!T, "Clearing file descriptor slot with unmatched type.");
if (slot.common.userDataDestructor) if (slot.common.userDataDestructor)
() @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } (); () @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } ();
if (!(slot.common.flags & FDFlags.internal)) if (!(slot.common.flags & FDFlags.internal))

View file

@ -55,8 +55,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (eid == -1) return EventID.invalid; if (eid == -1) return EventID.invalid;
auto id = cast(EventID)eid; auto id = cast(EventID)eid;
m_loop.initFD(id, FDFlags.internal); // FIXME: avoid dynamic memory allocation for the queue
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal));
m_loop.registerFD(id, EventMask.read); m_loop.registerFD(id, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
@ -114,8 +115,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
synchronized (m_eventsMutex) synchronized (m_eventsMutex)
m_events[s] = id; m_events[s] = id;
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
m_loop.initFD(id, FDFlags.internal); // FIXME: avoid dynamic memory allocation for the queue
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s); // FIXME: avoid dynamic memory allocation m_loop.initFD(id, FDFlags.internal,
EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s));
assert(getRC(id) == 1); assert(getRC(id) == 1);
return id; return id;
} }
@ -219,7 +221,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
m_events.remove(rs); m_events.remove(rs);
} catch (Exception e) nogc_assert(false, e.msg); } catch (Exception e) nogc_assert(false, e.msg);
} }
m_loop.clearFD(descriptor); m_loop.clearFD!EventSlot(descriptor);
version (Posix) close(cast(int)descriptor); version (Posix) close(cast(int)descriptor);
else () @trusted { closesocket(cast(SOCKET)descriptor); } (); else () @trusted { closesocket(cast(SOCKET)descriptor); } ();
return false; return false;

View file

@ -38,8 +38,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
} (); } ();
m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none); m_loop.initFD(cast(FD)fd, is_internal ? FDFlags.internal : FDFlags.none, SignalSlot(on_signal));
m_loop.m_fds[fd].specific = SignalSlot(on_signal);
m_loop.registerFD(cast(FD)fd, EventMask.read); m_loop.registerFD(cast(FD)fd, EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal); m_loop.setNotifyCallback!(EventType.read)(cast(FD)fd, &onSignal);
@ -59,8 +58,9 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
nogc_assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); nogc_assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback adds a second reference
m_loop.setNotifyCallback!(EventType.read)(fd, null);
m_loop.unregisterFD(fd, EventMask.read); m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd); m_loop.clearFD!SignalSlot(fd);
close(cast(int)fd); close(cast(int)fd);
return false; return false;
} }

View file

@ -109,9 +109,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return sock; return sock;
} }
m_loop.initFD(sock, FDFlags.none); m_loop.initFD(sock, FDFlags.none, StreamSocketSlot.init);
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[sock].specific = StreamSocketSlot.init;
m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError); m_loop.setNotifyCallback!(EventType.status)(sock, &onConnectError);
releaseRef(sock); // onConnectError callback is weak reference releaseRef(sock); // onConnectError callback is weak reference
@ -128,7 +127,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect); m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
} else { } else {
m_loop.clearFD(sock); m_loop.clearFD!StreamSocketSlot(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
invalidateSocket(); invalidateSocket();
on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError); on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
@ -148,7 +147,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
"Unable to cancel connect on the socket that is not in connecting state"); "Unable to cancel connect on the socket that is not in connecting state");
state = ConnectionState.closed; state = ConnectionState.closed;
connectCallback = null; connectCallback = null;
m_loop.clearFD(sock); m_loop.setNotifyCallback!(EventType.status)(sock, null);
m_loop.setNotifyCallback!(EventType.write)(sock, null);
m_loop.clearFD!StreamSocketSlot(sock);
m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.unregisterFD(sock, EventMask.read|EventMask.write|EventMask.status);
closeSocket(cast(sock_t)sock.value); closeSocket(cast(sock_t)sock.value);
} }
@ -160,9 +161,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (m_loop.m_fds[fd].common.refCount) // FD already in use? if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return StreamSocketFD.invalid; return StreamSocketFD.invalid;
setSocketNonBlocking(fd); setSocketNonBlocking(fd);
m_loop.initFD(fd, FDFlags.none); m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
return fd; return fd;
} }
@ -226,8 +226,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (sock == StreamListenSocketFD.invalid) if (sock == StreamListenSocketFD.invalid)
return sock; return sock;
m_loop.initFD(sock, FDFlags.none); m_loop.initFD(sock, FDFlags.none, StreamListenSocketSlot.init);
m_loop.m_fds[sock].specific = StreamListenSocketSlot.init;
if (on_accept) waitForConnections(sock, on_accept); if (on_accept) waitForConnections(sock, on_accept);
@ -256,8 +255,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
setSocketNonBlocking(cast(SocketFD)sockfd, true); setSocketNonBlocking(cast(SocketFD)sockfd, true);
} }
auto fd = cast(StreamSocketFD)sockfd; auto fd = cast(StreamSocketFD)sockfd;
m_loop.initFD(fd, FDFlags.none); m_loop.initFD(fd, FDFlags.none, StreamSocketSlot.init);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected; m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
@ -371,8 +369,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
{ {
auto l = lockHandle(socket); auto l = lockHandle(socket);
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
assert(m_loop.m_fds[socket].common.refCount > 0);
//m_fds[fd].readBuffer = null; //m_fds[fd].readBuffer = null;
slot.readCallback(socket, status, slot.bytesRead); slot.readCallback(socket, status, slot.bytesRead);
assert(m_loop.m_fds[socket].common.refCount > 0);
} }
sizediff_t ret = 0; sizediff_t ret = 0;
@ -590,8 +590,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
} }
} }
m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none); m_loop.initFD(sock, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
m_loop.m_fds[sock].specific = DgramSocketSlot.init;
m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(sock, EventMask.read|EventMask.write|EventMask.status);
return sock; return sock;
@ -608,9 +607,8 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (m_loop.m_fds[fd].common.refCount) // FD already in use? if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init; return DatagramSocketFD.init;
setSocketNonBlocking(fd, close_on_exec); setSocketNonBlocking(fd, close_on_exec);
m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none); m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none, DgramSocketSlot.init);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
return fd; return fd;
} }
@ -813,7 +811,19 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0; int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0;
if (--slot.common.refCount == base_refcount) { if (--slot.common.refCount == base_refcount) {
m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.unregisterFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD(fd); switch (slot.specific.kind) with (slot.specific.Kind) {
default: assert(false, "File descriptor slot is not a socket.");
case streamSocket:
m_loop.clearFD!StreamSocketSlot(fd);
break;
case streamListen:
m_loop.setNotifyCallback!(EventType.read)(fd, null);
m_loop.clearFD!StreamListenSocketSlot(fd);
break;
case datagramSocket:
m_loop.clearFD!DgramSocketSlot(fd);
break;
}
closeSocket(cast(sock_t)fd); closeSocket(cast(sock_t)fd);
return false; return false;
} }

View file

@ -46,10 +46,9 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
if (recursive) if (recursive)
addSubWatches(ret, path, ""); addSubWatches(ret, path, "");
m_loop.initFD(FD(handle), FDFlags.none); m_loop.initFD(FD(handle), FDFlags.none, WatcherSlot(callback));
m_loop.registerFD(FD(handle), EventMask.read); m_loop.registerFD(FD(handle), EventMask.read);
m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges); m_loop.setNotifyCallback!(EventType.read)(FD(handle), &onChanges);
m_loop.m_fds[handle].specific = WatcherSlot(callback);
processEvents(WatcherID(handle)); processEvents(WatcherID(handle));
@ -65,10 +64,12 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
final override bool releaseRef(WatcherID descriptor) final override bool releaseRef(WatcherID descriptor)
{ {
FD fd = cast(FD)descriptor; FD fd = cast(FD)descriptor;
nogc_assert(m_loop.m_fds[fd].common.refCount > 0, "Releasing reference to unreferenced event FD."); auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
if (--m_loop.m_fds[fd].common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced event FD.");
if (--slot.common.refCount == 1) { // NOTE: 1 because setNotifyCallback increments the reference count
m_loop.setNotifyCallback!(EventType.read)(fd, null);
m_loop.unregisterFD(fd, EventMask.read); m_loop.unregisterFD(fd, EventMask.read);
m_loop.clearFD(fd); m_loop.clearFD!WatcherSlot(fd);
m_watches.remove(descriptor); m_watches.remove(descriptor);
/*errnoEnforce(*/close(cast(int)fd)/* == 0)*/; /*errnoEnforce(*/close(cast(int)fd)/* == 0)*/;
return false; return false;