Merge pull request #109 from vibe-d/fixes

Threading fixes
merged-on-behalf-of: Leonid Kramer <l-kramer@users.noreply.github.com>
Author: The Dlang Bot (committed via GitHub)
Date:   2019-04-13 22:17:24 +02:00
commit 83310fda70
2 changed files with 34 additions and 32 deletions


@@ -48,7 +48,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
         auto id = cast(EventID)eid;
         // FIXME: avoid dynamic memory allocation for the queue
         m_loop.initFD(id, FDFlags.internal,
-            EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal));
+            EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal));
         m_loop.registerFD(id, EventMask.read);
         m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
         releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
@@ -106,7 +106,7 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
         catch (Exception e) assert(false, e.msg);
         // FIXME: avoid dynamic memory allocation for the queue
         m_loop.initFD(id, FDFlags.internal,
-            EventSlot(mallocT!(ConsumableQueue!EventCallback), false, is_internal, s));
+            EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s));
         assert(getRC(id) == 1);
         return id;
     }
@@ -133,13 +133,10 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
     final override void trigger(EventID event, bool notify_all)
     shared @trusted @nogc {
         import core.atomic : atomicStore;
-        auto thisus = cast(PosixEventDriverEvents)this;
-        assert(event < thisus.m_loop.m_fds.length, "Invalid event ID passed to shared triggerEvent.");
-        long one = 1;
+        long count = notify_all ? long.max : 1;
         //log("emitting for all threads");
-        if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
-        version (Posix) .write(cast(int)event, &one, one.sizeof);
-        else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof);
+        version (Posix) .write(cast(int)event, &count, count.sizeof);
+        else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof);
     }

     final override void wait(EventID event, EventCallback on_event)
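
Note: the rewritten trigger() folds the old per-slot `triggerAll` flag into the value written to the event descriptor itself: `long.max` means "wake all waiters", `1` means "wake one". On Linux the descriptor is an eventfd, whose 64-bit kernel counter accumulates writes and is returned (and reset) by a single read. A minimal standalone sketch of that mechanism, Linux-only and not part of the patch:

// Sketch only: demonstrates the eventfd counter semantics that the new
// trigger()/onEvent() pair relies on.
import core.sys.linux.sys.eventfd : eventfd;
import core.sys.posix.unistd : read, write;

void main()
{
    int efd = eventfd(0, 0);
    long count = long.max;            // trigger(event, true)
    write(efd, &count, count.sizeof); // adds to the kernel counter
    ulong cnt;
    read(efd, &cnt, cnt.sizeof);      // returns the sum and resets it
    assert(cnt > 0);                  // onEvent: trigger(event, cnt > 0)
}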
@@ -157,26 +154,23 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
         getSlot(event).waiters.removePending(on_event);
     }

-    private void onEvent(FD fd)
-    @trusted {
-        EventID event = cast(EventID)fd;
-        version (linux) {
+    version (linux) {
+        private void onEvent(FD fd)
+        @trusted {
+            EventID event = cast(EventID)fd;
             ulong cnt;
             () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
+            trigger(event, cnt > 0);
         }
-        import core.atomic : cas;
-        auto all = cas(&getSlot(event).triggerAll, true, false);
-        trigger(event, all);
-    }
-
-    version (linux) {}
-    else {
+    } else {
         private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
         @nogc {
             m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
             try {
                 EventID evt = m_sockets.userData!EventID(s);
-                scope doit = { onEvent(evt); }; // cast to nogc
+                scope doit = {
+                    trigger(evt, (cast(long[])m_buf)[0] > 1);
+                }; // cast to nogc
                 () @trusted { (cast(void delegate() @nogc)doit)(); } ();
             } catch (Exception e) assert(false, e.msg);
         }
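
Note: on non-Linux targets the event is emulated with a datagram socket, so the same counter convention travels as the packet payload: onSocketData() reinterprets the received bytes as a `long` and treats any value greater than 1 as a request to wake all waiters. A small illustration of that encoding; `encode` and `decodeNotifyAll` are illustrative names, not part of the patch:

// Illustrative only: mirrors the payload convention of the socket
// fallback, where trigger() sends the count and the receiver checks it.
long encode(bool notify_all) @safe nothrow @nogc
{
    return notify_all ? long.max : 1;
}

bool decodeNotifyAll(const(ubyte)[] buf) @trusted
{
    return (cast(const(long)[])buf)[0] > 1; // same test as onSocketData
}

unittest {
    ubyte[8] buf;
    (cast(long[])buf[])[0] = encode(true);
    assert(decodeNotifyAll(buf[]));
}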
@@ -238,7 +232,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
 package struct EventSlot {
     alias Handle = EventID;
     ConsumableQueue!EventCallback waiters;
-    shared bool triggerAll;
     bool isInternal;
     version (linux) {}
     else {


@@ -230,7 +230,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
         threadSetup();
         log("start write task");
         try {
-            m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
+            auto thiss = () @trusted { return cast(shared)this; } ();
+            auto fs = () @trusted { return cast(shared)f; } ();
+            m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer));
             startWaiting();
         } catch (Exception e) {
             m_activeWrites.remove(file);
@@ -267,7 +269,9 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
         threadSetup();
         log("start read task");
         try {
-            m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
+            auto thiss = () @trusted { return cast(shared)this; } ();
+            auto fs = () @trusted { return cast(shared)f; } ();
+            m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer));
             startWaiting();
         } catch (Exception e) {
             m_activeReads.remove(file);
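
Note: the write and read hunks apply the same pattern: the task now crosses threads, so `this` and the per-file state `f` are cast to `shared` inside `@trusted` lambdas before submission, instead of letting the pool thread access unshared data. A reduced, hypothetical example of the pattern (`Driver` and `work` are illustrative names):

// Hypothetical reduced example: hand a shared reference to a
// std.parallelism task so the worker thread only ever sees the
// shared-qualified view of the object.
import std.parallelism : task, taskPool;

void work(shared(Driver) d) nothrow
{
    // runs on a pool thread; only shared access to d is allowed here
}

class Driver {
    void start()
    {
        auto thiss = () @trusted { return cast(shared)this; } ();
        taskPool.put(task!work(thiss));
    }
}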
@@ -320,10 +324,10 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
     }

     /// private
-    static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
+    static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer)
     {
 log("task fun");
-        IOInfo* f = mixin("&fd.m_files[file]."~op);
+        shared(IOInfo)* f = mixin("&fi."~op);
 log("start processing");
         if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing))
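
Note: taskFun now receives a `shared(FileInfo)*` directly instead of indexing the driver's unshared `m_files` map from the worker thread; the string template argument still selects the read or write sub-struct via `mixin`. A self-contained illustration of that selection (the struct layouts below are simplified assumptions, not the real eventcore definitions):

// Simplified, assumed layouts -- only to show the mixin-based selection.
struct IOInfo { int status; }
struct FileInfo { IOInfo read; IOInfo write; }

shared(IOInfo)* select(string op)(shared(FileInfo)* fi)
{
    return mixin("&fi." ~ op); // expands to &fi.read or &fi.write
}

unittest {
    shared FileInfo fi;
    assert(select!"write"(&fi) is &fi.write);
}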
@@ -337,7 +341,7 @@ log("start processing");
         scope (exit) {
 log("trigger event");
-            () @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent, true);
+            files.m_events.trigger(files.m_readyEvent, true);
         }

         if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok);
@@ -421,19 +425,24 @@ private void log(ARGS...)(string fmt, ARGS args)
 // Maintains a single thread pool shared by all driver instances (threads)
 private struct StaticTaskPool {
-    import core.atomic : cas, atomicStore;
+    import core.sync.mutex : Mutex;
     import std.parallelism : TaskPool;

     private {
-        static shared int m_locked = 0;
+        static shared Mutex m_mutex;
         static __gshared TaskPool m_pool;
         static __gshared int m_refCount = 0;
     }

+    shared static this()
+    {
+        m_mutex = new shared Mutex;
+    }
+
     static TaskPool addRef()
     @trusted nothrow {
-        while (!cas(&m_locked, 0, 1)) {}
-        scope (exit) atomicStore(m_locked, 0);
+        m_mutex.lock_nothrow();
+        scope (exit) m_mutex.unlock_nothrow();
         if (!m_refCount++) {
             try {
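
Note: the CAS spin loop guarding the pool's reference count is replaced by a proper `core.sync.mutex.Mutex`, created eagerly in `shared static this()` and taken through its `nothrow` entry points so the reference-counting functions can stay `nothrow`. A reduced sketch of the scheme, with illustrative names:

// Reduced sketch, not the patch itself: a shared Mutex created in a
// module constructor guards a __gshared reference count without spinning.
import core.sync.mutex : Mutex;

shared Mutex g_mutex;
__gshared int g_refCount;

shared static this() { g_mutex = new shared Mutex; }

int addRef() @trusted nothrow
{
    g_mutex.lock_nothrow();
    scope (exit) g_mutex.unlock_nothrow();
    return ++g_refCount;
}

int releaseRef() @trusted nothrow
{
    g_mutex.lock_nothrow();
    scope (exit) g_mutex.unlock_nothrow();
    return --g_refCount;
}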
@@ -452,8 +461,8 @@ private struct StaticTaskPool {
         TaskPool fin_pool;
         {
-            while (!cas(&m_locked, 0, 1)) {}
-            scope (exit) atomicStore(m_locked, 0);
+            m_mutex.lock_nothrow();
+            scope (exit) m_mutex.unlock_nothrow();
             if (!--m_refCount) {
                 fin_pool = m_pool;