From 5255645455e227429a03435b7f09ec50defd21e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 7 Oct 2019 11:09:16 +0200 Subject: [PATCH 1/2] Start a thread for each DNS lookup. Fixes vibe-d/vibe.d#2378. std.parallelism.Task.executeInNewThread leaks the thread's resources instead of reusing it in later calls. As a workaround, this commit starts a new thread for every lookup and properly tears it down afterwards. At a later point, this code should be changed to reuse the thread(s), if possible, to avoid the startup overhead. --- source/eventcore/drivers/posix/dns.d | 72 +++++++++++++++++++++------- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index d4d7ef8..d8a8dae 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -28,13 +28,16 @@ version (Posix) final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS { import std.parallelism : task, taskPool; import std.string : toStringz; + import core.thread : Thread; private { static struct Lookup { + bool done; DNSLookupCallback callback; addrinfo* result; int retcode; string name; + Thread thread; } ChoppedVector!Lookup m_lookups; Events m_events; @@ -69,30 +72,54 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver Lookup* l = () @trusted { return &m_lookups[handle]; } (); l.name = name; l.callback = on_lookup_finished; + l.done = false; auto events = () @trusted { return cast(shared)m_events; } (); - auto t = task!taskFun(l, AddressFamily.UNSPEC, events, m_event); - try t.executeInNewThread();//taskPool.put(t); - catch (Exception e) return DNSLookupID.invalid; + + try { + auto thr = new class(l, AddressFamily.UNSPEC, events, m_event) Thread { + Lookup* m_lookup; + AddressFamily m_family; + shared(Events) m_events; + EventID m_event; + + this(Lookup* l, AddressFamily af, shared(Events) events, EventID event) + { + m_lookup = l; + m_family = af; + m_events = events; + m_event = event; + super(&perform); + this.name = "Eventcore DNS Lookup"; + l.thread = this; + } + + void perform() + nothrow { + debug (EventCoreLogDNS) print("lookup %s start", m_lookup.name); + addrinfo hints; + hints.ai_flags = AI_ADDRCONFIG; + version (linux) hints.ai_flags |= AI_V4MAPPED; + hints.ai_family = m_family; + () @trusted { m_lookup.retcode = getaddrinfo(m_lookup.name.toStringz(), null, m_family == AddressFamily.UNSPEC ? null : &hints, &m_lookup.result); } (); + if (m_lookup.retcode == -1) + version (CRuntime_Glibc) version (linux) __res_init(); + + m_lookup.done = true; + m_events.trigger(m_event, true); + debug (EventCoreLogDNS) print("lookup %s finished", m_lookup.name); + } + }; + + () @trusted { thr.start(); } (); + } catch (Exception e) { + return DNSLookupID.invalid; + } + debug (EventCoreLogDNS) print("lookup handle: %s", handle); m_events.loop.m_waiterCount++; return handle; } - /// public - static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event) - { - debug (EventCoreLogDNS) print("lookup %s start", lookup.name); - addrinfo hints; - hints.ai_flags = AI_ADDRCONFIG; - version (linux) hints.ai_flags |= AI_V4MAPPED; - hints.ai_family = af; - () @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } (); - if (lookup.retcode == -1) - version (CRuntime_Glibc) version (linux) __res_init(); - events.trigger(event, true); - debug (EventCoreLogDNS) print("lookup %s finished", lookup.name); - } - override void cancelLookup(DNSLookupID handle) { m_lookups[handle].callback = null; @@ -107,6 +134,15 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver size_t lastmax; foreach (i, ref l; m_lookups) { if (i > m_maxHandle) break; + if (!l.done) continue; + + try { + l.thread.join(); + destroy(l.thread); + } catch (Exception e) { + debug (EventCoreLogDNS) print("Failed to join DNS thread: %s", e.msg); + } + if (l.callback) { if (l.result || l.retcode) { debug (EventCoreLogDNS) print("found finished lookup %s for %s", i, l.name); From b77adfcbd230c0f5ddf7048f90b458b27dd87960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 7 Oct 2019 11:18:58 +0200 Subject: [PATCH 2/2] Add proper thread synchronization to EventDriverDNS_GAI. --- source/eventcore/drivers/posix/dns.d | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/eventcore/drivers/posix/dns.d b/source/eventcore/drivers/posix/dns.d index d8a8dae..196455c 100644 --- a/source/eventcore/drivers/posix/dns.d +++ b/source/eventcore/drivers/posix/dns.d @@ -28,11 +28,12 @@ version (Posix) final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS { import std.parallelism : task, taskPool; import std.string : toStringz; + import core.atomic : atomicFence, atomicLoad, atomicStore; import core.thread : Thread; private { static struct Lookup { - bool done; + shared(bool) done; DNSLookupCallback callback; addrinfo* result; int retcode; @@ -104,7 +105,8 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver if (m_lookup.retcode == -1) version (CRuntime_Glibc) version (linux) __res_init(); - m_lookup.done = true; + atomicStore(m_lookup.done, true); + atomicFence(); // synchronize the other fields in m_lookup with the main thread m_events.trigger(m_event, true); debug (EventCoreLogDNS) print("lookup %s finished", m_lookup.name); } @@ -131,10 +133,13 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver { debug (EventCoreLogDNS) print("DNS event triggered"); m_events.wait(m_event, &onDNSSignal); + size_t lastmax; foreach (i, ref l; m_lookups) { if (i > m_maxHandle) break; - if (!l.done) continue; + if (!atomicLoad(l.done)) continue; + // synchronize the other fields in m_lookup with the lookup thread + atomicFence(); try { l.thread.join();