From c533f674951b6112abab60098cd16b24228d8bd3 Mon Sep 17 00:00:00 2001 From: Zheng Date: Tue, 25 May 2021 18:10:33 +0800 Subject: [PATCH 1/5] explicitly set the graph format. --- python/dgl/distributed/dist_context.py | 5 ++++- python/dgl/distributed/dist_graph.py | 14 ++++++++++---- tools/launch.py | 5 +++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/python/dgl/distributed/dist_context.py b/python/dgl/distributed/dist_context.py index e68106e933dd..2c929e80468f 100644 --- a/python/dgl/distributed/dist_context.py +++ b/python/dgl/distributed/dist_context.py @@ -93,11 +93,14 @@ def initialize(ip_config, num_servers=1, num_workers=0, 'Please define DGL_NUM_CLIENT to run DistGraph server' assert os.environ.get('DGL_CONF_PATH') is not None, \ 'Please define DGL_CONF_PATH to run DistGraph server' + formats = os.environ.get('DGL_GRAPH_FORMAT', 'csc') + formats = [f.strip() for f in formats] serv = DistGraphServer(int(os.environ.get('DGL_SERVER_ID')), os.environ.get('DGL_IP_CONFIG'), int(os.environ.get('DGL_NUM_SERVER')), int(os.environ.get('DGL_NUM_CLIENT')), - os.environ.get('DGL_CONF_PATH')) + os.environ.get('DGL_CONF_PATH'), + graph_format=formats) serv.start() sys.exit() else: diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 55c9aeba8a69..db4564039c18 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -60,8 +60,8 @@ def __getstate__(self): def __setstate__(self, state): self._graph_name = state -def _copy_graph_to_shared_mem(g, graph_name): - new_g = g.shared_memory(graph_name, formats='csc') +def _copy_graph_to_shared_mem(g, graph_name, graph_format): + new_g = g.shared_memory(graph_name, formats=graph_format) # We should share the node/edge data to the client explicitly instead of putting them # in the KVStore because some of the node/edge data may be duplicated. new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'], @@ -289,9 +289,12 @@ class DistGraphServer(KVServer): The path of the config file generated by the partition tool. disable_shared_mem : bool Disable shared memory. + graph_format : str or list of str + The graph formats. ''' def __init__(self, server_id, ip_config, num_servers, - num_clients, part_config, disable_shared_mem=False): + num_clients, part_config, disable_shared_mem=False, + graph_format='csc'): super(DistGraphServer, self).__init__(server_id=server_id, ip_config=ip_config, num_servers=num_servers, @@ -307,8 +310,11 @@ def __init__(self, server_id, ip_config, num_servers, self.client_g, node_feats, edge_feats, self.gpb, graph_name, \ ntypes, etypes = load_partition(part_config, self.part_id) print('load ' + graph_name) + # Create the graph formats specified the users. + self.client_g = self.client_g.formats(graph_format) + self.client_g.create_formats_() if not disable_shared_mem: - self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name) + self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name, graph_format) if not disable_shared_mem: self.gpb.shared_memory(graph_name) diff --git a/tools/launch.py b/tools/launch.py index 8c930c5affb0..ac6cd1180f63 100644 --- a/tools/launch.py +++ b/tools/launch.py @@ -185,6 +185,7 @@ def submit_jobs(args, udf_command): client_cmd = client_cmd + ' ' + 'OMP_NUM_THREADS=' + str(args.num_omp_threads) if os.environ.get('PYTHONPATH') is not None: client_cmd = client_cmd + ' ' + 'PYTHONPATH=' + os.environ.get('PYTHONPATH') + client_cmd = client_cmd + ' ' + 'DGL_GRAPH_FORMAT=' + str(args.graph_format) torch_cmd = '-m torch.distributed.launch' torch_cmd = torch_cmd + ' ' + '--nproc_per_node=' + str(args.num_trainers) @@ -248,6 +249,10 @@ def main(): help='The number of OMP threads in the server process. \ It should be small if server processes and trainer processes run on \ the same machine. By default, it is 1.') + parser.add_argument('--graph_format', type=str, default='csc', + help='The format of the graph structure of each partition. \ + The allowed formats are csr, csc and coo. A user can specify multiple + formats, separated by ",". For example, the graph format is "csr,csc".') args, udf_command = parser.parse_known_args() assert len(udf_command) == 1, 'Please provide user command line.' assert args.num_trainers is not None and args.num_trainers > 0, \ From 6f3422254abf5ca11e026f8dbaed6b26174db568 Mon Sep 17 00:00:00 2001 From: Da Zheng Date: Wed, 26 May 2021 04:38:37 +0000 Subject: [PATCH 2/5] fix. --- tests/distributed/test_distributed_sampling.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index e2de1445f34d..67e95ac10bbf 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -16,9 +16,10 @@ from dgl.distributed import DistGraphServer, DistGraph -def start_server(rank, tmpdir, disable_shared_mem, graph_name): +def start_server(rank, tmpdir, disable_shared_mem, graph_name, graph_format='csc'): g = DistGraphServer(rank, "rpc_ip_config.txt", 1, 1, - tmpdir / (graph_name + '.json'), disable_shared_mem=disable_shared_mem) + tmpdir / (graph_name + '.json'), disable_shared_mem=disable_shared_mem, + graph_format=graph_format) g.start() @@ -102,7 +103,8 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server): pserver_list = [] ctx = mp.get_context('spawn') for i in range(num_server): - p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1, 'test_find_edges')) + p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1, + 'test_find_edges', ['csr', 'coo'])) p.start() time.sleep(1) pserver_list.append(p) From 9d9cdac5e898d3675c15b2d78a02b17a810a5f74 Mon Sep 17 00:00:00 2001 From: Da Zheng Date: Wed, 26 May 2021 05:24:20 +0000 Subject: [PATCH 3/5] fix. --- python/dgl/distributed/dist_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dgl/distributed/dist_context.py b/python/dgl/distributed/dist_context.py index 2c929e80468f..cb92448fcce4 100644 --- a/python/dgl/distributed/dist_context.py +++ b/python/dgl/distributed/dist_context.py @@ -93,7 +93,7 @@ def initialize(ip_config, num_servers=1, num_workers=0, 'Please define DGL_NUM_CLIENT to run DistGraph server' assert os.environ.get('DGL_CONF_PATH') is not None, \ 'Please define DGL_CONF_PATH to run DistGraph server' - formats = os.environ.get('DGL_GRAPH_FORMAT', 'csc') + formats = os.environ.get('DGL_GRAPH_FORMAT', 'csc').split(',') formats = [f.strip() for f in formats] serv = DistGraphServer(int(os.environ.get('DGL_SERVER_ID')), os.environ.get('DGL_IP_CONFIG'), From f9c8e269cf7dc5b09450eea74f59b846b8a088c1 Mon Sep 17 00:00:00 2001 From: Da Zheng Date: Wed, 26 May 2021 08:12:28 +0000 Subject: [PATCH 4/5] fix launch script. --- tools/launch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/launch.py b/tools/launch.py index ac6cd1180f63..b682dd52e134 100644 --- a/tools/launch.py +++ b/tools/launch.py @@ -167,12 +167,14 @@ def submit_jobs(args, udf_command): server_cmd = server_cmd + ' ' + 'DGL_CONF_PATH=' + str(args.part_config) server_cmd = server_cmd + ' ' + 'DGL_IP_CONFIG=' + str(args.ip_config) server_cmd = server_cmd + ' ' + 'DGL_NUM_SERVER=' + str(args.num_servers) + server_cmd = server_cmd + ' ' + 'DGL_GRAPH_FORMAT=' + str(args.graph_format) for i in range(len(hosts)*server_count_per_machine): ip, _ = hosts[int(i / server_count_per_machine)] cmd = server_cmd + ' ' + 'DGL_SERVER_ID=' + str(i) cmd = cmd + ' ' + udf_command cmd = 'cd ' + str(args.workspace) + '; ' + cmd execute_remote(cmd, ip, args.ssh_port, thread_list) + # launch client tasks client_cmd = 'DGL_DIST_MODE="distributed" DGL_ROLE=client DGL_NUM_SAMPLER=' + str(args.num_samplers) client_cmd = client_cmd + ' ' + 'DGL_NUM_CLIENT=' + str(tot_num_clients) @@ -251,7 +253,7 @@ def main(): the same machine. By default, it is 1.') parser.add_argument('--graph_format', type=str, default='csc', help='The format of the graph structure of each partition. \ - The allowed formats are csr, csc and coo. A user can specify multiple + The allowed formats are csr, csc and coo. A user can specify multiple \ formats, separated by ",". For example, the graph format is "csr,csc".') args, udf_command = parser.parse_known_args() assert len(udf_command) == 1, 'Please provide user command line.' From 8d17e2f7f0575a1fa4991cf558762de5375aa459 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 26 May 2021 09:02:08 +0000 Subject: [PATCH 5/5] fix readme. --- examples/pytorch/graphsage/experimental/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/pytorch/graphsage/experimental/README.md b/examples/pytorch/graphsage/experimental/README.md index 29d271040e39..14a5ab771d0f 100644 --- a/examples/pytorch/graphsage/experimental/README.md +++ b/examples/pytorch/graphsage/experimental/README.md @@ -135,6 +135,7 @@ python3 ~/workspace/dgl/tools/launch.py \ --num_servers 1 \ --part_config data/ogb-product.json \ --ip_config ip_config.txt \ +--graph_format csc,coo \ "python3 train_dist_unsupervised.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000" ``` @@ -183,6 +184,7 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt --num_servers 1 \ --part_config data/ogb-product.json \ --ip_config ip_config.txt \ +--graph_format csc,coo \ "python3 train_dist_unsupervised_transductive.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000 --num_gpus 4" ``` @@ -194,6 +196,7 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt --num_servers 1 \ --part_config data/ogb-product.json \ --ip_config ip_config.txt \ +--graph_format csc,coo \ "python3 train_dist_unsupervised_transductive.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000 --num_gpus 4 --dgl_sparse" ```