Implement TCPListener.stopListening and fix the vibe.core.net.1726 test.

The test fix follows the fix in the vibe.d repository: rejectedsoftware/vibe.d#f960427e5974c176c58b516647895a2af4ea181b
This commit is contained in:
Sönke Ludwig 2017-07-18 11:55:39 +02:00
parent a70f35e846
commit d7b2173cb3
2 changed files with 34 additions and 25 deletions

View file

@ -192,9 +192,9 @@ TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyA
return () @trusted { // scope return () @trusted { // scope
scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen); scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen);
scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
// FIXME: make this interruptible // FIXME: make this interruptible
auto result = asyncAwaitUninterruptible!(ConnectCallback, auto result = asyncAwaitUninterruptible!(ConnectCallback,
cb => eventDriver.sockets.connectStream(uaddr, baddr, cb) cb => eventDriver.sockets.connectStream(uaddr, baddr, cb)
//cb => eventDriver.sockets.cancelConnect(cb) //cb => eventDriver.sockets.cancelConnect(cb)
); );
@ -511,7 +511,7 @@ struct TCPConnection {
@property bool empty() { return leastSize == 0; } @property bool empty() { return leastSize == 0; }
@property ulong leastSize() { waitForData(); return m_context && m_context.readBuffer.length; } @property ulong leastSize() { waitForData(); return m_context && m_context.readBuffer.length; }
@property bool dataAvailableForRead() { return waitForData(0.seconds); } @property bool dataAvailableForRead() { return waitForData(0.seconds); }
void close() void close()
nothrow { nothrow {
//logInfo("close %s", cast(int)m_fd); //logInfo("close %s", cast(int)m_fd);
@ -522,7 +522,7 @@ struct TCPConnection {
m_context = null; m_context = null;
} }
} }
bool waitForData(Duration timeout = Duration.max) bool waitForData(Duration timeout = Duration.max)
{ {
mixin(tracer); mixin(tracer);
@ -602,7 +602,7 @@ mixin(tracer);
auto res = asyncAwait!(IOCallback, auto res = asyncAwait!(IOCallback,
cb => eventDriver.sockets.write(m_socket, bytes, mode, cb), cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
cb => eventDriver.sockets.cancelWrite(m_socket)); cb => eventDriver.sockets.cancelWrite(m_socket));
switch (res[1]) { switch (res[1]) {
default: default:
throw new Exception("Error writing data to socket."); throw new Exception("Error writing data to socket.");
@ -667,7 +667,7 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration
do { do {
if (LoopBody(timeout)) if (LoopBody(timeout))
return; return;
if (timeout != Duration.max) { if (timeout != Duration.max) {
auto prev = now; auto prev = now;
now = Clock.currTime(UTC()); now = Clock.currTime(UTC());
@ -683,6 +683,9 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration
Represents a listening TCP socket. Represents a listening TCP socket.
*/ */
struct TCPListener { struct TCPListener {
// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
// the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
// stopListening() needs to keep working.
private { private {
StreamListenSocketFD m_socket; StreamListenSocketFD m_socket;
NetworkAddress m_bindAddress; NetworkAddress m_bindAddress;
@ -704,7 +707,10 @@ struct TCPListener {
/// Stops listening and closes the socket. /// Stops listening and closes the socket.
void stopListening() void stopListening()
{ {
assert(false); if (m_socket != StreamListenSocketFD.invalid) {
eventDriver.sockets.releaseRef(m_socket);
m_socket = StreamListenSocketFD.invalid;
}
} }
} }
@ -722,7 +728,7 @@ struct UDPConnection {
Context* m_context; Context* m_context;
} }
private this(ref NetworkAddress bind_address) private this(ref NetworkAddress bind_address)
{ {
scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen); scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
m_socket = eventDriver.sockets.createDatagramSocket(baddr, null); m_socket = eventDriver.sockets.createDatagramSocket(baddr, null);

View file

@ -10,12 +10,11 @@ import vibe.core.net;
import core.time : msecs; import core.time : msecs;
import vibe.core.log; import vibe.core.log;
void main() ubyte[] buf;
{
bool done = false;
auto buf = new ubyte[512*1024*1024];
listenTCP(11375,(conn) { void performTest(bool reverse)
{
auto l = listenTCP(11375, (conn) {
bool read_ex = false; bool read_ex = false;
bool write_ex = false; bool write_ex = false;
auto rt = runTask!TCPConnection((conn) { auto rt = runTask!TCPConnection((conn) {
@ -29,10 +28,10 @@ void main()
} // expected } // expected
}, conn); }, conn);
auto wt = runTask!TCPConnection((conn) { auto wt = runTask!TCPConnection((conn) {
sleep(1.msecs); // give the connection time to establish sleep(reverse ? 100.msecs : 20.msecs); // give the connection time to establish
try { try {
conn.write(buf); conn.write(buf);
assert(false, "Expected read() to throw an exception."); assert(false, "Expected write() to throw an exception.");
} catch (Exception) { } catch (Exception) {
write_ex = true; write_ex = true;
conn.close(); conn.close();
@ -44,24 +43,28 @@ void main()
wt.join(); wt.join();
assert(read_ex, "No read exception thrown"); assert(read_ex, "No read exception thrown");
assert(write_ex, "No write exception thrown"); assert(write_ex, "No write exception thrown");
done = true; logInfo("Test has finished successfully.");
exitEventLoop();
}, "127.0.0.1"); }, "127.0.0.1");
runTask({ runTask({
try { try {
auto conn = connectTCP("127.0.0.1", 11375); auto conn = connectTCP("127.0.0.1", 11375);
sleep(10.msecs); sleep(reverse ? 20.msecs : 100.msecs);
conn.close(); conn.close();
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
sleep(50.msecs);
assert(done, "Not done");
exitEventLoop();
}); });
setTimer(2000.msecs, { runEventLoop();
assert(false, "Test has hung.");
});
runApplication(); l.stopListening();
}
void main()
{
setTimer(10000.msecs, { assert(false, "Test has hung."); });
buf = new ubyte[512*1024*1024];
performTest(false);
performTest(true);
} }