From 9dc890bd52abf453053d3ff4b800d794baf171bb Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 22 Jul 2024 19:51:36 +0000 Subject: [PATCH] Change to using ProcessPoolExecutor --- python/dgl/distributed/partition.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index c77f895b2557..b0b37bc4719b 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -1,13 +1,15 @@ """Functions for partitions. """ +import concurrent +import concurrent.futures import copy +from functools import partial import json import logging import os import time import numpy as np -from joblib import Parallel, delayed import torch @@ -1573,15 +1575,12 @@ def convert_partition(part_id, graph_formats): # Update graph path. # Iterate over partitions. - partition_paths = Parallel(n_jobs=min(num_parts, n_jobs))( - delayed(convert_partition)(part_id, graph_formats) for part_id in range(num_parts) - ) - - for part_id, part_path in enumerate(partition_paths): - new_part_meta[f"part-{part_id}"][ - "part_graph_graphbolt" - ] = part_path - + convert_with_format = partial(convert_partition, graph_formats=graph_formats) + with concurrent.futures.ProcessPoolExecutor(max_workers=min(num_parts, n_jobs)) as executor: + for part_id, part_path in enumerate(executor.map(convert_with_format, range(num_parts))): + new_part_meta[f"part-{part_id}"][ + "part_graph_graphbolt" + ] = part_path # Save dtype info into partition config. # [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more