MonoTime in loopWithTimeout, and other read tweaks.
MonoTime is more robust and does not involve sophisticated timezone shenanigans, making it better suited for networking operations. Fixes #39. This commit also distinguishes read timeout exception as Exception child ReadTimeoutException, wich is thrown from loopWithTimeout function. Optimistic read branch for tcp socket improves performance on granular reads.
This commit is contained in:
parent
2006ace251
commit
eea57f8914
|
@ -659,7 +659,7 @@ mixin(tracer);
|
||||||
m_context.readBuffer.popFrontN(n);
|
m_context.readBuffer.popFrontN(n);
|
||||||
count -= n;
|
count -= n;
|
||||||
return count == 0;
|
return count == 0;
|
||||||
});
|
}, ReadTimeoutException);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t read(scope ubyte[] dst, IOMode mode)
|
size_t read(scope ubyte[] dst, IOMode mode)
|
||||||
|
@ -667,6 +667,10 @@ mixin(tracer);
|
||||||
mixin(tracer);
|
mixin(tracer);
|
||||||
import std.algorithm.comparison : min;
|
import std.algorithm.comparison : min;
|
||||||
if (!dst.length) return 0;
|
if (!dst.length) return 0;
|
||||||
|
if (m_context.readBuffer.length >= dst.length) {
|
||||||
|
m_context.readBuffer.read(dst);
|
||||||
|
return dst.length;
|
||||||
|
}
|
||||||
size_t nbytes = 0;
|
size_t nbytes = 0;
|
||||||
m_context.readTimeout.loopWithTimeout!((remaining) {
|
m_context.readTimeout.loopWithTimeout!((remaining) {
|
||||||
if (m_context.readBuffer.length == 0) {
|
if (m_context.readBuffer.length == 0) {
|
||||||
|
@ -680,7 +684,7 @@ mixin(tracer);
|
||||||
dst = dst[l .. $];
|
dst = dst[l .. $];
|
||||||
nbytes += l;
|
nbytes += l;
|
||||||
return dst.length == 0;
|
return dst.length == 0;
|
||||||
});
|
}, ReadTimeoutException);
|
||||||
return nbytes;
|
return nbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -746,7 +750,7 @@ mixin(tracer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Represents possible return values for
|
/** Represents possible return values for
|
||||||
TCPConnection.waitForDataAsync.
|
TCPConnection.waitForDataAsync.
|
||||||
*/
|
*/
|
||||||
enum WaitForDataAsyncStatus {
|
enum WaitForDataAsyncStatus {
|
||||||
noMoreData,
|
noMoreData,
|
||||||
|
@ -757,14 +761,14 @@ enum WaitForDataAsyncStatus {
|
||||||
|
|
||||||
mixin validateConnectionStream!TCPConnection;
|
mixin validateConnectionStream!TCPConnection;
|
||||||
|
|
||||||
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout)
|
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout,
|
||||||
|
immutable string timeoutMsg = "Operation timed out.")
|
||||||
{
|
{
|
||||||
import core.time : seconds;
|
import core.time : seconds, MonoTime;
|
||||||
import std.datetime : Clock, SysTime, UTC;
|
|
||||||
|
|
||||||
SysTime now;
|
MonoTime now;
|
||||||
if (timeout != Duration.max)
|
if (timeout != Duration.max)
|
||||||
now = Clock.currTime(UTC());
|
now = MonoTime.currTime();
|
||||||
|
|
||||||
do {
|
do {
|
||||||
if (LoopBody(timeout))
|
if (LoopBody(timeout))
|
||||||
|
@ -772,12 +776,12 @@ private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration
|
||||||
|
|
||||||
if (timeout != Duration.max) {
|
if (timeout != Duration.max) {
|
||||||
auto prev = now;
|
auto prev = now;
|
||||||
now = Clock.currTime(UTC());
|
now = MonoTime.currTime();
|
||||||
if (now > prev) timeout -= now - prev;
|
if (now > prev) timeout -= now - prev;
|
||||||
}
|
}
|
||||||
} while (timeout > 0.seconds);
|
} while (timeout > 0.seconds);
|
||||||
|
|
||||||
throw new ExceptionType("Operation timed out.");
|
throw new ExceptionType(timeoutMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1018,7 +1022,7 @@ enum TCPListenOptions {
|
||||||
/// Disables automatic closing of the connection when the connection callback exits
|
/// Disables automatic closing of the connection when the connection callback exits
|
||||||
disableAutoClose = 1<<1,
|
disableAutoClose = 1<<1,
|
||||||
/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
|
/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
|
||||||
Does not affect libasync driver because it is always enabled by libasync.
|
Does not affect libasync driver because it is always enabled by libasync.
|
||||||
*/
|
*/
|
||||||
reusePort = 1<<2,
|
reusePort = 1<<2,
|
||||||
}
|
}
|
||||||
|
@ -1042,3 +1046,24 @@ private pure nothrow {
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum tracer = "";
|
private enum tracer = "";
|
||||||
|
|
||||||
|
|
||||||
|
/// Thrown by TCPConnection read-alike operations when timeout is reached.
|
||||||
|
class ReadTimeoutException: Exception
|
||||||
|
{
|
||||||
|
@safe pure nothrow this(string message,
|
||||||
|
Throwable next,
|
||||||
|
string file =__FILE__,
|
||||||
|
size_t line = __LINE__)
|
||||||
|
{
|
||||||
|
super(message, next, file, line);
|
||||||
|
}
|
||||||
|
|
||||||
|
@safe pure nothrow this(string message,
|
||||||
|
string file =__FILE__,
|
||||||
|
size_t line = __LINE__,
|
||||||
|
Throwable next = null)
|
||||||
|
{
|
||||||
|
super(message, file, line, next);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue