Update for eventcore 0.5.0 and the latest DMD beta.

Sönke Ludwig 2016-10-24 08:22:37 +02:00
parent d7243dcd39
commit c08f101549
8 changed files with 32 additions and 210 deletions

View file

@@ -4,7 +4,7 @@ authors "Sönke Ludwig"
 copyright "Copyright © 2016, rejectedsoftware e.K."
 license "MIT"
-dependency "eventcore" version="~>0.3.0"
+dependency "eventcore" version="~>0.5.0"
 //subConfiguration "eventcore" "libasync"

View file

@@ -3,22 +3,29 @@ import vibe.core.log;
 import vibe.core.net;
 //import vibe.stream.operations;
+import std.exception : enforce;
 import std.functional : toDelegate;
 void main()
 {
 	void staticAnswer(TCPConnection conn)
 	nothrow @safe {
 		try {
 			while (!conn.empty) {
+				logInfo("read request");
 				while (true) {
 					CountingRange r;
					conn.readLine(r);
 					if (!r.count) break;
 				}
+				logInfo("write answer");
 				conn.write(cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!");
+				logInfo("flush");
 				conn.flush();
+				logInfo("wait for next request");
 			}
+			logInfo("out");
 		} catch (Exception e) {
 			scope (failure) assert(false);
 			logError("Error processing request: %s", e.msg);
@@ -40,103 +47,14 @@ struct CountingRange {
 import std.range.primitives : isOutputRange;
-/**
-	Reads and returns a single line from the stream.
-	Throws:
-		An exception if either the stream end was hit without hitting a newline first, or
-		if more than max_bytes have been read from the stream.
-*/
-ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", Allocator alloc = defaultAllocator()) /*@ufcs*/
-{
-	auto output = AllocAppender!(ubyte[])(alloc);
-	output.reserve(max_bytes < 64 ? max_bytes : 64);
-	readLine(stream, output, max_bytes, linesep);
-	return output.data();
-}
-/// ditto
-void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
-{
-	import vibe.stream.wrapper;
-	auto dstrng = StreamOutputRange(dst);
-	readLine(stream, dstrng, max_bytes, linesep);
-}
-/// ditto
-void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
+void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max)
 	if (isOutputRange!(R, ubyte))
 {
-	readUntil(stream, dst, cast(const(ubyte)[])linesep, max_bytes);
-}
-/**
-	Reads all data of a stream until the specified end marker is detected.
-	Params:
-		stream = The input stream which is searched for end_marker
-		end_marker = The byte sequence which is searched in the stream
-		max_bytes = An optional limit of how much data is to be read from the
-			input stream; if the limit is reaached before hitting the end
-			marker, an exception is thrown.
-		alloc = An optional allocator that is used to build the result string
-			in the string variant of this function
-		dst = The output stream, to which the prefix to the end marker of the
-			input stream is written
-	Returns:
-		The string variant of this function returns the complete prefix to the
-		end marker of the input stream, excluding the end marker itself.
-	Throws:
-		An exception if either the stream end was hit without hitting a marker
-		first, or if more than max_bytes have been read from the stream in
-		case of max_bytes != 0.
-	Remarks:
-		This function uses an algorithm inspired by the
-		$(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm,
-		Boyer-Moore string search algorithm). However, contrary to the original
-		algorithm, it will scan the whole input string exactly once, without
-		jumping over portions of it. This allows the algorithm to work with
-		constant memory requirements and without the memory copies that would
-		be necessary for streams that do not hold their complete data in
-		memory.
-		The current implementation has a run time complexity of O(n*m+m²) and
-		O(n+m) in typical cases, with n being the length of the scanned input
-		string and m the length of the marker.
-*/
-ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, Allocator alloc = defaultAllocator()) /*@ufcs*/
-{
-	auto output = AllocAppender!(ubyte[])(alloc);
-	output.reserve(max_bytes < 64 ? max_bytes : 64);
-	readUntil(stream, output, end_marker, max_bytes);
-	return output.data();
-}
-/// ditto
-void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
-{
-	import vibe.stream.wrapper;
-	auto dstrng = StreamOutputRange(dst);
-	readUntil(stream, dstrng, end_marker, max_bytes);
-}
-/// ditto
-void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
-	if (isOutputRange!(R, ubyte))
-{
-	assert(max_bytes > 0 && end_marker.length > 0);
-	if (end_marker.length <= 2)
-		readUntilSmall(stream, dst, end_marker, max_bytes);
-	else
-		readUntilGeneric(stream, dst, end_marker, max_bytes);
-}
-private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
-@safe {
 	import std.algorithm.comparison : min, max;
 	import std.algorithm.searching : countUntil;
+	enum end_marker = "\r\n";
 	assert(end_marker.length >= 1 && end_marker.length <= 2);
 	size_t nmatched = 0;
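
After this change, `readLine` always uses "\r\n" as the line separator and writes the line into any `ubyte` output range (the separator itself is not copied, judging by the removed `readUntil` documentation). A hedged usage sketch, assuming a `conn` that satisfies the `InputStream` parameter used above:

```d
// Hypothetical helper built on the simplified readLine: collects one
// CRLF-terminated header line into a GC-allocated buffer.
ubyte[] readHeaderLine(InputStream)(InputStream conn, size_t max_bytes = 4096)
{
	import std.array : appender;
	auto dst = appender!(ubyte[])();
	conn.readLine(dst, max_bytes); // throws if no "\r\n" shows up within max_bytes
	return dst.data;
}
```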
@@ -186,106 +104,6 @@ private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
 	}
 }
-private final class Buffer { ubyte[64*1024-4*size_t.sizeof] bytes = void; } // 64k - some headroom for
-private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
-	if (isOutputRange!(R, ubyte))
-{
-	import std.algorithm.comparison : min;
-	// allocate internal jump table to optimize the number of comparisons
-	size_t[8] nmatchoffsetbuffer = void;
-	size_t[] nmatchoffset;
-	if (end_marker.length <= nmatchoffsetbuffer.length) nmatchoffset = nmatchoffsetbuffer[0 .. end_marker.length];
-	else nmatchoffset = new size_t[end_marker.length];
-	// precompute the jump table
-	nmatchoffset[0] = 0;
-	foreach( i; 1 .. end_marker.length ){
-		nmatchoffset[i] = i;
-		foreach_reverse( j; 1 .. i )
-			if( end_marker[j .. i] == end_marker[0 .. i-j] ){
-				nmatchoffset[i] = i-j;
-				break;
-			}
-		assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i);
-	}
-	size_t nmatched = 0;
-	scope bufferobj = new Buffer; // FIXME: use heap allocation
-	auto buf = bufferobj.bytes[];
-	ulong bytes_read = 0;
-	void skip2(size_t nbytes)
-	{
-		bytes_read += nbytes;
-		stream.skip(nbytes);
-	}
-	while( !stream.empty ){
-		enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker.");
-		// try to get as much data as possible, either by peeking into the stream or
-		// by reading as much as is guaranteed to not exceed the end marker length
-		// the block size is also always limited by the max_bytes parameter.
-		size_t nread = 0;
-		auto least_size = stream.leastSize(); // NOTE: blocks until data is available
-		auto max_read = max_bytes - bytes_read;
-		auto str = stream.peek(); // try to get some data for free
-		if( str.length == 0 ){ // if not, read as much as possible without reading past the end
-			nread = min(least_size, end_marker.length-nmatched, buf.length, max_read);
-			stream.read(buf[0 .. nread]);
-			str = buf[0 .. nread];
-			bytes_read += nread;
-		} else if( str.length > max_read ){
-			str.length = cast(size_t)max_read;
-		}
-		// remember how much of the marker was already matched before processing the current block
-		size_t nmatched_start = nmatched;
-		// go through the current block trying to match the marker
-		size_t i = 0;
-		for (i = 0; i < str.length; i++) {
-			auto ch = str[i];
-			// if we have a mismatch, use the jump table to try other possible prefixes
-			// of the marker
-			while( nmatched > 0 && ch != end_marker[nmatched] )
-				nmatched -= nmatchoffset[nmatched];
-			// if we then have a match, increase the match count and test for full match
-			if (ch == end_marker[nmatched])
-				if (++nmatched == end_marker.length) {
-					i++;
-					break;
-				}
-		}
-		// write out any false match part of previous blocks
-		if( nmatched_start > 0 ){
-			if( nmatched <= i ) dst.put(end_marker[0 .. nmatched_start]);
-			else dst.put(end_marker[0 .. nmatched_start-nmatched+i]);
-		}
-		// write out any unmatched part of the current block
-		if( nmatched < i ) dst.put(str[0 .. i-nmatched]);
-		// got a full, match => out
-		if (nmatched >= end_marker.length) {
-			// in case of a full match skip data in the stream until the end of
-			// the marker
-			skip2(i - nread);
-			return;
-		}
-		// otherwise skip this block in the stream
-		skip2(str.length - nread);
-	}
-	enforce(false, "Reached EOF before reaching end marker.");
-}
 static if (!is(typeof(TCPConnection.init.skip(0))))
 {
 	private void skip(ref TCPConnection str, ulong count)
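
The removed `readUntilGeneric` relied on a small precomputed jump table so that a mismatch does not restart the marker search from zero. Extracted into a standalone form (the function name and the unittest are illustrative additions; the loop body is taken from the deleted code), the precomputation looks like this:

```d
// Standalone copy of the removed jump-table precomputation: entry i says by
// how much the current match length is reduced when byte i of end_marker
// fails to match, so shorter marker prefixes can be retried in place.
size_t[] buildJumpTable(in ubyte[] end_marker) @safe
{
	assert(end_marker.length > 0);
	auto nmatchoffset = new size_t[end_marker.length];
	nmatchoffset[0] = 0;
	foreach (i; 1 .. end_marker.length) {
		nmatchoffset[i] = i;
		foreach_reverse (j; 1 .. i)
			if (end_marker[j .. i] == end_marker[0 .. i-j]) {
				nmatchoffset[i] = i-j;
				break;
			}
		assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i);
	}
	return nmatchoffset;
}

unittest {
	// For the HTTP header terminator "\r\n\r\n", a mismatch after having
	// matched "\r\n\r" falls back to a match length of 2 instead of 0.
	assert(buildJumpTable(cast(const(ubyte)[])"\r\n\r\n") == [0, 1, 2, 1]);
}
```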

View file

@@ -627,10 +627,10 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable,
 		mixin(callWithMove!ARGS("c", "args.expand"));
 	}
-	return () @trusted {
 	TaskFuncInfo tfi;
 	tfi.func = &callDelegate;
+	() @trusted {
 		static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE();
 		static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS();
 		tfi.typedCallable!CALLABLE = callable;
@@ -638,8 +638,8 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable,
 			static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]);
 			else tfi.typedArgs!TARGS.expand[i] = args[i];
 		}
+	} ();
 	return tfi;
-	} ();
 }
@@ -1094,7 +1094,7 @@ struct Timer {
 		if (!this.pending) return;
 		asyncAwait!(TimerCallback,
 			cb => m_driver.wait(m_id, cb),
-			cb => m_driver.cancelWait(m_id, cb)
+			cb => m_driver.cancelWait(m_id)
 		);
 	}
 }
@@ -1360,7 +1360,7 @@ private void shutdownDriver()
 		ManualEvent.ms_threadEvent = EventID.init;
 	}
-	eventDriver.core.dispose();
+	eventDriver.dispose();
 }
private void workerThreadFunc() private void workerThreadFunc()

View file

@@ -321,7 +321,7 @@ struct TCPConnection {
 	private this(StreamSocketFD socket)
 	nothrow {
 		m_socket = socket;
-		m_context = &eventDriver.core.userData!Context(socket);
+		m_context = () @trusted { return &eventDriver.core.userData!Context(socket); } ();
 		m_context.readBuffer.capacity = 4096;
 	}
@@ -360,7 +360,7 @@ struct TCPConnection {
 	nothrow {
 		//logInfo("close %s", cast(int)m_fd);
 		if (m_socket != StreamSocketFD.invalid) {
-			eventDriver.sockets.shutdown(m_socket);
+			eventDriver.sockets.shutdown(m_socket, true, true);
 			eventDriver.sockets.releaseRef(m_socket);
 			m_socket = StreamSocketFD.invalid;
 			m_context = null;
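
Several hunks in this commit (here, and in the ring buffer further down) wrap a single unsafe expression in an immediately invoked `@trusted` lambda so that the surrounding function can keep its `@safe`/`nothrow` attributes under the newer compiler. A self-contained sketch of the idiom, with made-up names, purely to illustrate the pattern:

```d
// Illustrative only: taking the address of a ref parameter is rejected in
// @safe code, so the single offending expression is wrapped in a small
// @trusted lambda that is called in place, while the rest of the function
// stays compiler-checked. The net.d change above does the same for the
// address of the driver-owned userData!Context storage.
int* lastSeen;

void remember(ref int value) @safe nothrow
{
	lastSeen = () @trusted { return &value; } ();
}
```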

View file

@@ -323,7 +323,7 @@ final package class TaskFiber : Fiber {
 	private void run()
 	{
 		import std.encoding : sanitize;
-		import std.concurrency : Tid;
+		import std.concurrency : Tid, thisTid;
 		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield;
 		version (VibeDebugCatchAll) alias UncaughtException = Throwable;
@@ -346,12 +346,12 @@ final package class TaskFiber : Fiber {
 				m_running = true;
 				scope(exit) m_running = false;
-				std.concurrency.thisTid; // force creation of a message box
+				thisTid; // force creation of a message box
 				debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
 				if (!isEventLoopRunning) {
 					logTrace("Event loop not running at task start - yielding.");
-					vibe.core.core.taskScheduler.yieldUninterruptible();
+					taskScheduler.yieldUninterruptible();
 					logTrace("Initial resume of task.");
 				}
 				task.func(&task);
@@ -597,7 +597,7 @@ package struct TaskScheduler {
 			// if the first run didn't process any events, block and
 			// process one chunk
 			logTrace("Wait for new events to process...");
-			er = eventDriver.core.processEvents();
+			er = eventDriver.core.processEvents(Duration.max);
 			logTrace("Done.");
 			final switch (er) {
 				case ExitReason.exited: return ExitReason.exited;

View file

@@ -332,12 +332,16 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) {
 			auto dst = newbuffer;
 			auto newfill = min(m_fill, new_size);
 			read(dst[0 .. newfill]);
-			if (m_freeOnDestruct && m_buffer.length > 0) delete m_buffer;
+			if (m_freeOnDestruct && m_buffer.length > 0) () @trusted {
+				delete m_buffer;
+			} ();
 			m_buffer = newbuffer;
 			m_start = 0;
 			m_fill = newfill;
 		} else {
-			if (m_freeOnDestruct && m_buffer.length > 0) delete m_buffer;
+			if (m_freeOnDestruct && m_buffer.length > 0) () @trusted {
+				delete m_buffer;
+			} ();
 			m_buffer = new T[new_size];
 		}
 	}

View file

@@ -51,11 +51,11 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
 	import eventcore.core;
 	auto tm = eventDriver.timers.create();
-	eventDriver.timers.set(tm, timeout);
+	eventDriver.timers.set(tm, timeout, 0.seconds);
 	scope (exit) eventDriver.timers.releaseRef(tm);
 	Waitable!(
 		cb => eventDriver.timers.wait(tm, cb),
-		cb => eventDriver.timers.cancelWait(tm, cb),
+		cb => eventDriver.timers.cancelWait(tm),
 		TimerID
 	) timerwaitable;
 	asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);
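
The timer-related eventcore calls change shape in 0.5.0: `set` gains a repeat interval and `cancelWait` loses its callback argument, as the hunk above shows. A small, hedged sketch of the resulting one-shot pattern (the exact callback signature and cancellation semantics are assumptions, not verified against this eventcore version):

```d
import core.time : msecs, seconds;
import eventcore.core;

void oneShotTimerExample()
{
	auto tm = eventDriver.timers.create();
	scope (exit) eventDriver.timers.releaseRef(tm);

	// the third argument is the repeat interval; 0.seconds makes it fire once
	eventDriver.timers.set(tm, 250.msecs, 0.seconds);

	eventDriver.timers.wait(tm, (TimerID timer) @safe nothrow {
		// runs from the event loop once the timeout elapses
	});

	// if the wait becomes unnecessary, 0.5.0 cancels it without a callback:
	// eventDriver.timers.cancelWait(tm);
}
```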

View file

@@ -7,7 +7,7 @@ class C {
 	this()
 	{
-		m_conn = connectTCP("google.com", 443);
+		m_conn = connectTCP("example.com", 443);
 	}
 	~this()