
[Core] Update subprocess_daemon.py to terminate all child processes from ray job when 'sky cancel' is run #3919

52 changes: 37 additions & 15 deletions sky/skylet/subprocess_daemon.py
@@ -1,17 +1,35 @@
 """Sky subprocess daemon.
 
 Wait for parent_pid to exit, then SIGTERM (or SIGKILL if needed) the child
 processes of proc_pid.
 """
 
 import argparse
+import os
 import sys
 import time
 
 import psutil
 
-if __name__ == '__main__':
+
+def daemonize():
+    """Separates the process from its parent process with double-forking."""
+    # First fork.
+    if os.fork() > 0:
+        # The original process terminates.
+        sys.exit()
+
+    # Continues to run from the first forked child process.
+    # Detach from the parent environment.
+    os.setsid()
+
+    # Second fork.
+    if os.fork() > 0:
+        # The first forked child process terminates.
+        sys.exit()
+    # Continues to run from the second forked child process.
Collaborator (@Michaelvll):
It seems we have already set start_new_session=True when we start this daemon, which should already do setsid for this process. Do we know why that does not work?

subprocess.Popen(
    daemon_cmd,
    start_new_session=True,
    # Suppress output
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
    # Disable input
    stdin=subprocess.DEVNULL,
)

Reference: https://docs.python.org/3/library/subprocess.html

If start_new_session is true the setsid() system call will be made in the child process prior to the execution of the subprocess.
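
A minimal standalone sketch (illustrative, not part of the PR or this thread) of the behavior under discussion: start_new_session=True gives the child its own session, but the child still reports the launcher as its parent, i.e. the process hierarchy is unchanged:

import os
import subprocess

# The child prints its parent PID and its session ID.
child = subprocess.Popen(
    ['python3', '-c', 'import os; print(os.getppid(), os.getsid(0))'],
    start_new_session=True,
)
child.wait()
print('launcher PID:', os.getpid())
# The child's reported PPID equals the launcher's PID: a new session
# (setsid) does not detach the child from the process hierarchy.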

Collaborator Author (@landscapepainter):
So the start_new_session=True flag certainly does start a new session by running setsid. See the three comparisons below:

  1. Running a ray job with the start_new_session=True flag and without the daemonization I added for subprocess_daemon.py (what we currently have on the master branch):
$ ps aux | grep subprocess_daemon
gcpuser     4536  0.0  0.0  16212 12704 ?        SNs  18:37   0:00 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-packages/sky/skylet/subprocess_daemon.py --parent-pid 4141 --proc-pid 4532
gcpuser     4885  0.0  0.0   5264   636 pts/0    S+   18:39   0:00 grep subprocess_daemon
$ ps -o pid,ppid,sid,cmd 4536
    PID    PPID     SID CMD
   4536    4141    4536 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-pa
$ ps -o pid,ppid,sid,cmd 4141
    PID    PPID     SID CMD
   4141    3710    3527 ray::task,

As you can see above, the session ID (SID) differs between the process running subprocess_daemon.py and ray::task (4536 vs. 3527), which is due to the start_new_session=True flag. You can also see that the subprocess_daemon.py process is a child of the ray::task process, so the daemon process is killed when Ray's stop_job API is run.

  2. Running a ray job with the start_new_session=True flag and with the daemonization for subprocess_daemon.py I added (what we currently have in this PR):
$ ps aux | grep subprocess_daemon
gcpuser     4531  0.3  0.0  16228  9872 ?        SN   18:08   0:00 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-packages/sky/skylet/subprocess_daemon.py --parent-pid 4129 --proc-pid 4522
gcpuser     5029  0.0  0.0   5132   708 pts/0    S+   18:11   0:00 grep subprocess_daemon
$ ps -o pid,ppid,sid,cmd 4531
    PID    PPID     SID CMD
   4531       1    4526 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-pa
$ ps -o pid,ppid,sid,cmd 4129
    PID    PPID     SID CMD
   4129    3699    3516 ray::task,

As you can see above, the SID again differs (4526 vs. 3516) between the subprocess_daemon.py process and ray::task. In addition, with the daemonization added in this PR, the parent process ID (PPID) of the subprocess_daemon.py process is no longer ray::task. Since the subprocess_daemon.py process is completely detached from its original parent, ray::task, it no longer gets killed when Ray's stop_job API is run.

  3. Running a ray job without the start_new_session=True flag but with the daemonization for subprocess_daemon.py I added (just to show that the session group is only separated by the start_new_session=True flag):
$ ps aux | grep subprocess_daemon
gcpuser     4553  0.2  0.0  16228  9876 ?        SN   18:18   0:00 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-packages/sky/skylet/subprocess_daemon.py --parent-pid 4147 --proc-pid 4544
gcpuser     4650  0.0  0.0   5132   640 pts/0    S+   18:18   0:00 grep subprocess_daemon
$ ps -o pid,ppid,sid,cmd 4553
    PID    PPID     SID CMD
   4553       1    3502 /home/gcpuser/skypilot-runtime/bin/python /home/gcpuser/skypilot-runtime/lib/python3.10/site-
$ ps -o pid,ppid,sid,cmd 4147
    PID    PPID     SID CMD
   4147    3685    3502 ray::task,

As you can see above, the SID is identical (3502) for both ray::task and subprocess_daemon.py once I removed the start_new_session=True flag, so the flag is indeed what creates the new session when subprocess_daemon.py is launched as a process.

In my understanding, creating a process in a separate session group does not necessarily detach the process (subprocess_daemon.py) from its parent process (ray::task) in the process hierarchy, so the additional daemonization is necessary. See the sketch below.
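
To illustrate, a minimal standalone sketch (not part of the PR) of the same double-fork pattern; running it on Linux shows the PPID changing to 1 (init, or the nearest subreaper) while the SID changes to the new session created by os.setsid():

import os
import sys
import time


def daemonize():
    # First fork: the original process exits.
    if os.fork() > 0:
        sys.exit()
    # The first child starts a new session and becomes its leader.
    os.setsid()
    # Second fork: the first child exits, so the grandchild is
    # reparented to init (PID 1) or the nearest subreaper.
    if os.fork() > 0:
        sys.exit()


print(f'before: PPID={os.getppid()} SID={os.getsid(0)}')
daemonize()
time.sleep(0.1)  # Give the intermediate process time to exit.
print(f'after:  PPID={os.getppid()} SID={os.getsid(0)}')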

Collaborator (@Michaelvll):

Thanks for the comprehensive investigation, @landscapepainter! This is very useful! I am wondering whether we could pass some additional arguments to the subprocess call in Python to achieve this detachment. That might be cleaner than the current handling in subprocess_daemon.py, because the fork causes two processes to be created every time we start this daemon process (though one exits immediately).

Collaborator Author (@landscapepainter), Sep 8, 2024:

@Michaelvll I did an extensive search on this before implementing the current solution, and did more research in case I had missed anything.

The observed reason Ray's stop_job API kills subprocess_daemon.py is that the process is a child of ray::task in the process hierarchy, even though its session group and process group are different. This implies the solution is to run subprocess_daemon.py in a process hierarchy separate from the ray::task process. Unfortunately, subprocess.Popen() has no option to completely detach the child process from the process hierarchy.

Another approach that seems feasible is to change how we generate ray job scripts in RayCodeGen and submit jobs, so that subprocess_daemon.py runs separately from the ray job. But that looks like a much larger refactoring task, since SkyPilot would then need access to the process IDs created within the remote ray::task to pass them as arguments to subprocess_daemon.py running outside the job script.

Collaborator (@Michaelvll):

That makes sense. If we already daemonize it in this file, do we still need to pass start_new_session in our Popen call?

Also, please leave a comment at the top of this file or on this function explaining why we daemonize this process, e.g., to make sure ray job stop does not kill it, so it can clean up any leaked processes from the user program.
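
For reference, a sketch of the kind of explanatory docstring being requested here (wording assumed, not the PR's actual text):

def daemonize():
    """Detaches this daemon from its parent with a double fork.

    Without this detachment, `ray job stop` (used by `sky cancel`)
    would kill this daemon together with the ray::task process, so
    processes leaked by the user program could never be cleaned up.
    """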

Collaborator Author (@landscapepainter), Sep 10, 2024:

start_new_session is redundant at this point, as you mentioned, and I confirmed it works fine without it. Removed the option, and also updated the docstring. A sketch of the resulting launch call follows.
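
A sketch of what the launch call might look like after this change (the pid values and daemon_cmd layout are illustrative; SkyPilot assembles the real command elsewhere):

import subprocess

parent_pid, proc_pid = 4141, 4532  # Hypothetical values for illustration.
daemon_cmd = [
    'python3', 'sky/skylet/subprocess_daemon.py',
    '--parent-pid', str(parent_pid),
    '--proc-pid', str(proc_pid),
]
subprocess.Popen(
    daemon_cmd,
    # start_new_session=True is no longer needed: daemonize() calls
    # os.setsid() itself after the first fork.
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
    stdin=subprocess.DEVNULL,
)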



+if __name__ == '__main__':
+    daemonize()
     parser = argparse.ArgumentParser()
     parser.add_argument('--parent-pid', type=int, required=True)
     parser.add_argument('--proc-pid', type=int, required=True)
@@ -28,29 +46,33 @@
     if process is None:
         sys.exit()
 
+    children = []
     if parent_process is not None:
         # Wait for either parent or target process to exit.
         while process.is_running() and parent_process.is_running():
+            try:
+                # Collect the children on every iteration: if the process
+                # is already terminated by the time this line is reached,
+                # children() returns an empty list.
+                tmp_children = process.children(recursive=True)
+                if tmp_children:
+                    children = tmp_children
+            except psutil.NoSuchProcess:
+                pass
Collaborator (@Michaelvll):

Why did we move this into the while loop? Is it to make sure that, even if the process is killed, we have already recorded all of its child processes beforehand, so we can still kill those processes and avoid leaking them?

If that is the purpose, we should mention it in the comment. (Style nit: comments should be more about why we do a thing than about what the code does.)

Collaborator Author (@landscapepainter), Sep 10, 2024:

The reason you mentioned is exactly why I moved this into the while loop. This was the second issue with this script: calling process.children(recursive=True) after the process was killed returned an empty list, so the child processes could not be removed even though the daemon was detached. The sketch below demonstrates the behavior.

Thanks for the reminder! The comment is updated in 4e1e7f3
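
A minimal sketch (illustrative, not from the PR) of why the children must be recorded while the target process is still alive:

import subprocess
import time

import psutil

# A shell that spawns a long-running grandchild.
target = subprocess.Popen(['bash', '-c', 'sleep 300 & wait'])
time.sleep(1)

proc = psutil.Process(target.pid)
alive_children = proc.children(recursive=True)
print(alive_children)  # Non-empty: the 'sleep' grandchild is visible.

proc.terminate()
target.wait()
# Once the target has exited, its children can no longer be discovered
# through it: the query now returns an empty list, and the 'sleep'
# grandchild would be leaked if it had not been recorded earlier.
print(proc.children(recursive=True))  # []

for child in alive_children:  # Clean up the leaked grandchild.
    child.terminate()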

             time.sleep(1)
+    children.append(process)
 
-    try:
-        children = process.children(recursive=True)
-        children.append(process)
-    except psutil.NoSuchProcess:
-        sys.exit()

-    for pid in children:
+    for child in children:
         try:
-            pid.terminate()
+            child.terminate()
         except psutil.NoSuchProcess:
-            pass
+            continue
 
     # Wait 30s for the processes to exit gracefully.
     time.sleep(30)
 
     # SIGKILL if they're still running.
-    for pid in children:
+    for child in children:
         try:
-            pid.kill()
+            child.kill()
         except psutil.NoSuchProcess:
-            pass
+            continue
6 changes: 0 additions & 6 deletions tests/test_smoke.py
@@ -2388,26 +2388,20 @@ def _get_cancel_task_with_cloud(name, cloud, timeout=15 * 60):

 # ---------- Testing `sky cancel` ----------
 @pytest.mark.aws
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_aws():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'aws')
     run_one_test(test)
 
 
 @pytest.mark.gcp
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_gcp():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'gcp')
     run_one_test(test)
 
 
 @pytest.mark.azure
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_azure():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'azure', timeout=30 * 60)