From ad3a40c1a11a70c2f6d976513f952d1c36d11d6a Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Tue, 27 Aug 2024 16:17:55 -0400 Subject: [PATCH] fabtests: multinode test updates Made a few updates to the multinode test: 1. Accept a -x flag to turn off setting the service/node/flags - this is needed to work with CXI 2. Accept a -u flag to set a process manager: pmi or pmix 3. Modify the code to get the rank from the appropriate environment variable if a process manager is specified. 4. Add a runmultinode.py script which enables users to run the test using a backing process manager. The Python script takes a YAML configuration file which defines the environment and test. An example YAML configuration file: multinode: environment: FI_MR_CACHE_MAX_SIZE: -1 FI_MR_CACHE_MAX_COUNT: 524288 FI_SHM_USE_XPMEM: 1 FI_LOG_LEVEL: info bind-to: core map-by-count: 1 map-by: l3cache pattern: full_mesh Script Usage: usage: runmultinode.py [-h] [--dry-run] [--ci CI] [-C CAPABILITY] [-i ITERATIONS] [-l {internal,srun,mpirun}] [-p PROVIDER] [-np NUM_PROCS] [-c CONFIG] [-t PROCS_PER_NODE] libfabric multinode test with slurm optional arguments: -h, --help show this help message and exit --dry-run Perform a dry run without making any changes. --ci CI Commands to prepend to test call. Only used with the internal launcher option -C CAPABILITY, --capability CAPABILITY libfabric capability -i ITERATIONS, --iterations ITERATIONS Number of iterations -l {internal,srun,mpirun}, --launcher {internal,srun,mpirun} launcher to use for running job. If nothing is specified, test manages processes internally. 
Available options: internal, srun and mpirun Required arguments: -p PROVIDER, --provider PROVIDER libfabric provider -np NUM_PROCS, --num-procs NUM_PROCS Map process by node, l3cache, etc -c CONFIG, --config CONFIG Test configuration Required if using srun: -t PROCS_PER_NODE, --procs-per-node PROCS_PER_NODE Number of procs per node Running the script: runmultinode.py -p cxi -i 1 --procs-per-node 8 --num-procs 8 -l srun -c mn.yaml Signed-off-by: Amir Shehata --- fabtests/Makefile.am | 1 + fabtests/common/shared.c | 1 + fabtests/include/shared.h | 2 +- fabtests/multinode/include/core.h | 32 +++++ fabtests/multinode/src/core.c | 12 +- fabtests/multinode/src/harness.c | 105 ++++++++++++--- fabtests/multinode/src/timing.c | 10 +- fabtests/scripts/runmultinode.py | 209 ++++++++++++++++++++++++++++++ 8 files changed, 339 insertions(+), 33 deletions(-) create mode 100644 fabtests/scripts/runmultinode.py diff --git a/fabtests/Makefile.am b/fabtests/Makefile.am index 9a33fedcabe..ac193830365 100644 --- a/fabtests/Makefile.am +++ b/fabtests/Makefile.am @@ -87,6 +87,7 @@ dist_bin_SCRIPTS = \ scripts/runfabtests.sh \ scripts/runfabtests.py \ scripts/runmultinode.sh \ + scripts/runmultinode.py \ scripts/rft_yaml_to_junit_xml dist_noinst_SCRIPTS = \ diff --git a/fabtests/common/shared.c b/fabtests/common/shared.c index af163e282bf..e4d45c46e42 100644 --- a/fabtests/common/shared.c +++ b/fabtests/common/shared.c @@ -1974,6 +1974,7 @@ int ft_read_addr_opts(char **node, char **service, struct fi_info *hints, if (opts->options & FT_OPT_ADDR_IS_OOB) { *service = NULL; *node = NULL; + *flags = 0; } else if (opts->address_format == FI_ADDR_STR) { /* We are likely dealing with a provider specific address format. * I.e. 
NOT an IP address or host name diff --git a/fabtests/include/shared.h b/fabtests/include/shared.h index d0c0375f219..ae35106b5b8 100644 --- a/fabtests/include/shared.h +++ b/fabtests/include/shared.h @@ -53,7 +53,7 @@ extern "C" { #endif #ifndef FT_FIVERSION -#define FT_FIVERSION FI_VERSION(1,20) +#define FT_FIVERSION FI_VERSION(1,21) #endif #include "ft_osd.h" diff --git a/fabtests/multinode/include/core.h b/fabtests/multinode/include/core.h index db6f3b27efd..094f54603d9 100644 --- a/fabtests/multinode/include/core.h +++ b/fabtests/multinode/include/core.h @@ -32,8 +32,11 @@ #pragma once +#include +#include #include #include +#include #include #include @@ -56,6 +59,12 @@ enum multi_pattern { PATTERN_BROADCAST, }; +enum multi_pm_type { + PM_NONE, + PM_PMIX, + PM_PMI, +}; + struct multi_xfer_method { char* name; int (*send)(void); @@ -77,6 +86,7 @@ struct pm_job_info { fi_addr_t *fi_addrs; enum multi_xfer transfer_method; enum multi_pattern pattern; + enum multi_pm_type pm; }; struct multinode_xfer_state { @@ -106,6 +116,28 @@ static inline int timer_index(int iter, int dest_rank) return iter * pm_job.num_ranks + dest_rank; } +#ifndef _WIN32 +static inline void log_print(FILE *f, char *fmt, ...) +{ + va_list args; + + if (!pm_job.clients) + return; + va_start(args, fmt); + vfprintf(f, fmt, args); + va_end(args); +} + +#define PRINTF(fmt, args...) log_print(stdout, fmt, ## args) +#define EPRINTF(fmt, args...) log_print(stderr, fmt, ## args) + +#else + +#define PRINTF(fmt, ...) fprintf(stdout, fmt, __VA_ARGS__) +#define EPRINTF(fmt, ...) 
fprintf(stderr, fmt, __VA_ARGS__) + +#endif /* _WIN32 */ + int multinode_run_tests(int argc, char **argv); int pm_allgather(void *my_item, void *items, int item_size); ssize_t socket_send(int sock, void *buf, size_t len, int flags); diff --git a/fabtests/multinode/src/core.c b/fabtests/multinode/src/core.c index 3fec8b79f54..dbc21cc42e6 100644 --- a/fabtests/multinode/src/core.c +++ b/fabtests/multinode/src/core.c @@ -500,14 +500,14 @@ int multinode_run_tests(int argc, char **argv) timers = calloc(opts.iterations * pm_job.num_ranks, sizeof(*timers)); if (pm_job.pattern != -1) { - printf("starting %s... ", patterns[pm_job.pattern].name); + PRINTF("starting %s... ", patterns[pm_job.pattern].name); pattern = &patterns[pm_job.pattern]; ret = multi_run_test(); if (ret) { - printf("failed\n"); + PRINTF("failed\n"); goto out; } - printf("passed\n"); + PRINTF("passed\n"); if (ft_check_opts(FT_OPT_PERF)) { ret = multi_timer_analyze(timers, opts.iterations * @@ -519,14 +519,14 @@ int multinode_run_tests(int argc, char **argv) } else { for (i = 0; i < NUM_TESTS && !ret; i++) { - printf("starting %s... ", patterns[i].name); + PRINTF("starting %s... 
", patterns[i].name); pattern = &patterns[i]; ret = multi_run_test(); if (ret) { - printf("failed\n"); + PRINTF("failed\n"); goto out; } - printf("passed\n"); + PRINTF("passed\n"); if (ft_check_opts(FT_OPT_PERF)) { ret = multi_timer_analyze(timers, opts.iterations diff --git a/fabtests/multinode/src/harness.c b/fabtests/multinode/src/harness.c index 0bb3ac97991..0df5417521e 100644 --- a/fabtests/multinode/src/harness.c +++ b/fabtests/multinode/src/harness.c @@ -57,6 +57,17 @@ static enum multi_xfer parse_caps(char *caps) } } +static enum multi_pm_type parse_pm(char *pm) +{ + if (strcmp(pm, "pmix") == 0) { + return PM_PMIX; + } else if (strcmp(pm, "pmi") == 0) { + return PM_PMI; + } + + return PM_NONE; +} + static enum multi_pattern parse_pattern(char *pattern) { if (strcmp(pattern, "full_mesh") == 0) { @@ -155,8 +166,34 @@ void pm_barrier(void) pm_allgather(&ch, chs, 1); } -static int pm_init_ranks(void) +static int pm_get_rank(char *env_name, size_t *rank) { + char *rank_str; + + rank_str = getenv(env_name); + + if (!rank_str) + return -FI_EINVAL; + + *rank = atoi(rank_str); + + return FI_SUCCESS; +} + +static int pm_init_ranks_pm(void) +{ + int ret = -FI_EINVAL; + + if (pm_job.pm == PM_PMIX) + return pm_get_rank("PMIX_RANK", &pm_job.my_rank); + else if (pm_job.pm == PM_PMI) + return pm_get_rank("PMI_RANK", &pm_job.my_rank); + + return ret; +} + + static int pm_init_ranks(void) + { int ret; int i; size_t send_rank; @@ -165,17 +202,18 @@ static int pm_init_ranks(void) for (i = 0; i < pm_job.num_ranks-1; i++) { send_rank = i + 1; ret = socket_send(pm_job.clients[i], &send_rank, - sizeof(send_rank), 0); + sizeof(send_rank), 0); if (ret < 0) break; } } else { ret = socket_recv(pm_job.sock, &(pm_job.my_rank), - sizeof(pm_job.my_rank), 0); + sizeof(pm_job.my_rank), 0); } return ret; -} + } + static int server_connect(void) { @@ -211,10 +249,11 @@ static int server_connect(void) return new_sock; } -static int pm_conn_setup(void) +static int pm_conn_setup(bool pm) { int 
sock, ret; int optval = 1; + bool bound = false; sock = socket(pm_job.oob_server_addr.ss_family, SOCK_STREAM, 0); if (sock < 0) @@ -234,11 +273,15 @@ static int pm_conn_setup(void) } #endif - ret = bind(sock, (struct sockaddr *)&pm_job.oob_server_addr, - pm_job.server_addr_len); - if (ret == 0) { - ret = server_connect(); - } else { + if (!pm || (pm && pm_job.my_rank == 0)) { + ret = bind(sock, (struct sockaddr *)&pm_job.oob_server_addr, + pm_job.server_addr_len); + if (ret == 0) { + ret = server_connect(); + bound = true; + } + } + if ((!pm && !bound) || (pm && pm_job.my_rank != 0)) { opts.dst_addr = opts.src_addr; opts.dst_port = opts.src_port; opts.src_addr = NULL; @@ -316,12 +359,13 @@ int main(int argc, char **argv) pm_job.clients = NULL; pm_job.pattern = -1; + pm_job.pm = PM_NONE; hints = fi_allocinfo(); if (!hints) return EXIT_FAILURE; - while ((c = getopt(argc, argv, "n:x:z:Ths:I:" INFO_OPTS)) != -1) { + while ((c = getopt(argc, argv, "n:x:z:u:Ths:I:" INFO_OPTS)) != -1) { switch (c) { default: ft_parse_addr_opts(c, optarg, &opts); @@ -343,6 +387,10 @@ int main(int argc, char **argv) case 'z': pm_job.pattern = parse_pattern(optarg); break; + case 'u': + /* setup the process manager type */ + pm_job.pm = parse_pm(optarg); + break; case '?': case 'h': fprintf(stderr, "Usage:\n"); @@ -369,9 +417,10 @@ int main(int argc, char **argv) FT_PRINT_OPTS_USAGE("-d ", "domain name"); FT_PRINT_OPTS_USAGE("-p ", "specific provider name eg sockets, verbs"); + FT_PRINT_OPTS_USAGE("-a", "do not use local address"); ft_addr_usage(); ft_hmem_usage(); - + return EXIT_FAILURE; } } @@ -384,16 +433,30 @@ int main(int argc, char **argv) if (ret) goto err1; - ret = pm_conn_setup(); - if (ret) { - FT_ERR("connection setup failed\n"); - goto err1; - } + if (pm_job.pm != PM_NONE) { + ret = pm_init_ranks_pm(); + if (ret < 0) { + FT_ERR("rank initialization failed\n"); + goto err1; + } - ret = pm_init_ranks(); - if (ret < 0) { - FT_ERR("rank initialization failed\n"); - goto err2; + ret 
= pm_conn_setup(true); + if (ret) { + FT_ERR("connection setup failed\n"); + goto err1; + } + } else { + ret = pm_conn_setup(false); + if (ret) { + FT_ERR("connection setup failed\n"); + goto err1; + } + + ret = pm_init_ranks(); + if (ret < 0) { + FT_ERR("rank initialization failed\n"); + goto err2; + } } FT_DEBUG("OOB job setup done\n"); diff --git a/fabtests/multinode/src/timing.c b/fabtests/multinode/src/timing.c index eee541adf51..7f8913a207d 100644 --- a/fabtests/multinode/src/timing.c +++ b/fabtests/multinode/src/timing.c @@ -56,7 +56,7 @@ void multi_timer_stop(struct multi_timer *timer) static inline void print_timer(struct multi_timer timer, char* info) { - printf("rank: %i, start: %ld, end: %ld, %s\n", + PRINTF("rank: %i, start: %ld, end: %ld, %s\n", timer.rank, timer.start, timer.end, info); } @@ -83,7 +83,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count) goto out; } - printf("%-10s %16s %16s %16s %16s\n", + PRINTF("%-10s %16s %16s %16s %16s\n", "Iteration", "Min Send (ns)", "Max Send (ns)", "Pattern Time(ns)", "Average Send(ns)"); @@ -114,7 +114,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count) min[i] = duration; } - printf("%-10i %16ld %16ld %16ld %16.3f\n", + PRINTF("%-10i %16ld %16ld %16ld %16.3f\n", i, min[i], max[i], last_end[i] - first_start[i], sum_time[i] / iter_timer_count); @@ -128,7 +128,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count) } if (pm_job.my_rank == 0) - printf("%-10s %16.3lf %16.3lf %16.3lf %16.3lf\n", "Average", + PRINTF("%-10s %16.3lf %16.3lf %16.3lf %16.3lf\n", "Average", total_min / iterations, total_max / iterations, total_duration / iterations, total_sum_time / total_timers); @@ -189,7 +189,7 @@ int multi_timer_iter_gather(struct multi_timer *gather_timers, ret = multi_timer_gather(gather_timers, iter_timers, pm_job.num_ranks); if (ret < 0) - printf("gather timer error: %i\n", ret); + PRINTF("gather timer error: %i\n", ret); pm_barrier(); diff --git 
a/fabtests/scripts/runmultinode.py b/fabtests/scripts/runmultinode.py new file mode 100644 index 00000000000..a8c836f532e --- /dev/null +++ b/fabtests/scripts/runmultinode.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 + +import argparse, builtins, os, sys, yaml, socket + +def parse_args(): + parser = argparse.ArgumentParser(description="libfabric multinode test with slurm") + parser.add_argument('--dry-run', action='store_true', help='Perform a dry run without making any changes.') + parser.add_argument("--ci", type=str, help="Commands to prepend to test call. Only used with the internal launcher option", default="") + parser.add_argument("-C", "--capability", type=str, help="libfabric capability", default="msg") + parser.add_argument("-i", "--iterations", type=int , help="Number of iterations", default=1) + parser.add_argument("-l", "--launcher", type=str, choices=['internal', 'srun', 'mpirun'], help="launcher to use for running job. If nothing is specified, test manages processes internally. 
Available options: internal, srun and mpirun", default="internal") + + required = parser.add_argument_group("Required arguments") + required.add_argument("-p", "--provider", type=str, help="libfabric provider") + required.add_argument("-np", "--num-procs", type=int, help="Map process by node, l3cache, etc") + required.add_argument("-c", "--config", type=str, help="Test configuration") + + srun_required = parser.add_argument_group("Required if using srun") + srun_required.add_argument("-t", "--procs-per-node", type=int, + help="Number of procs per node", default=-1) + + args = parser.parse_args() + return args, parser + +def parse_config(config): + with open(config, "r") as f: + yconf = yaml.load(f, Loader=yaml.FullLoader) + return yconf + +def mpi_env(config): + env = config['environment'] + result = [] + for k in env.keys(): + result.append(f"-x {k}") + return " ".join(result) + +def set_env(config): + env = config['environment'] + for k, v in env.items(): + os.environ[k] = str(v) + +def mpi_mca_params(config): + try: + mca = config['mca'] + result = [] + for k, v in env.items(): + result.append(f"--mca {k} {v}") + return " ".join(result) + except: + return "" + +def mpi_bind_to(config): + try: + return f"--bind-to {config['bind-to']}" + except: + return "--bind-to core" + +def mpi_map_by(config): + try: + return f"--map-by ppr:{config['map-by-count']}:{config['map-by']}" + except: + return "--map-by ppr:1:l3" + +def execute_cmd(cmd, dry_run): + script_dir = os.path.dirname(os.path.abspath(__file__)) + sys.path.append(script_dir) + from command import Command + cmd = Command(cmd, fake=dry_run) + rc, out = cmd.exec_cmd() + return rc, out + +def split_on_commas(expr): + l = [] + c = o = 0 + s = expr + stop = False + while not stop and c != -1 and o != -1: + o = s.find('[') + b = s.find(']') + c = s.find(',') + while c > o and c < b: + c = s.find(',', c+1) + if len(l): + l.pop() + if c < o or c > b: + l += [s[:s.find(',', c)], s[s.find(',', c)+1:]] + s = l[-1] + 
else: + l += s.split(',') + stop = True + for i in range(0, len(l)): + l[i] = l[i].strip() + return l + +def expand_host_list_sub(expr): + host_list = [] + + # first phase split on the commas first + open_br = expr.find('[') + close_br = expr.find(']', open_br) + if open_br == -1 and close_br == -1: + return expr.split(',') + + if open_br == -1 or close_br == -1: + return [] + + rangestr = expr[open_br+1 : close_br] + + node = expr[:open_br] + + ranges = rangestr.split(',') + + for r in ranges: + cur = r.split('-') + if len(cur) == 2: + pre = "{:0%dd}" % len(cur[0]) + for idx in range(int(cur[0]), int(cur[1])+1): + host_list.append(f'{node}{pre.format(idx)}') + elif len(cur) == 1: + pre = "{:0%dd}" % len(cur[0]) + host_list.append(f'{node}{pre.format(int(cur[0]))}') + + return host_list + +def expand_host_list(expr): + l = split_on_commas(expr) + host_list = [] + for e in l: + host_list += expand_host_list_sub(e) + return host_list + +supported_pm = ['pmi', 'pmi2', 'pmix'] + +def is_srun_pm_supported(): + rc, out = execute_cmd('srun --mpi=list', False) + if rc: + return False + input_list = out.split('\n') + cleaned_list = [entry.strip() for entry in input_list if entry.strip()] + for e in supported_pm: + if e in cleaned_list[1:]: + return True + return False + +if __name__ == '__main__': + + # list of providers which do not address specification + no_addr_prov = ['cxi'] + + args, parser = parse_args() + + if not args.config: + print("**A configuration file is required") + print(parser.format_help()) + exit(-1) + + mnode = parse_config(args.config)['multinode'] + set_env(mnode) + + if args.launcher == 'srun': + if not is_srun_pm_supported(): + print(f"**Supported process managers are: {','.join(supported_pm)}") + print(parser.format_help()) + exit(-1) + + # The script assumes it's already running in a SLURM allocation. 
It can + # then srun fi_multinode + # + if "pattern" not in mnode: + print("Test pattern must be defined in the YAML configuration file") + exit() + + if args.provider in no_addr_prov: + cmd = f"fi_multinode -n {args.num_procs} -s {socket.gethostname()} " \ + f"-p {args.provider} -C {args.capability} -z {mnode['pattern']} " \ + f"-I {args.iterations} -u {args.launcher.lower()} -E -T" + else: + cmd = f"fi_multinode -n {args.num_procs} -s {socket.gethostname()} " \ + f"-p {args.provider} -C {args.capability} -z '{mnode['pattern']}' " \ + f"-I {args.iterations} -u {args.launcher.lower()} -T" + + if args.launcher.lower() == 'mpirun': + mpi = f"mpirun {mpi_env(mnode)} {mpi_mca_params(mnode)} {mpi_bind_to(mnode)} "\ + f"{mpi_map_by(mnode)} -np {args.num_procs} {cmd}" + elif args.launcher.lower() == 'srun': + if args.procs_per_node == -1 or args.num_procs == -1: + print("**Need to specify --procs-per-node and --num-procs") + print(parser.format_help()) + exit() + mpi = f"srun --ntasks-per-node {args.procs_per_node} --ntasks {args.num_procs} "\ + f"{cmd}" + elif args.launcher.lower() == 'internal': + if args.procs_per_node == -1: + print("**Need to specify --procs-per-node") + print(parser.format_help()) + exit() + hl = ",".join(expand_host_list(os.environ['SLURM_NODELIST'])) + mpi = f"runmultinode.sh -h {hl} -n {args.procs_per_node} -p {args.provider} " \ + f"-C {args.capability} -I {args.iterations} -z {mnode['pattern']}" + if args.ci: + mpi += f" --ci '{args.ci}'" + else: + print("**Unsupported launcher") + print(parser.format_help()) + exit() + + rc, out = execute_cmd(mpi, args.dry_run) + + print(f"Command completed with {rc}\n{out}")