diff --git a/source/eventcore/drivers/posix/sockets.d b/source/eventcore/drivers/posix/sockets.d index d67743f..6197537 100644 --- a/source/eventcore/drivers/posix/sockets.d +++ b/source/eventcore/drivers/posix/sockets.d @@ -20,11 +20,22 @@ version (Posix) { version (linux) enum SO_REUSEPORT = 15; else enum SO_REUSEPORT = 0x200; + + static if (!is(typeof(O_CLOEXEC))) + { + version (linux) enum O_CLOEXEC = 0x80000; + else version (FreeBSD) enum O_CLOEXEC = 0x100000; + else version (NetBSD) enum O_CLOEXEC = 0x400000; + else version (OpenBSD) enum O_CLOEXEC = 0x10000; + else version (OSX) enum O_CLOEXEC = 0x1000000; + } } version (linux) { extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc; static if (!is(typeof(SOCK_NONBLOCK))) enum SOCK_NONBLOCK = 0x800; + static if (!is(typeof(SOCK_CLOEXEC))) + enum SOCK_CLOEXEC = 0x80000; static if (__VERSION__ < 2077) { @@ -230,12 +241,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets sockaddr_storage addr; socklen_t addr_len = addr.sizeof; version (linux) { - () @trusted { sockfd = accept4(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len, SOCK_NONBLOCK); } (); + () @trusted { sockfd = accept4(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); } (); if (sockfd == -1) return; } else { () @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } (); if (sockfd == -1) return; - setSocketNonBlocking(cast(SocketFD)sockfd); + setSocketNonBlocking(cast(SocketFD)sockfd, true); } auto fd = cast(StreamSocketFD)sockfd; m_loop.initFD(fd, FDFlags.none); @@ -850,12 +861,12 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets { sock_t sock; version (linux) { - () @trusted { sock = socket(family, type | SOCK_NONBLOCK, 0); } (); + () @trusted { sock = socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); } (); if (sock == -1) return -1; } else { () @trusted { sock = socket(family, type, 0); } (); if (sock == -1) return -1; - setSocketNonBlocking(cast(SocketFD)sock); + setSocketNonBlocking(cast(SocketFD)sock, true); } return sock; } @@ -917,13 +928,15 @@ private void closeSocket(sock_t sockfd) else close(sockfd); } -private void setSocketNonBlocking(SocketFD sockfd) +private void setSocketNonBlocking(SocketFD sockfd, bool close_on_exec = false) @nogc nothrow { version (Windows) { uint enable = 1; () @trusted { ioctlsocket(sockfd, FIONBIO, &enable); } (); } else { - () @trusted { fcntl(cast(int)sockfd, F_SETFL, O_NONBLOCK, 1); } (); + int f = O_NONBLOCK; + if (close_on_exec) f |= O_CLOEXEC; + () @trusted { fcntl(cast(int)sockfd, F_SETFL, f); } (); } } diff --git a/tests/0-tcp.d b/tests/0-tcp.d index 92708f4..fa79fdf 100644 --- a/tests/0-tcp.d +++ b/tests/0-tcp.d @@ -6,14 +6,20 @@ module test; import eventcore.core; import eventcore.socket; +import eventcore.internal.utils : print; import std.socket : InternetAddress; -import core.time : Duration; +import core.time : Duration, msecs; ubyte[256] s_rbuf; bool s_done; void main() { + // watchdog timer in case of starvation/deadlocks + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, 10000.msecs, 0.msecs); + eventDriver.timers.wait(tm, (tm) { assert(false, "Test hung."); }); + static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; static ubyte[] pack2 = [4, 3, 2, 1, 0]; @@ -24,33 +30,46 @@ void main() server.waitForConnections!((incoming_, addr) { incoming = incoming_; // work around ref counting issue + print("Got incoming, reading data"); incoming.read!((status, bts) { + print("Got data"); assert(status == IOStatus.ok); assert(bts == pack1.length); assert(s_rbuf[0 .. pack1.length] == pack1); + print("Second write"); client.write!((status, bytes) { + print("Second write done"); assert(status == IOStatus.ok); assert(bytes == pack2.length); })(pack2, IOMode.once); + print("Second read"); incoming.read!((status, bts) { + print("Second read done"); assert(status == IOStatus.ok); assert(bts == pack2.length); assert(s_rbuf[0 .. pack2.length] == pack2); + destroy(client); destroy(incoming); destroy(server); - destroy(client); s_done = true; + eventDriver.timers.stop(tm); + + // NOTE: one reference to incoming is still held by read() + //assert(eventDriver.core.waiterCount == 1); })(s_rbuf, IOMode.once); })(s_rbuf, IOMode.once); }); + print("Connect..."); connectStream!((sock, status) { client = sock; assert(status == ConnectStatus.connected); + print("Initial write"); client.write!((wstatus, bytes) { + print("Initial write done"); assert(wstatus == IOStatus.ok); assert(bytes == 10); })(pack1, IOMode.all);