Merge pull request #130 from vibe-d/issue_vibed_2378_dns_lookup_memory_leak

Start a thread for each DNS lookup
This commit is contained in:
Sönke Ludwig 2019-10-08 16:22:29 +02:00 committed by GitHub
commit 9357cf485f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -28,13 +28,17 @@ version (Posix)
final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS { final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriverSignals) : EventDriverDNS {
import std.parallelism : task, taskPool; import std.parallelism : task, taskPool;
import std.string : toStringz; import std.string : toStringz;
import core.atomic : atomicFence, atomicLoad, atomicStore;
import core.thread : Thread;
private { private {
static struct Lookup { static struct Lookup {
shared(bool) done;
DNSLookupCallback callback; DNSLookupCallback callback;
addrinfo* result; addrinfo* result;
int retcode; int retcode;
string name; string name;
Thread thread;
} }
ChoppedVector!Lookup m_lookups; ChoppedVector!Lookup m_lookups;
Events m_events; Events m_events;
@ -69,28 +73,53 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
Lookup* l = () @trusted { return &m_lookups[handle]; } (); Lookup* l = () @trusted { return &m_lookups[handle]; } ();
l.name = name; l.name = name;
l.callback = on_lookup_finished; l.callback = on_lookup_finished;
l.done = false;
auto events = () @trusted { return cast(shared)m_events; } (); auto events = () @trusted { return cast(shared)m_events; } ();
auto t = task!taskFun(l, AddressFamily.UNSPEC, events, m_event);
try t.executeInNewThread();//taskPool.put(t); try {
catch (Exception e) return DNSLookupID.invalid; auto thr = new class(l, AddressFamily.UNSPEC, events, m_event) Thread {
debug (EventCoreLogDNS) print("lookup handle: %s", handle); Lookup* m_lookup;
m_events.loop.m_waiterCount++; AddressFamily m_family;
return handle; shared(Events) m_events;
EventID m_event;
this(Lookup* l, AddressFamily af, shared(Events) events, EventID event)
{
m_lookup = l;
m_family = af;
m_events = events;
m_event = event;
super(&perform);
this.name = "Eventcore DNS Lookup";
l.thread = this;
} }
/// public void perform()
static void taskFun(Lookup* lookup, int af, shared(Events) events, EventID event) nothrow {
{ debug (EventCoreLogDNS) print("lookup %s start", m_lookup.name);
debug (EventCoreLogDNS) print("lookup %s start", lookup.name);
addrinfo hints; addrinfo hints;
hints.ai_flags = AI_ADDRCONFIG; hints.ai_flags = AI_ADDRCONFIG;
version (linux) hints.ai_flags |= AI_V4MAPPED; version (linux) hints.ai_flags |= AI_V4MAPPED;
hints.ai_family = af; hints.ai_family = m_family;
() @trusted { lookup.retcode = getaddrinfo(lookup.name.toStringz(), null, af == AddressFamily.UNSPEC ? null : &hints, &lookup.result); } (); () @trusted { m_lookup.retcode = getaddrinfo(m_lookup.name.toStringz(), null, m_family == AddressFamily.UNSPEC ? null : &hints, &m_lookup.result); } ();
if (lookup.retcode == -1) if (m_lookup.retcode == -1)
version (CRuntime_Glibc) version (linux) __res_init(); version (CRuntime_Glibc) version (linux) __res_init();
events.trigger(event, true);
debug (EventCoreLogDNS) print("lookup %s finished", lookup.name); atomicStore(m_lookup.done, true);
atomicFence(); // synchronize the other fields in m_lookup with the main thread
m_events.trigger(m_event, true);
debug (EventCoreLogDNS) print("lookup %s finished", m_lookup.name);
}
};
() @trusted { thr.start(); } ();
} catch (Exception e) {
return DNSLookupID.invalid;
}
debug (EventCoreLogDNS) print("lookup handle: %s", handle);
m_events.loop.m_waiterCount++;
return handle;
} }
override void cancelLookup(DNSLookupID handle) override void cancelLookup(DNSLookupID handle)
@ -104,9 +133,21 @@ final class EventDriverDNS_GAI(Events : EventDriverEvents, Signals : EventDriver
{ {
debug (EventCoreLogDNS) print("DNS event triggered"); debug (EventCoreLogDNS) print("DNS event triggered");
m_events.wait(m_event, &onDNSSignal); m_events.wait(m_event, &onDNSSignal);
size_t lastmax; size_t lastmax;
foreach (i, ref l; m_lookups) { foreach (i, ref l; m_lookups) {
if (i > m_maxHandle) break; if (i > m_maxHandle) break;
if (!atomicLoad(l.done)) continue;
// synchronize the other fields in m_lookup with the lookup thread
atomicFence();
try {
l.thread.join();
destroy(l.thread);
} catch (Exception e) {
debug (EventCoreLogDNS) print("Failed to join DNS thread: %s", e.msg);
}
if (l.callback) { if (l.callback) {
if (l.result || l.retcode) { if (l.result || l.retcode) {
debug (EventCoreLogDNS) print("found finished lookup %s for %s", i, l.name); debug (EventCoreLogDNS) print("found finished lookup %s for %s", i, l.name);