Update for latest eventcore version.

This commit is contained in:
Sönke Ludwig 2016-10-05 14:40:29 +02:00
parent 914e9ad894
commit e294d24a4b
6 changed files with 58 additions and 46 deletions

View file

@ -4,4 +4,10 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016, rejectedsoftware e.K." copyright "Copyright © 2016, rejectedsoftware e.K."
license "MIT" license "MIT"
dependency "eventcore" version="*" dependency "eventcore" version="~>0.3.0"
//subConfiguration "eventcore" "libasync"
configuration "unittest" {
versions "VibeMutexLog" "VibeAsyncLog"
}

View file

@ -122,7 +122,10 @@ unittest {
{ {
// first, perform any application specific setup (privileged ports still // first, perform any application specific setup (privileged ports still
// available if run as root) // available if run as root)
listenTCP(7, (conn) { conn.write(conn); }); listenTCP(7, (conn) {
try conn.write(conn);
catch (Exception e) { /* log error */ }
});
// then use runApplication to perform the remaining initialization and // then use runApplication to perform the remaining initialization and
// to run the event loop // to run the event loop
@ -147,7 +150,10 @@ unittest {
if (!finalizeCommandLineOptions()) return 0; if (!finalizeCommandLineOptions()) return 0;
// then set up the application // then set up the application
listenTCP(7, (conn) { conn.write(conn); }); listenTCP(7, (conn) {
try conn.write(conn);
catch (Exception e) { /* log error */ }
});
// finally, perform privilege lowering (safe to skip for non-server // finally, perform privilege lowering (safe to skip for non-server
// applications) // applications)
@ -214,8 +220,8 @@ int runEventLoop()
} }
logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).", logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).",
s_scheduler.scheduledTaskCount, eventDriver.waiterCount, s_exitEventLoop); s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop);
eventDriver.clearExitFlag(); eventDriver.core.clearExitFlag();
s_exitEventLoop = false; s_exitEventLoop = false;
return 0; return 0;
} }
@ -246,7 +252,7 @@ void exitEventLoop(bool shutdown_all_threads = false)
// shutdown the calling thread // shutdown the calling thread
s_exitEventLoop = true; s_exitEventLoop = true;
if (s_eventLoopRunning) eventDriver.exit(); if (s_eventLoopRunning) eventDriver.core.exit();
} }
/** /**
@ -849,10 +855,10 @@ unittest {
*/ */
Timer createTimer(void delegate() nothrow @safe callback) Timer createTimer(void delegate() nothrow @safe callback)
@safe nothrow { @safe nothrow {
auto ret = Timer(eventDriver.createTimer()); auto ret = Timer(eventDriver.timers.create());
if (callback !is null) { if (callback !is null) {
void cb(TimerID tm) nothrow @safe { callback(); } void cb(TimerID tm) nothrow @safe { callback(); }
eventDriver.waitTimer(ret.m_id, &cb); // FIXME: avoid heap closure! eventDriver.timers.wait(ret.m_id, &cb); // FIXME: avoid heap closure!
} }
return ret; return ret;
} }
@ -1036,7 +1042,7 @@ struct FileDescriptorEvent {
*/ */
struct Timer { struct Timer {
private { private {
EventDriver m_driver; typeof(eventDriver.timers) m_driver;
TimerID m_id; TimerID m_id;
debug uint m_magicNumber = 0x4d34f916; debug uint m_magicNumber = 0x4d34f916;
} }
@ -1046,7 +1052,7 @@ struct Timer {
private this(TimerID id) private this(TimerID id)
nothrow { nothrow {
assert(id != TimerID.init, "Invalid timer ID."); assert(id != TimerID.init, "Invalid timer ID.");
m_driver = eventDriver; m_driver = eventDriver.timers;
m_id = id; m_id = id;
} }
@ -1063,7 +1069,7 @@ struct Timer {
} }
/// True if the timer is yet to fire. /// True if the timer is yet to fire.
@property bool pending() nothrow { return m_driver.isTimerPending(m_id); } @property bool pending() nothrow { return m_driver.isPending(m_id); }
/// The internal ID of the timer. /// The internal ID of the timer.
@property size_t id() const nothrow { return m_id; } @property size_t id() const nothrow { return m_id; }
@ -1074,21 +1080,21 @@ struct Timer {
*/ */
void rearm(Duration dur, bool periodic = false) nothrow void rearm(Duration dur, bool periodic = false) nothrow
in { assert(dur > 0.seconds, "Negative timer duration specified."); } in { assert(dur > 0.seconds, "Negative timer duration specified."); }
body { m_driver.setTimer(m_id, dur, periodic ? dur : 0.seconds); } body { m_driver.set(m_id, dur, periodic ? dur : 0.seconds); }
/** Resets the timer and avoids any firing. /** Resets the timer and avoids any firing.
*/ */
void stop() nothrow { m_driver.stopTimer(m_id); } void stop() nothrow { m_driver.stop(m_id); }
/** Waits until the timer fires. /** Waits until the timer fires.
*/ */
void wait() void wait()
{ {
assert (!m_driver.isTimerPeriodic(m_id), "Cannot wait for a periodic timer."); assert (!m_driver.isPeriodic(m_id), "Cannot wait for a periodic timer.");
if (!this.pending) return; if (!this.pending) return;
asyncAwait!(TimerCallback, asyncAwait!(TimerCallback,
cb => m_driver.waitTimer(m_id, cb), cb => m_driver.wait(m_id, cb),
cb => m_driver.cancelTimerWait(m_id, cb) cb => m_driver.cancelWait(m_id, cb)
); );
} }
} }
@ -1122,7 +1128,7 @@ package(vibe) void performIdleProcessing()
again = (s_scheduler.schedule() || again) && !getExitFlag(); again = (s_scheduler.schedule() || again) && !getExitFlag();
if (again) { if (again) {
auto er = eventDriver.processEvents(0.seconds); auto er = eventDriver.core.processEvents(0.seconds);
if (er.among!(ExitReason.exited, ExitReason.outOfWaiters)) { if (er.among!(ExitReason.exited, ExitReason.outOfWaiters)) {
logDebug("Setting exit flag due to driver signalling exit"); logDebug("Setting exit flag due to driver signalling exit");
s_exitEventLoop = true; s_exitEventLoop = true;
@ -1355,11 +1361,11 @@ static ~this()
private void shutdownDriver() private void shutdownDriver()
{ {
if (ManualEvent.ms_threadEvent != EventID.init) { if (ManualEvent.ms_threadEvent != EventID.init) {
eventDriver.releaseRef(ManualEvent.ms_threadEvent); eventDriver.events.releaseRef(ManualEvent.ms_threadEvent);
ManualEvent.ms_threadEvent = EventID.init; ManualEvent.ms_threadEvent = EventID.init;
} }
eventDriver.dispose(); eventDriver.core.dispose();
} }
private void workerThreadFunc() private void workerThreadFunc()
@ -1428,7 +1434,7 @@ private void handleWorkerTasks()
} }
logDebug("worker thread exit"); logDebug("worker thread exit");
eventDriver.exit(); eventDriver.core.exit();
} }
private void watchExitFlag() private void watchExitFlag()
@ -1443,7 +1449,7 @@ private void watchExitFlag()
} }
logDebug("main thread exit"); logDebug("main thread exit");
eventDriver.exit(); eventDriver.core.exit();
} }
private extern(C) void extrap() private extern(C) void extrap()

View file

@ -80,7 +80,7 @@ TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, st
{ {
auto addr = resolveHost(address); auto addr = resolveHost(address);
addr.port = port; addr.port = port;
auto sock = eventDriver.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow { auto sock = eventDriver.sockets.listenStream(addr.toUnknownAddress, (StreamListenSocketFD ls, StreamSocketFD s) @safe nothrow {
import vibe.core.core : runTask; import vibe.core.core : runTask;
runTask(connection_callback, TCPConnection(s)); runTask(connection_callback, TCPConnection(s));
}); });
@ -120,8 +120,8 @@ TCPConnection connectTCP(NetworkAddress addr)
addr.toUnknownAddress(uaddr); addr.toUnknownAddress(uaddr);
// FIXME: make this interruptible // FIXME: make this interruptible
auto result = asyncAwaitUninterruptible!(ConnectCallback, auto result = asyncAwaitUninterruptible!(ConnectCallback,
cb => eventDriver.connectStream(uaddr, cb) cb => eventDriver.sockets.connectStream(uaddr, cb)
//cb => eventDriver.cancelConnect(cb) //cb => eventDriver.sockets.cancelConnect(cb)
); );
enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string); enforce(result[1] == ConnectStatus.connected, "Failed to connect to "~addr.toString()~": "~result[1].to!string);
return TCPConnection(result[0]); return TCPConnection(result[0]);
@ -321,23 +321,23 @@ struct TCPConnection {
private this(StreamSocketFD socket) private this(StreamSocketFD socket)
nothrow { nothrow {
m_socket = socket; m_socket = socket;
m_context = &eventDriver.userData!Context(socket); m_context = &eventDriver.core.userData!Context(socket);
m_context.readBuffer.capacity = 4096; m_context.readBuffer.capacity = 4096;
} }
this(this) this(this)
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != StreamSocketFD.invalid)
eventDriver.addRef(m_socket); eventDriver.sockets.addRef(m_socket);
} }
~this() ~this()
nothrow { nothrow {
if (m_socket != StreamSocketFD.invalid) if (m_socket != StreamSocketFD.invalid)
eventDriver.releaseRef(m_socket); eventDriver.sockets.releaseRef(m_socket);
} }
@property void tcpNoDelay(bool enabled) { eventDriver.setTCPNoDelay(m_socket, enabled); } @property void tcpNoDelay(bool enabled) { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); }
@property bool tcpNoDelay() const { assert(false); } @property bool tcpNoDelay() const { assert(false); }
@property void keepAlive(bool enable) { assert(false); } @property void keepAlive(bool enable) { assert(false); }
@property bool keepAlive() const { assert(false); } @property bool keepAlive() const { assert(false); }
@ -349,7 +349,7 @@ struct TCPConnection {
@property bool connected() @property bool connected()
const { const {
if (m_socket == StreamSocketFD.invalid) return false; if (m_socket == StreamSocketFD.invalid) return false;
auto s = eventDriver.getConnectionState(m_socket); auto s = eventDriver.sockets.getConnectionState(m_socket);
return s >= ConnectionState.connected && s < ConnectionState.activeClose; return s >= ConnectionState.connected && s < ConnectionState.activeClose;
} }
@property bool empty() { return leastSize == 0; } @property bool empty() { return leastSize == 0; }
@ -360,8 +360,8 @@ struct TCPConnection {
nothrow { nothrow {
//logInfo("close %s", cast(int)m_fd); //logInfo("close %s", cast(int)m_fd);
if (m_socket != StreamSocketFD.invalid) { if (m_socket != StreamSocketFD.invalid) {
eventDriver.shutdownSocket(m_socket); eventDriver.sockets.shutdown(m_socket);
eventDriver.releaseRef(m_socket); eventDriver.sockets.releaseRef(m_socket);
m_socket = StreamSocketFD.invalid; m_socket = StreamSocketFD.invalid;
m_context = null; m_context = null;
} }
@ -374,8 +374,8 @@ mixin(tracer);
if (m_context.readBuffer.length > 0) return true; if (m_context.readBuffer.length > 0) return true;
auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;
auto res = asyncAwait!(IOCallback, auto res = asyncAwait!(IOCallback,
cb => eventDriver.readSocket(m_socket, m_context.readBuffer.peekDst(), mode, cb), cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
cb => eventDriver.cancelRead(m_socket) cb => eventDriver.sockets.cancelRead(m_socket)
); );
logTrace("Socket %s, read %s bytes: %s", res[0], res[2], res[1]); logTrace("Socket %s, read %s bytes: %s", res[0], res[2], res[1]);
@ -426,8 +426,8 @@ mixin(tracer);
if (bytes.length == 0) return; if (bytes.length == 0) return;
auto res = asyncAwait!(IOCallback, auto res = asyncAwait!(IOCallback,
cb => eventDriver.writeSocket(m_socket, bytes, IOMode.all, cb), cb => eventDriver.sockets.write(m_socket, bytes, IOMode.all, cb),
cb => eventDriver.cancelWrite(m_socket)); cb => eventDriver.sockets.cancelWrite(m_socket));
switch (res[1]) { switch (res[1]) {
default: default:

View file

@ -734,7 +734,7 @@ struct ManualEvent {
auto drv = w.driver; auto drv = w.driver;
auto evt = w.event; auto evt = w.event;
if (evt != EventID.init) if (evt != EventID.init)
(cast(shared)drv).triggerEvent(evt, true); (cast(shared)drv.events).trigger(evt, true);
} }
w = wnext; w = wnext;
} }
@ -855,8 +855,8 @@ struct ManualEvent {
// if we are the first waiter for this thread, // if we are the first waiter for this thread,
// wait for the thread event to get emitted // wait for the thread event to get emitted
Waitable!( Waitable!(
cb => eventDriver.waitForEvent(ms_threadEvent, cb), cb => eventDriver.events.wait(ms_threadEvent, cb),
cb => eventDriver.cancelWaitForEvent(ms_threadEvent, cb), cb => eventDriver.events.cancelWait(ms_threadEvent, cb),
EventID EventID
) eventwaiter; ) eventwaiter;
Waitable!( Waitable!(
@ -924,7 +924,7 @@ struct ManualEvent {
tw.task = Task.getThis(); tw.task = Task.getThis();
if (ms_threadEvent == EventID.init) if (ms_threadEvent == EventID.init)
ms_threadEvent = eventDriver.createEvent(); ms_threadEvent = eventDriver.events.create();
auto sdriver = cast(shared)eventDriver; auto sdriver = cast(shared)eventDriver;

View file

@ -548,7 +548,7 @@ package struct TaskScheduler {
schedule(); schedule();
logTrace("Processing pending events..."); logTrace("Processing pending events...");
ExitReason er = eventDriver.processEvents(0.seconds); ExitReason er = eventDriver.core.processEvents(0.seconds);
logTrace("Done."); logTrace("Done.");
final switch (er) { final switch (er) {
@ -597,7 +597,7 @@ package struct TaskScheduler {
// if the first run didn't process any events, block and // if the first run didn't process any events, block and
// process one chunk // process one chunk
logTrace("Wait for new events to process..."); logTrace("Wait for new events to process...");
er = eventDriver.processEvents(); er = eventDriver.core.processEvents();
logTrace("Done."); logTrace("Done.");
final switch (er) { final switch (er) {
case ExitReason.exited: return ExitReason.exited; case ExitReason.exited: return ExitReason.exited;

View file

@ -50,12 +50,12 @@ void asyncAwaitAny(bool interruptible, string func = __FUNCTION__, Waitables...)
else { else {
import eventcore.core; import eventcore.core;
auto tm = eventDriver.createTimer(); auto tm = eventDriver.timers.create();
eventDriver.setTimer(tm, timeout); eventDriver.timers.set(tm, timeout);
scope (exit) eventDriver.releaseRef(tm); scope (exit) eventDriver.timers.releaseRef(tm);
Waitable!( Waitable!(
cb => eventDriver.waitTimer(tm, cb), cb => eventDriver.timers.wait(tm, cb),
cb => eventDriver.cancelTimerWait(tm, cb), cb => eventDriver.timers.cancelWait(tm, cb),
TimerID TimerID
) timerwaitable; ) timerwaitable;
asyncAwaitAny!(interruptible, func)(timerwaitable, waitables); asyncAwaitAny!(interruptible, func)(timerwaitable, waitables);