diff --git a/dub.sdl b/dub.sdl index 3800af6..a79ff4c 100644 --- a/dub.sdl +++ b/dub.sdl @@ -4,4 +4,10 @@ authors "Sönke Ludwig" copyright "Copyright © 2016, rejectedsoftware e.K." license "MIT" -dependency "eventcore" version="*" +dependency "eventcore" version="~>0.3.0" + +//subConfiguration "eventcore" "libasync" + +configuration "unittest" { + versions "VibeMutexLog" "VibeAsyncLog" +} diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index ed90bda..bdf2509 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -122,7 +122,10 @@ unittest { { // first, perform any application specific setup (privileged ports still // available if run as root) - listenTCP(7, (conn) { conn.write(conn); }); + listenTCP(7, (conn) { + try conn.write(conn); + catch (Exception e) { /* log error */ } + }); // then use runApplication to perform the remaining initialization and // to run the event loop @@ -147,7 +150,10 @@ unittest { if (!finalizeCommandLineOptions()) return 0; // then set up the application - listenTCP(7, (conn) { conn.write(conn); }); + listenTCP(7, (conn) { + try conn.write(conn); + catch (Exception e) { /* log error */ } + }); // finally, perform privilege lowering (safe to skip for non-server // applications) @@ -214,8 +220,8 @@ int runEventLoop() } logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).", - s_scheduler.scheduledTaskCount, eventDriver.waiterCount, s_exitEventLoop); - eventDriver.clearExitFlag(); + s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop); + eventDriver.core.clearExitFlag(); s_exitEventLoop = false; return 0; } @@ -246,7 +252,7 @@ void exitEventLoop(bool shutdown_all_threads = false) // shutdown the calling thread s_exitEventLoop = true; - if (s_eventLoopRunning) eventDriver.exit(); + if (s_eventLoopRunning) eventDriver.core.exit(); } /** @@ -849,10 +855,10 @@ unittest { */ Timer createTimer(void delegate() nothrow @safe callback) @safe nothrow { - auto ret = Timer(eventDriver.createTimer()); + auto ret = Timer(eventDriver.timers.create()); if (callback !is null) { void cb(TimerID tm) nothrow @safe { callback(); } - eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure! + eventDriver.timers.wait(ret.m_id, &cb); // FIXME: avoid heap closure! } return ret; } @@ -1036,7 +1042,7 @@ struct FileDescriptorEvent { */ struct Timer { private { - EventDriver m_driver; + typeof(eventDriver.timers) m_driver; TimerID m_id; debug uint m_magicNumber = 0x4d34f916; } @@ -1046,7 +1052,7 @@ struct Timer { private this(TimerID id) nothrow { assert(id != TimerID.init, "Invalid timer ID."); - m_driver = eventDriver; + m_driver = eventDriver.timers; m_id = id; } @@ -1063,7 +1069,7 @@ struct Timer { } /// True if the timer is yet to fire. - @property bool pending() nothrow { return m_driver.isTimerPending(m_id); } + @property bool pending() nothrow { return m_driver.isPending(m_id); } /// The internal ID of the timer. @property size_t id() const nothrow { return m_id; } @@ -1074,21 +1080,21 @@ struct Timer { */ void rearm(Duration dur, bool periodic = false) nothrow in { assert(dur > 0.seconds, "Negative timer duration specified."); } - body { m_driver.setTimer(m_id, dur, periodic ? dur : 0.seconds); } + body { m_driver.set(m_id, dur, periodic ? dur : 0.seconds); } /** Resets the timer and avoids any firing. */ - void stop() nothrow { m_driver.stopTimer(m_id); } + void stop() nothrow { m_driver.stop(m_id); } /** Waits until the timer fires. */ void wait() { - assert (!m_driver.isTimerPeriodic(m_id), "Cannot wait for a periodic timer."); + assert (!m_driver.isPeriodic(m_id), "Cannot wait for a periodic timer."); if (!this.pending) return; asyncAwait!(TimerCallback, - cb => m_driver.waitTimer(m_id, cb), - cb => m_driver.cancelTimerWait(m_id, cb) + cb => m_driver.wait(m_id, cb), + cb => m_driver.cancelWait(m_id, cb) ); } } @@ -1122,7 +1128,7 @@ package(vibe) void performIdleProcessing() again = (s_scheduler.schedule() || again) && !getExitFlag(); if (again) { - auto er = eventDriver.processEvents(0.seconds); + auto er = eventDriver.core.processEvents(0.seconds); if (er.among!(ExitReason.exited, ExitReason.outOfWaiters)) { logDebug("Setting exit flag due to driver signalling exit"); s_exitEventLoop = true; @@ -1355,11 +1361,11 @@ static ~this() private void shutdownDriver() { if (ManualEvent.ms_threadEvent != EventID.init) { - eventDriver.releaseRef(ManualEvent.ms_threadEvent); + eventDriver.events.releaseRef(ManualEvent.ms_threadEvent); ManualEvent.ms_threadEvent = EventID.init; } - eventDriver.dispose(); + eventDriver.core.dispose(); } private void workerThreadFunc() @@ -1428,7 +1434,7 @@ private void handleWorkerTasks() } logDebug("worker thread exit"); - eventDriver.exit(); + eventDriver.core.exit(); } private void watchExitFlag() @@ -1443,7 +1449,7 @@ private void watchExitFlag() } logDebug("main thread exit"); - eventDriver.exit(); + eventDriver.core.exit(); } private extern(C) void extrap() diff --git a/source/vibe/core/net.d b/source/vibe/core/net.d index 2473674..69c8403 100644 --- a/source/vibe/core/net.d +++ b/source/vibe/core/net.d @@ -80,7 +80,7 @@ TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, st { auto addr = resolveHost(address); addr.port = port; - auto sock = eventDriver.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow { + auto sock = eventDriver.sockets.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow { import vibe.core.core : runTask; runTask(connection_callback, TCPConnection(s)); }); @@ -120,8 +120,8 @@ TCPConnection connectTCP(NetworkAddress addr) addr.toUnknownAddress(uaddr); // FIXME: make this interruptible auto result = asyncAwaitUninterruptible!(ConnectCallback, - cb => eventDriver.connectStream(uaddr, cb) - //cb => eventDriver.cancelConnect(cb) + cb => eventDriver.sockets.connectStream(uaddr, cb) + //cb => eventDriver.sockets.cancelConnect(cb) ); enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); return TCPConnection(result[0]); @@ -321,23 +321,23 @@ struct TCPConnection { private this(StreamSocketFD socket) nothrow { m_socket = socket; - m_context = &eventDriver.userData!Context(socket); + m_context = &eventDriver.core.userData!Context(socket); m_context.readBuffer.capacity = 4096; } this(this) nothrow { if (m_socket != StreamSocketFD.invalid) - eventDriver.addRef(m_socket); + eventDriver.sockets.addRef(m_socket); } ~this() nothrow { if (m_socket != StreamSocketFD.invalid) - eventDriver.releaseRef(m_socket); + eventDriver.sockets.releaseRef(m_socket); } - @property void tcpNoDelay(bool enabled) { eventDriver.setTCPNoDelay(m_socket, enabled); } + @property void tcpNoDelay(bool enabled) { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); } @property bool tcpNoDelay() const { assert(false); } @property void keepAlive(bool enable) { assert(false); } @property bool keepAlive() const { assert(false); } @@ -349,7 +349,7 @@ struct TCPConnection { @property bool connected() const { if (m_socket == StreamSocketFD.invalid) return false; - auto s = eventDriver.getConnectionState(m_socket); + auto s = eventDriver.sockets.getConnectionState(m_socket); return s >= ConnectionState.connected && s < ConnectionState.activeClose; } @property bool empty() { return leastSize == 0; } @@ -360,8 +360,8 @@ struct TCPConnection { nothrow { //logInfo("close %s", cast(int)m_fd); if (m_socket != StreamSocketFD.invalid) { - eventDriver.shutdownSocket(m_socket); - eventDriver.releaseRef(m_socket); + eventDriver.sockets.shutdown(m_socket); + eventDriver.sockets.releaseRef(m_socket); m_socket = StreamSocketFD.invalid; m_context = null; } @@ -374,8 +374,8 @@ mixin(tracer); if (m_context.readBuffer.length > 0) return true; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; auto res = asyncAwait!(IOCallback, - cb => eventDriver.readSocket(m_socket, m_context.readBuffer.peekDst(), mode, cb), - cb => eventDriver.cancelRead(m_socket) + cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb), + cb => eventDriver.sockets.cancelRead(m_socket) ); logTrace("Socket %s, read %s bytes: %s", res[0], res[2], res[1]); @@ -426,8 +426,8 @@ mixin(tracer); if (bytes.length == 0) return; auto res = asyncAwait!(IOCallback, - cb => eventDriver.writeSocket(m_socket, bytes, IOMode.all, cb), - cb => eventDriver.cancelWrite(m_socket)); + cb => eventDriver.sockets.write(m_socket, bytes, IOMode.all, cb), + cb => eventDriver.sockets.cancelWrite(m_socket)); switch (res[1]) { default: diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index b05cf39..22fdeb6 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -734,7 +734,7 @@ struct ManualEvent { auto drv = w.driver; auto evt = w.event; if (evt != EventID.init) - (cast(shared)drv).triggerEvent(evt, true); + (cast(shared)drv.events).trigger(evt, true); } w = wnext; } @@ -855,8 +855,8 @@ struct ManualEvent { // if we are the first waiter for this thread, // wait for the thread event to get emitted Waitable!( - cb => eventDriver.waitForEvent(ms_threadEvent, cb), - cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb), + cb => eventDriver.events.wait(ms_threadEvent, cb), + cb => eventDriver.events.cancelWait(ms_threadEvent, cb), EventID ) eventwaiter; Waitable!( @@ -924,7 +924,7 @@ struct ManualEvent { tw.task = Task.getThis(); if (ms_threadEvent == EventID.init) - ms_threadEvent = eventDriver.createEvent(); + ms_threadEvent = eventDriver.events.create(); auto sdriver = cast(shared)eventDriver; diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index a7e306c..d975421 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -548,7 +548,7 @@ package struct TaskScheduler { schedule(); logTrace("Processing pending events..."); - ExitReason er = eventDriver.processEvents(0.seconds); + ExitReason er = eventDriver.core.processEvents(0.seconds); logTrace("Done."); final switch (er) { @@ -597,7 +597,7 @@ package struct TaskScheduler { // if the first run didn't process any events, block and // process one chunk logTrace("Wait for new events to process..."); - er = eventDriver.processEvents(); + er = eventDriver.core.processEvents(); logTrace("Done."); final switch (er) { case ExitReason.exited: return ExitReason.exited; diff --git a/source/vibe/internal/async.d b/source/vibe/internal/async.d index bafb03c..8f7611d 100644 --- a/source/vibe/internal/async.d +++ b/source/vibe/internal/async.d @@ -50,12 +50,12 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...) else { import eventcore.core; - auto tm = eventDriver.createTimer(); - eventDriver.setTimer(tm, timeout); - scope (exit) eventDriver.releaseRef(tm); + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, timeout); + scope (exit) eventDriver.timers.releaseRef(tm); Waitable!( - cb => eventDriver.waitTimer(tm, cb), - cb => eventDriver.cancelTimerWait(tm, cb), + cb => eventDriver.timers.wait(tm, cb), + cb => eventDriver.timers.cancelWait(tm, cb), TimerID ) timerwaitable; asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);