Merge pull request #122 from vibe-d/fix_zombie_processes

Use waitpid to iterate over all exited child processes
This commit is contained in:
Sönke Ludwig 2019-08-24 00:38:15 +02:00 committed by GitHub
commit bca94d5736
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 429 additions and 345 deletions

View file

@ -55,7 +55,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
//else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver; //else version (OSX) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PollEventDriverWatchers!EventsDriver; else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
version (linux) alias ProcessDriver = SignalEventDriverProcesses!Loop; version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop;
else alias ProcessDriver = DummyEventDriverProcesses!Loop; else alias ProcessDriver = DummyEventDriverProcesses!Loop;
Loop m_loop; Loop m_loop;

View file

@ -10,427 +10,435 @@ import std.algorithm.comparison : among;
import std.variant : visit; import std.variant : visit;
import std.stdint; import std.stdint;
private struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
EventDriverProcesses driver;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
private struct StaticProcesses {
@safe: nothrow:
import core.sync.mutex : Mutex;
private {
static shared Mutex m_mutex;
static __gshared ProcessInfo[ProcessID] m_processes;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static void add(ProcessID pid, ProcessInfo info) @trusted {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
assert(pid !in m_processes, "Process adopted twice");
m_processes[pid] = info;
}
}
private auto lockedProcessInfo(alias fn)(ProcessID pid) @trusted {
StaticProcesses.m_mutex.lock_nothrow();
scope (exit) StaticProcesses.m_mutex.unlock_nothrow();
auto info = pid in StaticProcesses.m_processes;
return fn(info);
}
private enum SIGCHLD = 17; private enum SIGCHLD = 17;
final class SignalEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN, EINPROGRESS; import core.sync.mutex : Mutex;
import core.sys.linux.sys.signalfd; import core.sys.posix.unistd : dup;
import core.sys.posix.unistd : close, read, write, dup; import core.thread : Thread;
private { private {
Loop m_loop; static shared Mutex s_mutex;
EventDriver m_driver; static __gshared ProcessInfo[ProcessID] s_processes;
SignalListenID m_sighandle; static __gshared Thread s_waitThread;
}
this(Loop loop, EventDriver driver) Loop m_loop;
{ // FIXME: avoid virtual funciton calls and use the final type instead
import core.sys.posix.signal; EventDriver m_driver;
}
m_loop = loop; this(Loop loop, EventDriver driver)
m_driver = driver; {
m_loop = loop;
m_driver = driver;
}
// Listen for child process exits using SIGCHLD void dispose()
m_sighandle = () @trusted { {
sigset_t sset; }
sigemptyset(&sset);
sigaddset(&sset, SIGCHLD);
assert(sigprocmask(SIG_BLOCK, &sset, null) == 0); final override ProcessID adopt(int system_pid)
{
auto pid = cast(ProcessID)system_pid;
return SignalListenID(signalfd(-1, &sset, SFD_NONBLOCK | SFD_CLOEXEC)); ProcessInfo info;
} (); info.exited = false;
info.refCount = 1;
info.driver = this;
add(pid, info);
return pid;
}
m_loop.initFD(cast(FD)m_sighandle, FDFlags.internal, SignalSlot(null)); final override Process spawn(
m_loop.registerFD(cast(FD)m_sighandle, EventMask.read); string[] args,
m_loop.setNotifyCallback!(EventType.read)(cast(FD)m_sighandle, &onSignal); ProcessStdinFile stdin,
ProcessStdoutFile stdout,
ProcessStderrFile stderr,
const string[string] env,
ProcessConfig config,
string working_dir)
@trusted {
// Use std.process to spawn processes
import std.process : pipe, Pid, spawnProcess;
import std.stdio : File;
static import std.stdio;
onSignal(cast(FD)m_sighandle); static File fdToFile(int fd, scope const(char)[] mode)
} {
try {
File f;
f.fdopen(fd, mode);
return f;
} catch (Exception e) {
assert(0);
}
}
void dispose() try {
{ Process process;
FD sighandle = cast(FD)m_sighandle; File stdinFile, stdoutFile, stderrFile;
m_loop.m_fds[sighandle].common.refCount--;
m_loop.setNotifyCallback!(EventType.read)(sighandle, null);
m_loop.unregisterFD(sighandle, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!(SignalSlot)(sighandle);
close(cast(int)sighandle);
}
final override ProcessID adopt(int system_pid) stdinFile = stdin.visit!(
{ (int handle) => fdToFile(handle, "r"),
auto pid = cast(ProcessID)system_pid; (ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stdin;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno));
return p.readEnd;
}
});
ProcessInfo info; stdoutFile = stdout.visit!(
info.exited = false; (int handle) => fdToFile(handle, "w"),
info.refCount = 1; (ProcessRedirect redirect) {
info.driver = this; final switch (redirect) {
StaticProcesses.add(pid, info); case ProcessRedirect.inherit: return std.stdio.stdout;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
return pid; stderrFile = stderr.visit!(
} (int handle) => fdToFile(handle, "w"),
(ProcessRedirect redirect) {
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stderr;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
final override Process spawn( const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect;
string[] args, const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
ProcessStdinFile stdin,
ProcessStdoutFile stdout,
ProcessStderrFile stderr,
const string[string] env,
ProcessConfig config,
string working_dir)
@trusted {
// Use std.process to spawn processes
import std.process : pipe, Pid, spawnProcess;
import std.stdio : File;
static import std.stdio;
static File fdToFile(int fd, scope const(char)[] mode) if (redirectStdout) {
{ assert(!redirectStderr, "Can't redirect both stdout and stderr");
try {
File f;
f.fdopen(fd, mode);
return f;
} catch (Exception e) {
assert(0);
}
}
try { stdoutFile = stderrFile;
Process process; } else if (redirectStderr) {
File stdinFile, stdoutFile, stderrFile; stderrFile = stdoutFile;
}
stdinFile = stdin.visit!( Pid stdPid = spawnProcess(
(int handle) => fdToFile(handle, "r"), args,
(ProcessRedirect redirect) { stdinFile,
final switch (redirect) { stdoutFile,
case ProcessRedirect.inherit: return std.stdio.stdin; stderrFile,
case ProcessRedirect.none: return File.init; env,
case ProcessRedirect.pipe: cast(std.process.Config)config,
auto p = pipe(); working_dir);
process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); process.pid = adopt(stdPid.osHandle);
return p.readEnd; stdPid.destroy();
}
});
stdoutFile = stdout.visit!( return process;
(int handle) => fdToFile(handle, "w"), } catch (Exception e) {
(ProcessRedirect redirect) { return Process.init;
final switch (redirect) { }
case ProcessRedirect.inherit: return std.stdio.stdout; }
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
stderrFile = stderr.visit!( final override void kill(ProcessID pid, int signal)
(int handle) => fdToFile(handle, "w"), @trusted {
(ProcessRedirect redirect) { import core.sys.posix.signal : pkill = kill;
final switch (redirect) {
case ProcessRedirect.inherit: return std.stdio.stderr;
case ProcessRedirect.none: return File.init;
case ProcessRedirect.pipe:
auto p = pipe();
process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno));
return p.writeEnd;
}
},
(_) => File.init);
const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; assert(cast(int)pid > 0, "Invalid PID passed to kill.");
const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
if (redirectStdout) { if (cast(int)pid > 0)
assert(!redirectStderr, "Can't redirect both stdout and stderr"); pkill(cast(int)pid, signal);
}
stdoutFile = stderrFile; final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
} else if (redirectStderr) { {
stderrFile = stdoutFile; bool exited;
} int exitCode;
Pid stdPid = spawnProcess( size_t id = size_t.max;
args, lockedProcessInfo(pid, (info) {
stdinFile, assert(info !is null, "Unknown process ID");
stdoutFile,
stderrFile,
env,
cast(std.process.Config)config,
working_dir);
process.pid = adopt(stdPid.osHandle);
stdPid.destroy();
return process; if (info.exited) {
} catch (Exception e) { exited = true;
return Process.init; exitCode = info.exitCode;
} } else {
} info.callbacks ~= on_process_exit;
id = info.callbacks.length - 1;
}
});
final override void kill(ProcessID pid, int signal) if (exited) {
@trusted { on_process_exit(pid, exitCode);
import core.sys.posix.signal : pkill = kill; }
pkill(cast(int)pid, signal); return id;
} }
final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) final override void cancelWait(ProcessID pid, size_t wait_id)
{ {
bool exited; if (wait_id == size_t.max) return;
int exitCode;
size_t id = lockedProcessInfo!((info) { lockedProcessInfo(pid, (info) {
assert(info !is null, "Unknown process ID"); assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > wait_id, "Invalid process wait ID");
if (info.exited) { info.callbacks[wait_id] = null;
exited = true; });
exitCode = info.exitCode; }
return 0;
} else {
info.callbacks ~= on_process_exit;
return info.callbacks.length - 1;
}
})(pid);
if (exited) { private void onProcessExit(int system_pid)
on_process_exit(pid, exitCode); shared {
} m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid);
}
return id; private static void onLocalProcessExit(intptr_t system_pid)
} {
auto pid = cast(ProcessID)system_pid;
final override void cancelWait(ProcessID pid, size_t waitId) int exitCode;
{ ProcessWaitCallback[] callbacks;
lockedProcessInfo!((info) {
assert(info !is null, "Unknown process ID");
assert(!info.exited, "Cannot cancel wait when none are pending");
assert(info.callbacks.length > waitId, "Invalid process wait ID");
info.callbacks[waitId] = null; PosixEventDriverProcesses driver;
})(pid); lockedProcessInfo(pid, (info) {
} assert(info !is null);
private void onSignal(FD fd) exitCode = info.exitCode;
{
SignalListenID lid = cast(SignalListenID)fd;
signalfd_siginfo nfo; callbacks = info.callbacks;
do { info.callbacks = null;
auto ret = () @trusted { return read(cast(int)fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS) || ret != nfo.sizeof) driver = info.driver;
return; });
onProcessExit(nfo.ssi_pid, nfo.ssi_status); foreach (cb; callbacks) {
} while (true); if (cb)
} cb(pid, exitCode);
}
private void onProcessExit(int system_pid, int exitCode) driver.releaseRef(pid);
{ }
auto pid = cast(ProcessID)system_pid;
ProcessWaitCallback[] callbacks; final override bool hasExited(ProcessID pid)
auto driver = lockedProcessInfo!((info) @safe { {
// We get notified of any child exiting, so ignore the ones we're bool ret;
// not aware of lockedProcessInfo(pid, (info) {
if (info is null) { assert(info !is null, "Unknown process ID");
return null; ret = info.exited;
} });
return ret;
}
// Increment the ref count to make sure it doesn't get removed final override void addRef(ProcessID pid)
info.refCount++; {
lockedProcessInfo(pid, (info) {
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++;
});
}
info.exited = true; final override bool releaseRef(ProcessID pid)
info.exitCode = exitCode; {
return info.driver; bool ret;
})(pid); lockedProcessInfo(pid, (info) {
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) {
// Remove/deallocate process
if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } ();
// Need to call callbacks in the owner thread as this function can be () @trusted { s_processes.remove(pid); } ();
// called from any thread. Without extra threads this is always the main ret = false;
// thread. } else ret = true;
if (() @trusted { return cast(void*)this == cast(void*)driver; } ()) { });
onLocalProcessExit(cast(intptr_t)pid); return ret;
} else if (driver) { }
auto sharedDriver = () @trusted { return cast(shared typeof(this))driver; } ();
sharedDriver.m_driver.core.runInOwnerThread(&onLocalProcessExit, cast(intptr_t)pid); final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
} @system {
} void* ret;
lockedProcessInfo(pid, (info) @safe nothrow {
assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");
assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
private static void onLocalProcessExit(intptr_t system_pid) if (!info.userDataDestructor) {
{ () @trusted { initialize(info.userData.ptr); } ();
auto pid = cast(ProcessID)system_pid; info.userDataDestructor = destroy;
}
ret = () @trusted { return info.userData.ptr; } ();
});
return ret;
}
int exitCode; package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; }
ProcessWaitCallback[] callbacks;
auto driver = lockedProcessInfo!((info) {
assert(info !is null);
exitCode = info.exitCode; shared static this()
{
s_mutex = new shared Mutex;
}
callbacks = info.callbacks; private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
info.callbacks = null; {
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
auto info = () @trusted { return pid in s_processes; } ();
fn(info);
}
return info.driver; private static void add(ProcessID pid, ProcessInfo info) @trusted {
})(pid); s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
foreach (cb; callbacks) { if (!s_waitThread) {
if (cb) s_waitThread = new Thread(&waitForProcesses);
cb(pid, exitCode); s_waitThread.start();
} }
driver.releaseRef(pid); assert(pid !in s_processes, "Process adopted twice");
} s_processes[pid] = info;
}
final override bool hasExited(ProcessID pid) private static void waitForProcesses()
{ @system {
return lockedProcessInfo!((info) { import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid;
assert(info !is null, "Unknown process ID"); import core.sys.posix.signal : siginfo_t;
return info.exited; while (true) {
})(pid); siginfo_t dummy;
} auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT);
if (ret == -1) {
{
s_mutex.lock_nothrow();
scope (exit) s_mutex.unlock_nothrow();
s_waitThread = null;
}
break;
}
final override void addRef(ProcessID pid) ProcessID[] allprocs;
{
lockedProcessInfo!((info) {
nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
info.refCount++;
})(pid);
}
final override bool releaseRef(ProcessID pid) {
{ s_mutex.lock_nothrow();
return lockedProcessInfo!((info) { scope (exit) s_mutex.unlock_nothrow();
nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
if (--info.refCount == 0) {
// Remove/deallocate process
if (info.userDataDestructor)
() @trusted { info.userDataDestructor(info.userData.ptr); } ();
StaticProcesses.m_processes.remove(pid);
return false;
}
return true;
})(pid);
}
final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) () @trusted {
@system { foreach (ref entry; s_processes.byKeyValue) {
return lockedProcessInfo!((info) { if (!entry.value.exited)
assert(info.userDataDestructor is null || info.userDataDestructor is destroy, allprocs ~= entry.key;
"Requesting user data with differing type (destructor)."); }
assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); } ();
}
if (!info.userDataDestructor) { foreach (pid; allprocs) {
initialize(info.userData.ptr); int status;
info.userDataDestructor = destroy; ret = () @trusted { return waitpid(cast(int)pid, &status, WNOHANG); } ();
} if (ret == cast(int)pid) {
return info.userData.ptr; int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
})(pid); onProcessExitStatic(ret, exitstatus);
} }
}
}
}
package final @property size_t pendingCount() const nothrow @trusted { return StaticProcesses.m_processes.length; } private static void onProcessExitStatic(int system_pid, int exit_status)
{
auto pid = cast(ProcessID)system_pid;
PosixEventDriverProcesses driver;
lockedProcessInfo(pid, (ProcessInfo* info) @safe {
// We get notified of any child exiting, so ignore the ones we're
// not aware of
if (info is null) return;
// Increment the ref count to make sure it doesn't get removed
info.refCount++;
info.exited = true;
info.exitCode = exit_status;
driver = info.driver;
});
if (driver)
() @trusted { return cast(shared)driver; } ().onProcessExit(cast(int)pid);
}
private static struct ProcessInfo {
bool exited = true;
int exitCode;
ProcessWaitCallback[] callbacks;
size_t refCount = 0;
PosixEventDriverProcesses driver;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}
} }
final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
this(Loop loop, EventDriver driver) {} this(Loop loop, EventDriver driver) {}
void dispose() {} void dispose() {}
override ProcessID adopt(int system_pid) override ProcessID adopt(int system_pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool hasExited(ProcessID pid) override bool hasExited(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void kill(ProcessID pid, int signal) override void kill(ProcessID pid, int signal)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void cancelWait(ProcessID pid, size_t waitId) override void cancelWait(ProcessID pid, size_t waitId)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void addRef(ProcessID pid) override void addRef(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override bool releaseRef(ProcessID pid) override bool releaseRef(ProcessID pid)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
} }
protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system { @system {
assert(false, "TODO!"); assert(false, "TODO!");
} }
package final @property size_t pendingCount() const nothrow { return 0; } package final @property size_t pendingCount() const nothrow { return 0; }
} }

View file

@ -0,0 +1,76 @@
#!/usr/bin/env dub
/+ dub.sdl:
name "test"
dependency "eventcore" path=".."
+/
module test;
import core.time : Duration, msecs;
import eventcore.core;
import std.conv;
import std.datetime;
import std.process : thisProcessID;
import std.stdio;
version (Windows) {
void main()
{
writefln("Skipping SIGCHLD coalesce test on Windows.");
}
} else:
import core.sys.posix.sys.wait : waitpid, WNOHANG;
int numProc;
void main(string[] args)
{
// child mode
if (args.length == 2)
{
import core.thread : Thread;
writefln("Child: %s (%s) from %s", args[1], (args[1].to!long - Clock.currStdTime).hnsecs, thisProcessID);
Thread.sleep((args[1].to!long - Clock.currStdTime).hnsecs);
return;
}
auto tm = eventDriver.timers.create();
eventDriver.timers.set(tm, 5.seconds, 0.msecs);
eventDriver.timers.wait(tm, (tm) @trusted {
assert(false, "Test hung.");
});
// attempt to let all child processes finish in exactly 1 second to force
// signal coalescing
auto targettime = Clock.currTime(UTC()) + 1.seconds;
auto procs = new Process[](20);
foreach (i, ref p; procs) {
p = eventDriver.processes.spawn(
[args[0], targettime.stdTime.to!string],
ProcessStdinFile(ProcessRedirect.inherit),
ProcessStdoutFile(ProcessRedirect.inherit),
ProcessStderrFile(ProcessRedirect.inherit),
null, ProcessConfig.none, null
);
assert(p != Process.init);
writeln("Started child: ", p.pid);
numProc++;
}
foreach (p; procs) {
eventDriver.processes.wait(p.pid, (ProcessID pid, int res) nothrow
{
numProc--;
try writefln("Child %s exited with %s", pid, res);
catch(Exception){}
});
}
do eventDriver.core.processEvents(Duration.max);
while (numProc);
foreach (p; procs) assert(waitpid(cast(int)p.pid, null, WNOHANG) == -1);
}