Skip to content

Commit

Permalink
fabtests: multinode test updates
Browse files Browse the repository at this point in the history
Made a few updates to the multinode test:

1. accept a -x flag to turn off setting the service/node/flags
  - this is needed to work with CXI
2. accept a -u flag to set a process manager: pmi or pmix
3. modify the code to get the rank from the appropriate environment
   variable if a process manager is specified.
4. Add a runmultinode.py script which enables users to run the test
   using a backing process manager. The python script takes a YAML
   configuration file which defines the environment and test. An example
   YAML configuration file:

multinode:
    environment:
        FI_MR_CACHE_MAX_SIZE: -1
        FI_MR_CACHE_MAX_COUNT: 524288
        FI_SHM_USE_XPMEM: 1
        FI_LOG_LEVEL: info
    bind-to: core
    map-by-count: 1
    map-by: l3cache
    pattern: full_mesh

Script Usage:
usage: runmultinode.py [-h] [--dry-run] [--ci CI] [-C CAPABILITY]
                       [-i ITERATIONS] [-l {internal,srun,mpirun}]
                       [-p PROVIDER] [-np NUM_PROCS] [-c CONFIG]
                       [-t PROCS_PER_NODE]

libfabric multinode test with slurm

optional arguments:
  -h, --help            show this help message and exit
  --dry-run             Perform a dry run without making any changes.
  --ci CI               Commands to prepend to test call. Only used with the
                        internal launcher option
  -C CAPABILITY, --capability CAPABILITY
                        libfabric capability
  -i ITERATIONS, --iterations ITERATIONS
                        Number of iterations
  -l {internal,srun,mpirun}, --launcher {internal,srun,mpirun}
                        launcher to use for running job. If nothing is
                        specified, test manages processes internally.
                        Available options: internal, srun and mpirun

Required arguments:
  -p PROVIDER, --provider PROVIDER
                        libfabric provider
  -np NUM_PROCS, --num-procs NUM_PROCS
                        Map process by node, l3cache, etc
  -c CONFIG, --config CONFIG
                        Test configuration

Required if using srun:
  -t PROCS_PER_NODE, --procs-per-node PROCS_PER_NODE
                        Number of procs per node

Running the script:

runmultinode.py -p cxi -i 1 --procs-per-node 8 --num-procs 8 -l srun -c mn.yaml

Signed-off-by: Amir Shehata <shehataa@ornl.gov>
  • Loading branch information
amirshehataornl authored and j-xiong committed Sep 4, 2024
1 parent 8477665 commit ad3a40c
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 33 deletions.
1 change: 1 addition & 0 deletions fabtests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ dist_bin_SCRIPTS = \
scripts/runfabtests.sh \
scripts/runfabtests.py \
scripts/runmultinode.sh \
scripts/runmultinode.py \
scripts/rft_yaml_to_junit_xml

dist_noinst_SCRIPTS = \
Expand Down
1 change: 1 addition & 0 deletions fabtests/common/shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,7 @@ int ft_read_addr_opts(char **node, char **service, struct fi_info *hints,
if (opts->options & FT_OPT_ADDR_IS_OOB) {
*service = NULL;
*node = NULL;
*flags = 0;
} else if (opts->address_format == FI_ADDR_STR) {
/* We are likely dealing with a provider specific address format.
* I.e. NOT an IP address or host name
Expand Down
2 changes: 1 addition & 1 deletion fabtests/include/shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ extern "C" {
#endif

#ifndef FT_FIVERSION
#define FT_FIVERSION FI_VERSION(1,20)
#define FT_FIVERSION FI_VERSION(1,21)
#endif

#include "ft_osd.h"
Expand Down
32 changes: 32 additions & 0 deletions fabtests/multinode/include/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@

#pragma once

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>

#include <rdma/fabric.h>
#include <rdma/fi_trigger.h>
Expand All @@ -56,6 +59,12 @@ enum multi_pattern {
PATTERN_BROADCAST,
};

/* Process manager type stored in pm_job.pm. */
enum multi_pm_type {
	PM_NONE = 0,	/* no process manager selected */
	PM_PMIX = 1,	/* selected by the name "pmix" */
	PM_PMI = 2,	/* selected by the name "pmi" */
};

struct multi_xfer_method {
char* name;
int (*send)(void);
Expand All @@ -77,6 +86,7 @@ struct pm_job_info {
fi_addr_t *fi_addrs;
enum multi_xfer transfer_method;
enum multi_pattern pattern;
enum multi_pm_type pm;
};

struct multinode_xfer_state {
Expand Down Expand Up @@ -106,6 +116,28 @@ static inline int timer_index(int iter, int dest_rank)
return iter * pm_job.num_ranks + dest_rank;
}

#ifndef _WIN32
/*
 * Print a printf-style message to stream f.
 *
 * Output is produced only when pm_job.clients is non-NULL; a process
 * whose pm_job.clients is NULL returns without printing anything.
 *
 * f:   destination stream
 * fmt: printf-style format string, followed by its arguments
 */
static inline void log_print(FILE *f, const char *fmt, ...)
{
	va_list ap;

	if (!pm_job.clients)
		return;

	va_start(ap, fmt);
	vfprintf(f, fmt, ap);
	va_end(ap);
}

/*
 * Standard C99 variadic form: the format string is the first element of
 * __VA_ARGS__, so no GNU named-variadic or ", ##" extension is needed.
 */
#define PRINTF(...) log_print(stdout, __VA_ARGS__)
#define EPRINTF(...) log_print(stderr, __VA_ARGS__)

#else

/*
 * The format string is taken as part of __VA_ARGS__ so that a call with no
 * extra arguments, e.g. PRINTF("passed\n"), does not expand to
 * fprintf(stdout, "passed\n", ) with a dangling comma.
 */
#define PRINTF(...) fprintf(stdout, __VA_ARGS__)
#define EPRINTF(...) fprintf(stderr, __VA_ARGS__)

#endif /* _WIN32 */

int multinode_run_tests(int argc, char **argv);
int pm_allgather(void *my_item, void *items, int item_size);
ssize_t socket_send(int sock, void *buf, size_t len, int flags);
Expand Down
12 changes: 6 additions & 6 deletions fabtests/multinode/src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,14 @@ int multinode_run_tests(int argc, char **argv)
timers = calloc(opts.iterations * pm_job.num_ranks, sizeof(*timers));

if (pm_job.pattern != -1) {
printf("starting %s... ", patterns[pm_job.pattern].name);
PRINTF("starting %s... ", patterns[pm_job.pattern].name);
pattern = &patterns[pm_job.pattern];
ret = multi_run_test();
if (ret) {
printf("failed\n");
PRINTF("failed\n");
goto out;
}
printf("passed\n");
PRINTF("passed\n");

if (ft_check_opts(FT_OPT_PERF)) {
ret = multi_timer_analyze(timers, opts.iterations *
Expand All @@ -519,14 +519,14 @@ int multinode_run_tests(int argc, char **argv)

} else {
for (i = 0; i < NUM_TESTS && !ret; i++) {
printf("starting %s... ", patterns[i].name);
PRINTF("starting %s... ", patterns[i].name);
pattern = &patterns[i];
ret = multi_run_test();
if (ret) {
printf("failed\n");
PRINTF("failed\n");
goto out;
}
printf("passed\n");
PRINTF("passed\n");

if (ft_check_opts(FT_OPT_PERF)) {
ret = multi_timer_analyze(timers, opts.iterations
Expand Down
105 changes: 84 additions & 21 deletions fabtests/multinode/src/harness.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ static enum multi_xfer parse_caps(char *caps)
}
}

/*
 * Translate a process manager name into its multi_pm_type value.
 * "pmix" maps to PM_PMIX and "pmi" to PM_PMI; any other string
 * yields PM_NONE.
 */
static enum multi_pm_type parse_pm(char *pm)
{
	if (!strcmp(pm, "pmix"))
		return PM_PMIX;

	if (!strcmp(pm, "pmi"))
		return PM_PMI;

	return PM_NONE;
}

static enum multi_pattern parse_pattern(char *pattern)
{
if (strcmp(pattern, "full_mesh") == 0) {
Expand Down Expand Up @@ -155,8 +166,34 @@ void pm_barrier(void)
pm_allgather(&ch, chs, 1);
}

static int pm_init_ranks(void)
/*
 * Read a rank from the environment variable named env_name.
 *
 * env_name: name of the environment variable to read
 * rank:     receives the parsed value on success; untouched on failure
 *
 * Returns FI_SUCCESS on success, or -FI_EINVAL when the variable is unset
 * or its value is not a non-negative decimal integer.
 */
static int pm_get_rank(char *env_name, size_t *rank)
{
	char *rank_str;
	char *end;
	long val;

	rank_str = getenv(env_name);
	if (!rank_str)
		return -FI_EINVAL;

	/*
	 * atoi() cannot report malformed input, and a negative result would
	 * wrap to a huge value when stored into size_t, so parse with
	 * strtol() and reject anything that is not a whole non-negative
	 * number.
	 */
	val = strtol(rank_str, &end, 10);
	if (end == rank_str || *end != '\0' || val < 0)
		return -FI_EINVAL;

	*rank = (size_t) val;

	return FI_SUCCESS;
}

/*
 * Set pm_job.my_rank from the environment variable that matches
 * pm_job.pm: PMIX_RANK for PM_PMIX, PMI_RANK for PM_PMI.
 *
 * Returns the result of pm_get_rank() for those two types, and
 * -FI_EINVAL for any other pm_job.pm value.
 */
static int pm_init_ranks_pm(void)
{
	switch (pm_job.pm) {
	case PM_PMIX:
		return pm_get_rank("PMIX_RANK", &pm_job.my_rank);
	case PM_PMI:
		return pm_get_rank("PMI_RANK", &pm_job.my_rank);
	default:
		return -FI_EINVAL;
	}
}

static int pm_init_ranks(void)
{
int ret;
int i;
size_t send_rank;
Expand All @@ -165,17 +202,18 @@ static int pm_init_ranks(void)
for (i = 0; i < pm_job.num_ranks-1; i++) {
send_rank = i + 1;
ret = socket_send(pm_job.clients[i], &send_rank,
sizeof(send_rank), 0);
sizeof(send_rank), 0);
if (ret < 0)
break;
}
} else {
ret = socket_recv(pm_job.sock, &(pm_job.my_rank),
sizeof(pm_job.my_rank), 0);
sizeof(pm_job.my_rank), 0);
}

return ret;
}
}


static int server_connect(void)
{
Expand Down Expand Up @@ -211,10 +249,11 @@ static int server_connect(void)
return new_sock;
}

static int pm_conn_setup(void)
static int pm_conn_setup(bool pm)
{
int sock, ret;
int optval = 1;
bool bound = false;

sock = socket(pm_job.oob_server_addr.ss_family, SOCK_STREAM, 0);
if (sock < 0)
Expand All @@ -234,11 +273,15 @@ static int pm_conn_setup(void)
}
#endif

ret = bind(sock, (struct sockaddr *)&pm_job.oob_server_addr,
pm_job.server_addr_len);
if (ret == 0) {
ret = server_connect();
} else {
if (!pm || (pm && pm_job.my_rank == 0)) {
ret = bind(sock, (struct sockaddr *)&pm_job.oob_server_addr,
pm_job.server_addr_len);
if (ret == 0) {
ret = server_connect();
bound = true;
}
}
if ((!pm && !bound) || (pm && pm_job.my_rank != 0)) {
opts.dst_addr = opts.src_addr;
opts.dst_port = opts.src_port;
opts.src_addr = NULL;
Expand Down Expand Up @@ -316,12 +359,13 @@ int main(int argc, char **argv)

pm_job.clients = NULL;
pm_job.pattern = -1;
pm_job.pm = PM_NONE;

hints = fi_allocinfo();
if (!hints)
return EXIT_FAILURE;

while ((c = getopt(argc, argv, "n:x:z:Ths:I:" INFO_OPTS)) != -1) {
while ((c = getopt(argc, argv, "n:x:z:u:Ths:I:" INFO_OPTS)) != -1) {
switch (c) {
default:
ft_parse_addr_opts(c, optarg, &opts);
Expand All @@ -343,6 +387,10 @@ int main(int argc, char **argv)
case 'z':
pm_job.pattern = parse_pattern(optarg);
break;
case 'u':
/* setup the process manager type */
pm_job.pm = parse_pm(optarg);
break;
case '?':
case 'h':
fprintf(stderr, "Usage:\n");
Expand All @@ -369,9 +417,10 @@ int main(int argc, char **argv)
FT_PRINT_OPTS_USAGE("-d <domain>", "domain name");
FT_PRINT_OPTS_USAGE("-p <provider>",
"specific provider name eg sockets, verbs");
FT_PRINT_OPTS_USAGE("-a", "do not use local address");
ft_addr_usage();
ft_hmem_usage();

return EXIT_FAILURE;
}
}
Expand All @@ -384,16 +433,30 @@ int main(int argc, char **argv)
if (ret)
goto err1;

ret = pm_conn_setup();
if (ret) {
FT_ERR("connection setup failed\n");
goto err1;
}
if (pm_job.pm != PM_NONE) {
ret = pm_init_ranks_pm();
if (ret < 0) {
FT_ERR("rank initialization failed\n");
goto err1;
}

ret = pm_init_ranks();
if (ret < 0) {
FT_ERR("rank initialization failed\n");
goto err2;
ret = pm_conn_setup(true);
if (ret) {
FT_ERR("connection setup failed\n");
goto err1;
}
} else {
ret = pm_conn_setup(false);
if (ret) {
FT_ERR("connection setup failed\n");
goto err1;
}

ret = pm_init_ranks();
if (ret < 0) {
FT_ERR("rank initialization failed\n");
goto err2;
}
}

FT_DEBUG("OOB job setup done\n");
Expand Down
10 changes: 5 additions & 5 deletions fabtests/multinode/src/timing.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void multi_timer_stop(struct multi_timer *timer)

static inline void print_timer(struct multi_timer timer, char* info)
{
printf("rank: %i, start: %ld, end: %ld, %s\n",
PRINTF("rank: %i, start: %ld, end: %ld, %s\n",
timer.rank, timer.start, timer.end, info);
}

Expand All @@ -83,7 +83,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count)
goto out;
}

printf("%-10s %16s %16s %16s %16s\n",
PRINTF("%-10s %16s %16s %16s %16s\n",
"Iteration", "Min Send (ns)", "Max Send (ns)",
"Pattern Time(ns)", "Average Send(ns)");

Expand Down Expand Up @@ -114,7 +114,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count)
min[i] = duration;

}
printf("%-10i %16ld %16ld %16ld %16.3f\n",
PRINTF("%-10i %16ld %16ld %16ld %16.3f\n",
i, min[i], max[i], last_end[i] - first_start[i],
sum_time[i] / iter_timer_count);

Expand All @@ -128,7 +128,7 @@ int multi_timer_analyze(struct multi_timer *timers, int timer_count)
}

if (pm_job.my_rank == 0)
printf("%-10s %16.3lf %16.3lf %16.3lf %16.3lf\n", "Average",
PRINTF("%-10s %16.3lf %16.3lf %16.3lf %16.3lf\n", "Average",
total_min / iterations, total_max / iterations,
total_duration / iterations,
total_sum_time / total_timers);
Expand Down Expand Up @@ -189,7 +189,7 @@ int multi_timer_iter_gather(struct multi_timer *gather_timers,

ret = multi_timer_gather(gather_timers, iter_timers, pm_job.num_ranks);
if (ret < 0)
printf("gather timer error: %i\n", ret);
PRINTF("gather timer error: %i\n", ret);

pm_barrier();

Expand Down
Loading

0 comments on commit ad3a40c

Please sign in to comment.