diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index d9d7979..2c2a2a6 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -619,7 +619,7 @@ mixin(tracer); if (is(typeof(() @safe { read_ready_callback(true); } ()))) { mixin(tracer); - import vibe.core.core : setTimer; + import vibe.core.core : Timer, setTimer; if (!m_context) return WaitForDataAsyncStatus.noMoreData; @@ -632,17 +632,49 @@ mixin(tracer); return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData; } - auto tm = setTimer(timeout, { - eventDriver.sockets.cancelRead(m_socket); - read_ready_callback(false); - }); - eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once, - (sock, st, nb) { - tm.stop(); - assert(m_context.readBuffer.length == 0); - m_context.readBuffer.putN(nb); - read_ready_callback(m_context.readBuffer.length > 0); - }); + static final class WaitContext { + import std.algorithm.mutation : move; + + CALLABLE callback; + TCPConnection connection; + Timer timer; + + this(CALLABLE callback, TCPConnection connection, Duration timeout) + { + this.callback = callback; + this.connection = connection; + if (timeout < Duration.max) + this.timer = setTimer(timeout, &onTimeout); + } + + void onTimeout() + { + eventDriver.sockets.cancelRead(connection.m_socket); + invoke(false); + } + + void onData(StreamSocketFD, IOStatus st, size_t nb) + { + if (timer) timer.stop(); + assert(connection.m_context.readBuffer.length == 0); + connection.m_context.readBuffer.putN(nb); + invoke(connection.m_context.readBuffer.length > 0); + } + + void invoke(bool status) + { + auto cb = move(callback); + connection = TCPConnection.init; + timer = Timer.init; + cb(status); + } + } + + // FIXME: make this work without a heap allocation! + auto context = new WaitContext(read_ready_callback, this, timeout); + + eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), + IOMode.once, &context.onData); return WaitForDataAsyncStatus.waiting; } @@ -758,6 +790,20 @@ enum WaitForDataAsyncStatus { waiting, } +unittest { // test compilation of callback with scoped destruction + static struct CB { + ~this() {} + this(this) {} + void opCall(bool) {} + } + + void test() { + TCPConnection c; + CB cb; + c.waitForDataAsync(cb); + } +} + mixin validateConnectionStream!TCPConnection; @@ -1066,4 +1112,4 @@ class ReadTimeoutException: Exception { super(message, file, line, next); } -} \ No newline at end of file +}