diff --git a/dub.sdl b/dub.sdl index a79ff4c..bdaa0e8 100644 --- a/dub.sdl +++ b/dub.sdl @@ -4,7 +4,7 @@ authors "Sönke Ludwig" copyright "Copyright © 2016, rejectedsoftware e.K." license "MIT" -dependency "eventcore" version="~>0.3.0" +dependency "eventcore" version="~>0.5.0" //subConfiguration "eventcore" "libasync" diff --git a/examples/bench-dummy-http-server/source/app.d b/examples/bench-dummy-http-server/source/app.d index b2b5a93..3ee23d2 100644 --- a/examples/bench-dummy-http-server/source/app.d +++ b/examples/bench-dummy-http-server/source/app.d @@ -3,22 +3,29 @@ import vibe.core.log; import vibe.core.net; //import vibe.stream.operations; +import std.exception : enforce; import std.functional : toDelegate; + void main() { void staticAnswer(TCPConnection conn) nothrow @safe { try { while (!conn.empty) { + logInfo("read request"); while (true) { CountingRange r; conn.readLine(r); if (!r.count) break; } + logInfo("write answer"); conn.write(cast(const(ubyte)[])"HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!"); + logInfo("flush"); conn.flush(); + logInfo("wait for next request"); } + logInfo("out"); } catch (Exception e) { scope (failure) assert(false); logError("Error processing request: %s", e.msg); @@ -40,103 +47,14 @@ struct CountingRange { import std.range.primitives : isOutputRange; -/** - Reads and returns a single line from the stream. - - Throws: - An exception if either the stream end was hit without hitting a newline first, or - if more than max_bytes have been read from the stream. -*/ -ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", Allocator alloc = defaultAllocator()) /*@ufcs*/ -{ - auto output = AllocAppender!(ubyte[])(alloc); - output.reserve(max_bytes < 64 ? max_bytes : 64); - readLine(stream, output, max_bytes, linesep); - return output.data(); -} -/// ditto -void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n") -{ - import vibe.stream.wrapper; - auto dstrng = StreamOutputRange(dst); - readLine(stream, dstrng, max_bytes, linesep); -} -/// ditto -void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n") +void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max) if (isOutputRange!(R, ubyte)) { - readUntil(stream, dst, cast(const(ubyte)[])linesep, max_bytes); -} - - -/** - Reads all data of a stream until the specified end marker is detected. - - Params: - stream = The input stream which is searched for end_marker - end_marker = The byte sequence which is searched in the stream - max_bytes = An optional limit of how much data is to be read from the - input stream; if the limit is reaached before hitting the end - marker, an exception is thrown. - alloc = An optional allocator that is used to build the result string - in the string variant of this function - dst = The output stream, to which the prefix to the end marker of the - input stream is written - - Returns: - The string variant of this function returns the complete prefix to the - end marker of the input stream, excluding the end marker itself. - - Throws: - An exception if either the stream end was hit without hitting a marker - first, or if more than max_bytes have been read from the stream in - case of max_bytes != 0. - - Remarks: - This function uses an algorithm inspired by the - $(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm, - Boyer-Moore string search algorithm). However, contrary to the original - algorithm, it will scan the whole input string exactly once, without - jumping over portions of it. This allows the algorithm to work with - constant memory requirements and without the memory copies that would - be necessary for streams that do not hold their complete data in - memory. - - The current implementation has a run time complexity of O(n*m+m²) and - O(n+m) in typical cases, with n being the length of the scanned input - string and m the length of the marker. -*/ -ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, Allocator alloc = defaultAllocator()) /*@ufcs*/ -{ - auto output = AllocAppender!(ubyte[])(alloc); - output.reserve(max_bytes < 64 ? max_bytes : 64); - readUntil(stream, output, end_marker, max_bytes); - return output.data(); -} -/// ditto -void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/ -{ - import vibe.stream.wrapper; - auto dstrng = StreamOutputRange(dst); - readUntil(stream, dstrng, end_marker, max_bytes); -} -/// ditto -void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/ - if (isOutputRange!(R, ubyte)) -{ - assert(max_bytes > 0 && end_marker.length > 0); - - if (end_marker.length <= 2) - readUntilSmall(stream, dst, end_marker, max_bytes); - else - readUntilGeneric(stream, dst, end_marker, max_bytes); -} - -private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) -@safe { import std.algorithm.comparison : min, max; import std.algorithm.searching : countUntil; + enum end_marker = "\r\n"; + assert(end_marker.length >= 1 && end_marker.length <= 2); size_t nmatched = 0; @@ -186,106 +104,6 @@ private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ub } } -private final class Buffer { ubyte[64*1024-4*size_t.sizeof] bytes = void; } // 64k - some headroom for - -private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/ - if (isOutputRange!(R, ubyte)) -{ - import std.algorithm.comparison : min; - // allocate internal jump table to optimize the number of comparisons - size_t[8] nmatchoffsetbuffer = void; - size_t[] nmatchoffset; - if (end_marker.length <= nmatchoffsetbuffer.length) nmatchoffset = nmatchoffsetbuffer[0 .. end_marker.length]; - else nmatchoffset = new size_t[end_marker.length]; - - // precompute the jump table - nmatchoffset[0] = 0; - foreach( i; 1 .. end_marker.length ){ - nmatchoffset[i] = i; - foreach_reverse( j; 1 .. i ) - if( end_marker[j .. i] == end_marker[0 .. i-j] ){ - nmatchoffset[i] = i-j; - break; - } - assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i); - } - - size_t nmatched = 0; - scope bufferobj = new Buffer; // FIXME: use heap allocation - auto buf = bufferobj.bytes[]; - - ulong bytes_read = 0; - - void skip2(size_t nbytes) - { - bytes_read += nbytes; - stream.skip(nbytes); - } - - while( !stream.empty ){ - enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker."); - - // try to get as much data as possible, either by peeking into the stream or - // by reading as much as isguaranteed to not exceed the end marker length - // the block size is also always limited by the max_bytes parameter. - size_t nread = 0; - auto least_size = stream.leastSize(); // NOTE: blocks until data is available - auto max_read = max_bytes - bytes_read; - auto str = stream.peek(); // try to get some data for free - if( str.length == 0 ){ // if not, read as much as possible without reading past the end - nread = min(least_size, end_marker.length-nmatched, buf.length, max_read); - stream.read(buf[0 .. nread]); - str = buf[0 .. nread]; - bytes_read += nread; - } else if( str.length > max_read ){ - str.length = cast(size_t)max_read; - } - - // remember how much of the marker was already matched before processing the current block - size_t nmatched_start = nmatched; - - // go through the current block trying to match the marker - size_t i = 0; - for (i = 0; i < str.length; i++) { - auto ch = str[i]; - // if we have a mismatch, use the jump table to try other possible prefixes - // of the marker - while( nmatched > 0 && ch != end_marker[nmatched] ) - nmatched -= nmatchoffset[nmatched]; - - // if we then have a match, increase the match count and test for full match - if (ch == end_marker[nmatched]) - if (++nmatched == end_marker.length) { - i++; - break; - } - } - - - // write out any false match part of previous blocks - if( nmatched_start > 0 ){ - if( nmatched <= i ) dst.put(end_marker[0 .. nmatched_start]); - else dst.put(end_marker[0 .. nmatched_start-nmatched+i]); - } - - // write out any unmatched part of the current block - if( nmatched < i ) dst.put(str[0 .. i-nmatched]); - - // got a full, match => out - if (nmatched >= end_marker.length) { - // in case of a full match skip data in the stream until the end of - // the marker - skip2(i - nread); - return; - } - - // otherwise skip this block in the stream - skip2(str.length - nread); - } - - enforce(false, "Reached EOF before reaching end marker."); -} - static if (!is(typeof(TCPConnection.init.skip(0)))) { private void skip(ref TCPConnection str, ulong count) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 9ed226f..0b20a50 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -627,10 +627,10 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, mixin(callWithMove!ARGS("c", "args.expand")); } - TaskFuncInfo tfi; - tfi.func = &callDelegate; + return () @trusted { + TaskFuncInfo tfi; + tfi.func = &callDelegate; - () @trusted { static if (hasElaborateAssign!CALLABLE) tfi.initCallable!CALLABLE(); static if (hasElaborateAssign!TARGS) tfi.initArgs!TARGS(); tfi.typedCallable!CALLABLE = callable; @@ -638,8 +638,8 @@ private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, static if (needsMove!A) args[i].move(tfi.typedArgs!TARGS.expand[i]); else tfi.typedArgs!TARGS.expand[i] = args[i]; } + return tfi; } (); - return tfi; } @@ -1094,7 +1094,7 @@ struct Timer { if (!this.pending) return; asyncAwait!(TimerCallback, cb => m_driver.wait(m_id, cb), - cb => m_driver.cancelWait(m_id, cb) + cb => m_driver.cancelWait(m_id) ); } } @@ -1360,7 +1360,7 @@ private void shutdownDriver() ManualEvent.ms_threadEvent = EventID.init; } - eventDriver.core.dispose(); + eventDriver.dispose(); } private void workerThreadFunc() diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 69c8403..0bb4cf0 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -321,7 +321,7 @@ struct TCPConnection { private this(StreamSocketFD socket) nothrow { m_socket = socket; - m_context = &eventDriver.core.userData!Context(socket); + m_context = () @trusted { return &eventDriver.core.userData!Context(socket); } (); m_context.readBuffer.capacity = 4096; } @@ -360,7 +360,7 @@ struct TCPConnection { nothrow { //logInfo("close %s", cast(int)m_fd); if (m_socket != StreamSocketFD.invalid) { - eventDriver.sockets.shutdown(m_socket); + eventDriver.sockets.shutdown(m_socket, true, true); eventDriver.sockets.releaseRef(m_socket); m_socket = StreamSocketFD.invalid; m_context = null; diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index d975421..9378adc 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -323,7 +323,7 @@ final package class TaskFiber : Fiber { private void run() { import std.encoding : sanitize; - import std.concurrency : Tid; + import std.concurrency : Tid, thisTid; import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield; version (VibeDebugCatchAll) alias UncaughtException = Throwable; @@ -346,12 +346,12 @@ final package class TaskFiber : Fiber { m_running = true; scope(exit) m_running = false; - std.concurrency.thisTid; // force creation of a message box + thisTid; // force creation of a message box debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); if (!isEventLoopRunning) { logTrace("Event loop not running at task start - yielding."); - vibe.core.core.taskScheduler.yieldUninterruptible(); + taskScheduler.yieldUninterruptible(); logTrace("Initial resume of task."); } task.func(&task); @@ -597,7 +597,7 @@ package struct TaskScheduler { // if the first run didn't process any events, block and // process one chunk logTrace("Wait for new events to process..."); - er = eventDriver.core.processEvents(); + er = eventDriver.core.processEvents(Duration.max); logTrace("Done."); final switch (er) { case ExitReason.exited: return ExitReason.exited; diff --git a/source/vibe/internal/array.d b/source/vibe/internal/array.d index c6de5d9..51e136c 100644 --- a/source/vibe/internal/array.d +++ b/source/vibe/internal/array.d @@ -332,12 +332,16 @@ struct FixedRingBuffer(T, size_t N = 0, bool INITIALIZE = true) { auto dst = newbuffer; auto newfill = min(m_fill, new_size); read(dst[0 .. newfill]); - if (m_freeOnDestruct && m_buffer.length > 0) delete m_buffer; + if (m_freeOnDestruct && m_buffer.length > 0) () @trusted { + delete m_buffer; + } (); m_buffer = newbuffer; m_start = 0; m_fill = newfill; } else { - if (m_freeOnDestruct && m_buffer.length > 0) delete m_buffer; + if (m_freeOnDestruct && m_buffer.length > 0) () @trusted { + delete m_buffer; + } (); m_buffer = new T[new_size]; } } diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index 8f7611d..03a4b05 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -51,11 +51,11 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) import eventcore.core; auto tm = eventDriver.timers.create(); - eventDriver.timers.set(tm, timeout); + eventDriver.timers.set(tm, timeout, 0.seconds); scope (exit) eventDriver.timers.releaseRef(tm); Waitable!( cb => eventDriver.timers.wait(tm, cb), - cb => eventDriver.timers.cancelWait(tm, cb), + cb => eventDriver.timers.cancelWait(tm), TimerID ) timerwaitable; asyncAwaitAny!(interruptible, func)(timerwaitable, waitables); diff --git a/tests/vibe.core.net.1452/source/app.d b/tests/vibe.core.net.1452/source/app.d index c0ac16e..3420b06 100644 --- a/tests/vibe.core.net.1452/source/app.d +++ b/tests/vibe.core.net.1452/source/app.d @@ -7,7 +7,7 @@ class C { this() { - m_conn = connectTCP("google.com", 443); + m_conn = connectTCP("example.com", 443); } ~this()