diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index ce2cf7b..41d3517 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -48,7 +48,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS auto id = cast(EventID)eid; // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal)); m_loop.registerFD(id, EventMask.read); m_loop.setNotifyCallback!(EventType.read)(id, &onEvent); releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return @@ -106,7 +106,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS catch (Exception e) assert(false, e.msg); // FIXME: avoid dynamic memory allocation for the queue m_loop.initFD(id, FDFlags.internal, - EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s)); + EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s)); assert(getRC(id) == 1); return id; } @@ -133,13 +133,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS final override void trigger(EventID event, bool notify_all) shared @trusted @nogc { import core.atomic : atomicStore; - auto thisus = cast(PosixEventDriverEvents)this; - assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent."); - long one = 1; + long count = notify_all ? long.max : 1; //log("emitting for all threads"); - if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true); - version (Posix) .write(cast(int)event, &one, one.sizeof); - else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof); + version (Posix) .write(cast(int)event, &count, count.sizeof); + else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof); } final override void wait(EventID event, EventCallback on_event) @@ -157,26 +154,23 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS getSlot(event).waiters.removePending(on_event); } - private void onEvent(FD fd) - @trusted { - EventID event = cast(EventID)fd; - version (linux) { + version (linux) { + private void onEvent(FD fd) + @trusted { + EventID event = cast(EventID)fd; ulong cnt; () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } (); + trigger(event, cnt > 0); } - import core.atomic : cas; - auto all = cas(&getSlot(event).triggerAll, true, false); - trigger(event, all); - } - - version (linux) {} - else { + } else { private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress) @nogc { m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData); try { EventID evt = m_sockets.userData!EventID(s); - scope doit = { onEvent(evt); }; // cast to nogc + scope doit = { + trigger(evt, (cast(long[])m_buf)[0] > 1); + }; // cast to nogc () @trusted { (cast(void delegate() @nogc)doit)(); } (); } catch (Exception e) assert(false, e.msg); } @@ -238,7 +232,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS package struct EventSlot { alias Handle = EventID; ConsumableQueue!EventCallback waiters; - shared bool triggerAll; bool isInternal; version (linux) {} else { diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 4a0a646..d92502e 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start write task"); try { - m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeWrites.remove(file); @@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil threadSetup(); log("start read task"); try { - m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer)); + auto thiss = () @trusted { return cast(shared)this; } (); + auto fs = () @trusted { return cast(shared)f; } (); + m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); startWaiting(); } catch (Exception e) { m_activeReads.remove(file); @@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil } /// private - static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer) + static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer) { log("task fun"); - IOInfo* f = mixin("&fd.m_files[file]."~op); + shared(IOInfo)* f = mixin("&fi."~op); log("start processing"); if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) @@ -337,7 +341,7 @@ log("start processing"); scope (exit) { log("trigger event"); - () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true); + files.m_events.trigger(files.m_readyEvent, true); } if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); @@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args) // Maintains a single thread pool shared by all driver instances (threads) private struct StaticTaskPool { - import core.atomic : cas, atomicStore; + import core.sync.mutex : Mutex; import std.parallelism : TaskPool; private { - static shared int m_locked = 0; + static shared Mutex m_mutex; static __gshared TaskPool m_pool; static __gshared int m_refCount = 0; } + shared static this() + { + m_mutex = new shared Mutex; + } + static TaskPool addRef() @trusted nothrow { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!m_refCount++) { try { @@ -452,8 +461,8 @@ private struct StaticTaskPool { TaskPool fin_pool; { - while (!cas(&m_locked, 0, 1)) {} - scope (exit) atomicStore(m_locked, 0); + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); if (!--m_refCount) { fin_pool = m_pool;