diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index e95681c..18c86b3 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -192,9 +192,9 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA return () @trusted { // scope scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); - + // FIXME: make this interruptible - auto result = asyncAwaitUninterruptible!(ConnectCallback, + auto result = asyncAwaitUninterruptible!(ConnectCallback, cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) //cb => eventDriver.sockets.cancelConnect(cb) ); @@ -511,7 +511,7 @@ struct TCPConnection { @property bool empty() { return leastSize == 0; } @property ulong leastSize() { waitForData(); return m_context && m_context.readBuffer.length; } @property bool dataAvailableForRead() { return waitForData(0.seconds); } - + void close() nothrow { //logInfo("close %s", cast(int)m_fd); @@ -522,7 +522,7 @@ struct TCPConnection { m_context = null; } } - + bool waitForData(Duration timeout = Duration.max) { mixin(tracer); @@ -602,7 +602,7 @@ mixin(tracer); auto res = asyncAwait!(IOCallback, cb => eventDriver.sockets.write(m_socket, bytes, mode, cb), cb => eventDriver.sockets.cancelWrite(m_socket)); - + switch (res[1]) { default: throw new Exception("Error writing data to socket."); @@ -667,7 +667,7 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration do { if (LoopBody(timeout)) return; - + if (timeout != Duration.max) { auto prev = now; now = Clock.currTime(UTC()); @@ -683,6 +683,9 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration Represents a listening TCP socket. */ struct TCPListener { + // FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking + // the previous behavior of keeping the socket alive when the listener isn't stored. At the same time, + // stopListening() needs to keep working. private { StreamListenSocketFD m_socket; NetworkAddress m_bindAddress; @@ -704,7 +707,10 @@ struct TCPListener { /// Stops listening and closes the socket. void stopListening() { - assert(false); + if (m_socket != StreamListenSocketFD.invalid) { + eventDriver.sockets.releaseRef(m_socket); + m_socket = StreamListenSocketFD.invalid; + } } } @@ -722,7 +728,7 @@ struct UDPConnection { Context* m_context; } - private this(ref NetworkAddress bind_address) + private this(ref NetworkAddress bind_address) { scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); m_socket = eventDriver.sockets.createDatagramSocket(baddr, null); diff --git a/tests/vibe.core.net.1726.d b/tests/vibe.core.net.1726.d index 55af1e4..394ebc1 100644 --- a/tests/vibe.core.net.1726.d +++ b/tests/vibe.core.net.1726.d @@ -10,12 +10,11 @@ import vibe.core.net; import core.time : msecs; import vibe.core.log; -void main() -{ - bool done = false; - auto buf = new ubyte[512*1024*1024]; +ubyte[] buf; - listenTCP(11375,(conn) { +void performTest(bool reverse) +{ + auto l = listenTCP(11375, (conn) { bool read_ex = false; bool write_ex = false; auto rt = runTask!TCPConnection((conn) { @@ -29,10 +28,10 @@ void main() } // expected }, conn); auto wt = runTask!TCPConnection((conn) { - sleep(1.msecs); // give the connection time to establish + sleep(reverse ? 100.msecs : 20.msecs); // give the connection time to establish try { conn.write(buf); - assert(false, "Expected read() to throw an exception."); + assert(false, "Expected write() to throw an exception."); } catch (Exception) { write_ex = true; conn.close(); @@ -44,24 +43,28 @@ void main() wt.join(); assert(read_ex, "No read exception thrown"); assert(write_ex, "No write exception thrown"); - done = true; + logInfo("Test has finished successfully."); + exitEventLoop(); }, "127.0.0.1"); runTask({ try { auto conn = connectTCP("127.0.0.1", 11375); - sleep(10.msecs); + sleep(reverse ? 20.msecs : 100.msecs); conn.close(); } catch (Exception e) assert(false, e.msg); - sleep(50.msecs); - assert(done, "Not done"); - - exitEventLoop(); }); - setTimer(2000.msecs, { - assert(false, "Test has hung."); - }); + runEventLoop(); - runApplication(); + l.stopListening(); +} + +void main() +{ + setTimer(10000.msecs, { assert(false, "Test has hung."); }); + buf = new ubyte[512*1024*1024]; + + performTest(false); + performTest(true); }