[Enhancement] Support rerun failed or canceled jobs in `train_benchmark.py` (#1259)

* trigger CI

* fix cache dir in job watcher

* support rerun failure or canceled jobs in train benchmark

* add use case and readme for rerun

* avoid trigger circle CI pr-stage-test  when .dev_script is modified

* support change cpus-per-task in train benchmark
LeoXing1996 committed Oct 11, 2022
1 parent 11dcf18 commit 1ca720d
Showing 5 changed files with 171 additions and 8 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -26,6 +26,7 @@ workflows:
tools/.* lint_only false
configs/.* lint_only false
.circleci/.* lint_only false
.dev_scripts/.* lint_only true
base-revision: 1.x
# this is the path of the configuration we should trigger once
# path filtering and pipeline parameter value updates are
29 changes: 28 additions & 1 deletion .dev_scripts/README.md
@@ -184,7 +184,34 @@ python .dev_scripts/train_benchmark.py mm_lol \

Specifically, you need to enable `--skip` and specify the list of models to skip via `--skip-list`.

## Automatically check links
## 7. Rerun failed or canceled jobs

If you want to rerun the jobs that failed or were canceled in the last run, you can combine the `--rerun` flag with the `--rerun-fail` and `--rerun-cancel` flags.
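
The file passed to `--rerun` is the log written by the previous run (for example `train-20221009-211904.log` below); each job in it is expected to be recorded as a `JOBID @ JOBNAME` line, which is the format asserted by `parse_job_list` in `.dev_scripts/utils.py`. The job id and name here are only a hypothetical illustration:

```text
1234567 @ example_job_name
```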

For example, if the log file of the last run is `train-20221009-211904.log` and you now want to rerun the failed jobs, you can use the following command:

```bash
python .dev_scripts/train_benchmark.py mm_lol \
--job-name RERUN \
--rerun train-20221009-211904.log \
--rerun-fail \
--run
```

You can combine `--rerun-fail` or `--rerun-cancel` with the `--models` flag to rerun a **subset** of the failed or canceled models.

```bash
# only rerun the 'sagan' models among all the failed jobs
python .dev_scripts/train_benchmark.py mm_lol \
--job-name RERUN \
--rerun train-20221009-211904.log \
--rerun-fail \
--models sagan \
--run
```

Specifically, `--rerun-fail` and `--rerun-cancel` can be used together to rerun both failed and canceled jobs.
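
For example, the following command (reusing the log file from the examples above) reruns both the failed and the canceled jobs:

```bash
python .dev_scripts/train_benchmark.py mm_lol \
--job-name RERUN \
--rerun train-20221009-211904.log \
--rerun-fail \
--rerun-cancel \
--run
```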

## 8. Automatically check links

Use the following script to check whether the links in documentations are valid:

2 changes: 1 addition & 1 deletion .dev_scripts/job_watcher.py
@@ -9,7 +9,7 @@
from pygments.util import ClassNotFound
from simple_term_menu import TerminalMenu

CACHE_DIR = '~/.task_watcher'
CACHE_DIR = osp.join(osp.abspath('~'), '.task_watcher')


def show_job_out(name, root, job_name_list):
29 changes: 23 additions & 6 deletions .dev_scripts/train_benchmark.py
@@ -13,6 +13,7 @@
from rich.syntax import Syntax
from rich.table import Table
from tqdm import tqdm
from utils import filter_jobs, parse_job_list_from_file

console = Console()
MMEDIT_ROOT = Path(__file__).absolute().parents[1]
@@ -91,8 +92,13 @@ def parse_args():
    parser.add_argument('--skip', type=str, default=None)
    parser.add_argument('--skip-list', default=None)
    parser.add_argument('--rerun', type=str, default=None)
    parser.add_argument(
        '--rerun-fail', action='store_true', help='only rerun failed tasks')
    parser.add_argument(
        '--rerun-cancel', action='store_true', help='only rerun cancel tasks')
    parser.add_argument('--rerun-list', default=None)
    parser.add_argument('--gpus-per-job', type=int, default=None)
    parser.add_argument('--cpus-per-job', type=int, default=16)
    parser.add_argument(
        '--amp', action='store_true', help='Whether to use amp.')
    parser.add_argument(
@@ -145,11 +151,22 @@ def parse_args():
        args.skip_list = skip_list
        print('skip_list: ', args.skip_list)
    elif args.rerun is not None:
        with open(args.rerun, 'r') as fp:
            rerun_list = fp.readlines()
        rerun_list = [j.split('\n')[0] for j in rerun_list]
        args.rerun_list = rerun_list
        print('rerun_list: ', args.rerun_list)
        job_id_list_full, job_name_list_full = parse_job_list_from_file(
            args.rerun)
        filter_target = []

        if args.rerun_fail:
            filter_target += ['FAILED']
        if args.rerun_cancel:
            filter_target += ['CANCELLED']

        _, job_name_list = filter_jobs(
            job_id_list_full,
            job_name_list_full,
            filter_target,
            show_table=True,
            table_name='Rerun List')
        args.rerun_list = job_name_list

    return args

@@ -222,7 +239,7 @@ def create_train_job_batch(commands, model_info, args, port, script_name):
        job_script += (f'#SBATCH --gres=gpu:{n_gpus}\n'
                       f'#SBATCH --ntasks-per-node={min(n_gpus, 8)}\n'
                       f'#SBATCH --ntasks={n_gpus}\n'
                       f'#SBATCH --cpus-per-task=5\n\n')
                       f'#SBATCH --cpus-per-task={args.cpus_per_job}\n\n')
    else:
        job_script += '\n\n' + 'export CUDA_VISIBLE_DEVICES=-1\n'

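For reference, assuming a job that requests 8 GPUs and keeps the default `--cpus-per-job 16`, the SLURM directives produced by the snippet above would look roughly like:

```bash
#SBATCH --gres=gpu:8
#SBATCH --ntasks-per-node=8
#SBATCH --ntasks=8
#SBATCH --cpus-per-task=16
```
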
118 changes: 118 additions & 0 deletions .dev_scripts/utils.py
@@ -0,0 +1,118 @@
import os
import os.path as osp
from typing import Tuple

from rich import print as pprint
from rich.table import Table


def parse_job_list(job_list) -> Tuple[list, list]:
    """Parse job ids and job names from a list. All elements in `job_list`
    must be formatted as `JOBID @ JOBNAME`.
    Args:
        job_list (list[str]): Job list.
    Returns:
        Tuple[list, list]: Job ID list and job name list.
    """
    assert all([
        ' @ ' in job for job in job_list
    ]), ('Each line of job list must be formatted like \'JOBID @ JOBNAME\'.')
    job_id_list, job_name_list = [], []
    for job_info in job_list:
        job_id, job_name = job_info.split(' @ ')
        job_id_list.append(job_id)
        job_name_list.append(job_name)
    return job_id_list, job_name_list


def parse_job_list_from_file(job_list_file: str) -> Tuple[list, list]:
    """Parse a job list from file and return a tuple containing the list of
    job ids and the list of job names.
    Args:
        job_list_file (str): The path to the job list file.
    Returns:
        Tuple[list, list]: The list of job ids and the list of job names.
    """
    if not osp.exists(job_list_file):
        return False
    with open(job_list_file, 'r') as file:
        job_list = [job.strip() for job in file.readlines()]
    return parse_job_list(job_list)


def get_info_from_id(job_id: str) -> dict:
    """Get the basic information of a job with the `swatch examine` command.
    Args:
        job_id (str): The ID of the job.
    Returns:
        dict: A dict containing the information of the corresponding job.
    """
    # NOTE: no exception handling for the `swatch` call itself
    info_stream = os.popen(f'swatch examine {job_id}')
    info_str = [line.strip() for line in info_stream.readlines()]
    status_info = info_str[2].split()
    try:
        status_dict = {
            'JobID': status_info[0],
            'JobName': status_info[1],
            'Partition': status_info[2],
            'NNodes': status_info[3],
            'AllocCPUS': status_info[4],
            'State': status_info[5]
        }
    except Exception:
        print(job_id)
        print(info_str)
    return status_dict


def filter_jobs(job_id_list: list,
                job_name_list: list,
                select: list = ['FAILED'],
                show_table: bool = False,
                table_name: str = 'Filter Results') -> Tuple[list, list]:
    """Filter out jobs whose status does not belong to :attr:`select`.
    Args:
        job_id_list (list): The list of job ids.
        job_name_list (list): The list of job names.
        select (list, optional): Which kinds of jobs will be selected.
            Defaults to ['FAILED'].
        show_table (bool, optional): Whether to display the filter results
            in a table. Defaults to False.
        table_name (str, optional): The name of the table. Defaults to
            'Filter Results'.
    Returns:
        Tuple[list, list]: A tuple containing the selected job ids and job
            names.
    """
    # if select is empty, return the original id list and name list
    if not select:
        return job_id_list, job_name_list
    filtered_id_list, filtered_name_list = [], []
    job_info_list = []
    for id_, name_ in zip(job_id_list, job_name_list):
        info = get_info_from_id(id_)
        job_info_list.append(info)
        if info['State'] in select:
            filtered_id_list.append(id_)
            filtered_name_list.append(name_)

    if show_table:
        filter_table = Table(title=table_name)
        for field in ['Name', 'ID', 'State', 'Is Selected']:
            filter_table.add_column(field)
        for id_, name_, info_ in zip(job_id_list, job_name_list,
                                     job_info_list):
            selected = '[green]True' \
                if info_['State'] in select else '[red]False'
            filter_table.add_row(name_, id_, info_['State'], selected)
        pprint(filter_table)
    return filtered_id_list, filtered_name_list
