[Core] Update subprocess_daemon.py to terminate all child processes from ray job when 'sky cancel' is run #3919

5 changes: 4 additions & 1 deletion sky/skylet/log_lib.py
@@ -208,9 +208,12 @@ def run_with_log(
             str(proc.pid),
         ]
 
+        # We do not need to set `start_new_session=True` here, as the
+        # daemon script will detach itself from the parent process with
+        # fork to avoid being killed by ray job. See the reason we
+        # daemonize the process in `sky/skylet/subprocess_daemon.py`.
         subprocess.Popen(
             daemon_cmd,
-            start_new_session=True,
             # Suppress output
             stdout=subprocess.DEVNULL,
             stderr=subprocess.DEVNULL,
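For context, the caller side in log_lib.py now relies on the daemon detaching itself. Below is a minimal sketch of what such a launcher looks like after this change; the function name and the script path are illustrative, not the exact log_lib.py code.

import os
import subprocess
import sys


def spawn_subprocess_daemon(parent_pid: int, proc_pid: int) -> None:
    """Launch subprocess_daemon.py to reap proc_pid's children later.

    No `start_new_session=True` is needed here: the daemon script
    double-forks and calls setsid() itself, so it detaches from the
    Ray job on its own.
    """
    # Illustrative path: assumes subprocess_daemon.py sits next to this module.
    daemon_script = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                 'subprocess_daemon.py')
    daemon_cmd = [
        sys.executable,
        daemon_script,
        '--parent-pid',
        str(parent_pid),
        '--proc-pid',
        str(proc_pid),
    ]
    subprocess.Popen(
        daemon_cmd,
        # Suppress the daemon's output.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

The key point is that the caller no longer manipulates sessions or process groups; the daemon's own double fork plus setsid() is what keeps it alive when the Ray job is stopped.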
62 changes: 47 additions & 15 deletions sky/skylet/subprocess_daemon.py
@@ -1,17 +1,44 @@
"""Sky subprocess daemon.

Wait for parent_pid to exit, then SIGTERM (or SIGKILL if needed) the child
processes of proc_pid.
"""

import argparse
import os
import sys
import time

import psutil

if __name__ == '__main__':

def daemonize():
"""Detaches the process from its parent process with double-forking.

This detachment is crucial in the context of SkyPilot and Ray job. When
'sky cancel' is executed, it uses Ray's stop job API to terminate the job.
Without daemonization, this subprocess_daemon process would be terminated
along with its parent process, ray::task, which is launched with Ray job.
Daemonization ensures this process survives the 'sky cancel' command,
allowing it to prevent orphaned processes of Ray job.
"""
# First fork: Creates a child process identical to the parent
if os.fork() > 0:
# Parent process exits, allowing the child to run independently
sys.exit()

# Continues to run from first forked child process.
# Detach from parent environment.
os.setsid()

# Second fork: Creates a grandchild process
if os.fork() > 0:
# First child exits, orphaning the grandchild
sys.exit()
# Continues execution in the grandchild process
# This process is now fully detached from the original parent and terminal


if __name__ == '__main__':
daemonize()
parser = argparse.ArgumentParser()
parser.add_argument('--parent-pid', type=int, required=True)
parser.add_argument('--proc-pid', type=int, required=True)
@@ -28,29 +55,34 @@
     if process is None:
         sys.exit()
 
+    children = []
     if parent_process is not None:
         # Wait for either parent or target process to exit.
         while process.is_running() and parent_process.is_running():
+            try:
+                # process.children() must be called while the target process
+                # is alive, as it will return an empty list if the target
+                # process has already terminated.
+                tmp_children = process.children(recursive=True)
+                if tmp_children:
+                    children = tmp_children
+            except psutil.NoSuchProcess:
+                pass
             time.sleep(1)
+    children.append(process)
 
-    try:
-        children = process.children(recursive=True)
-        children.append(process)
-    except psutil.NoSuchProcess:
-        sys.exit()
-
-    for pid in children:
+    for child in children:
         try:
-            pid.terminate()
+            child.terminate()
         except psutil.NoSuchProcess:
-            pass
+            continue
 
     # Wait 30s for the processes to exit gracefully.
     time.sleep(30)
 
     # SIGKILL if they're still running.
-    for pid in children:
+    for child in children:
         try:
-            pid.kill()
+            child.kill()
         except psutil.NoSuchProcess:
-            pass
+            continue
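For readers less familiar with the psutil idiom above, here is a self-contained sketch of the same terminate-then-kill pattern. It substitutes psutil.wait_procs() for the daemon's fixed time.sleep(30); that substitution is only an illustration of the pattern, not what this PR ships.

import psutil


def terminate_tree(root_pid: int, grace_seconds: float = 30.0) -> None:
    """SIGTERM a process and its descendants, then SIGKILL any survivors."""
    try:
        root = psutil.Process(root_pid)
        procs = root.children(recursive=True) + [root]
    except psutil.NoSuchProcess:
        return  # Nothing left to clean up.

    for proc in procs:
        try:
            proc.terminate()  # SIGTERM
        except psutil.NoSuchProcess:
            continue

    # Wait up to `grace_seconds` for graceful exits, then SIGKILL the rest.
    _, alive = psutil.wait_procs(procs, timeout=grace_seconds)
    for proc in alive:
        try:
            proc.kill()  # SIGKILL
        except psutil.NoSuchProcess:
            continue

wait_procs() returns as soon as all processes have exited, so the grace period becomes an upper bound rather than a fixed delay.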
6 changes: 0 additions & 6 deletions tests/test_smoke.py
@@ -2388,26 +2388,20 @@ def _get_cancel_task_with_cloud(name, cloud, timeout=15 * 60):

 # ---------- Testing `sky cancel` ----------
 @pytest.mark.aws
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_aws():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'aws')
     run_one_test(test)
 
 
 @pytest.mark.gcp
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_gcp():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'gcp')
     run_one_test(test)
 
 
 @pytest.mark.azure
-@pytest.mark.skip(
-    reason='The resnet_app is flaky, due to TF failing to detect GPUs.')
 def test_cancel_azure():
     name = _get_cluster_name()
     test = _get_cancel_task_with_cloud(name, 'azure', timeout=30 * 60)