[GraphBolt] Allow using multiple processes for GraphBolt partition conversion (#7497)
thvasilo committed Jul 30, 2024
1 parent 0e00460 commit ceb6672
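
With this change, `dgl_partition_to_graphbolt` accepts an `n_jobs` argument and can convert DGL partitions to GraphBolt's `FusedCSCSamplingGraph` format in parallel worker processes. A minimal usage sketch, assuming an existing partition config at a hypothetical path:

from dgl.distributed.partition import dgl_partition_to_graphbolt

# Convert every partition listed in the (hypothetical) part.json config.
# Parallelism is capped at min(n_jobs, number of partitions).
dgl_partition_to_graphbolt(
    "data/my-graph/part.json",
    store_eids=True,
    graph_formats="coo",
    n_jobs=4,
)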
Showing 2 changed files with 259 additions and 133 deletions.
342 changes: 210 additions & 132 deletions python/dgl/distributed/partition.py
@@ -1,10 +1,14 @@
"""Functions for partitions. """

import concurrent
import concurrent.futures
import copy
import json
import logging
import multiprocessing as mp
import os
import time
from functools import partial

import numpy as np

@@ -1377,13 +1381,186 @@ def _cast_to_minimum_dtype(predicate, data, field=None):
return data


# Utility functions.
def is_homogeneous(ntypes, etypes):
"""Checks if the provided ntypes and etypes form a homogeneous graph."""
return len(ntypes) == 1 and len(etypes) == 1


def init_type_per_edge(graph, gpb):
"""Return the edge type ID of each edge."""
etype_ids = gpb.map_to_per_etype(graph.edata[EID])[0]
return etype_ids


def gb_convert_single_dgl_partition(
part_id,
graph_formats,
part_config,
store_eids,
store_inner_node,
store_inner_edge,
):
"""Converts a single DGL partition to GraphBolt.
Parameters
----------
part_id : int
The numerical ID of the partition to convert.
graph_formats : str or list[str], optional
Save partitions in specified formats. It could be any combination of
`coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
it is not necessary to specify this argument. It's mainly for
specifying `coo` format to save edge ID mapping and destination node
IDs. If not specified, whether to save `coo` format is determined by
the availability of the format in DGL partitions. Default: None.
part_config : str
The path of the partition config file.
store_eids : bool, optional
Whether to store edge IDs in the new graph. Default: True.
store_inner_node : bool, optional
Whether to store inner node mask in the new graph. Default: False.
store_inner_edge : bool, optional
Whether to store inner edge mask in the new graph. Default: False.
"""
debug_mode = "DGL_DIST_DEBUG" in os.environ
if debug_mode:
dgl_warning(
"Running in debug mode which means all attributes of DGL partitions"
" will be saved to the new format."
)

part_meta = _load_part_config(part_config)
num_parts = part_meta["num_parts"]

graph, _, _, gpb, _, _, _ = load_partition(
part_config, part_id, load_feats=False
)
_, _, ntypes, etypes = load_partition_book(part_config, part_id)
is_homo = is_homogeneous(ntypes, etypes)
node_type_to_id = (
None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
)
edge_type_to_id = (
None
if is_homo
else {
gb.etype_tuple_to_str(etype): etid for etype, etid in etypes.items()
}
)
# Obtain CSC indptr and indices.
indptr, indices, edge_ids = graph.adj_tensors("csc")

# Save node attributes. Detailed attributes are shown below.
# DGL_GB\Attributes dgl.NID("_ID") dgl.NTYPE("_TYPE") "inner_node" "part_id"
# DGL_Homograph ✅ 🚫 ✅ ✅
# GB_Homograph ✅ 🚫 optional 🚫
# DGL_Heterograph ✅ ✅ ✅ ✅
# GB_Heterograph ✅ 🚫 optional 🚫
required_node_attrs = [NID]
if store_inner_node:
required_node_attrs.append("inner_node")
if debug_mode:
required_node_attrs = list(graph.ndata.keys())
node_attributes = {attr: graph.ndata[attr] for attr in required_node_attrs}

# Save edge attributes. Detailed attributes are shown below.
# DGL_GB\Attributes dgl.EID("_ID") dgl.ETYPE("_TYPE") "inner_edge"
# DGL_Homograph ✅ 🚫 ✅
# GB_Homograph optional 🚫 optional
# DGL_Heterograph ✅ ✅ ✅
# GB_Heterograph optional ✅ optional
type_per_edge = None
if not is_homo:
type_per_edge = init_type_per_edge(graph, gpb)[edge_ids]
type_per_edge = type_per_edge.to(RESERVED_FIELD_DTYPE[ETYPE])
required_edge_attrs = []
if store_eids:
required_edge_attrs.append(EID)
if store_inner_edge:
required_edge_attrs.append("inner_edge")
if debug_mode:
required_edge_attrs = list(graph.edata.keys())
edge_attributes = {
attr: graph.edata[attr][edge_ids] for attr in required_edge_attrs
}
# When converting DGLGraph to FusedCSCSamplingGraph, edge IDs are
# re-ordered (in fact, FusedCSCSamplingGraph does not have edge IDs
# of its own). So we need to save this re-ordering info for any
# operation that uses the original local edge IDs. For now, this is
# required by `DistGraph.find_edges()` for link prediction tasks.
#
# What's more, in order to find the dst nodes efficiently, we save
# dst nodes directly in the edge attributes.
#
# So we require additional `(2 * E) * dtype` space in total.
if graph_formats is not None and isinstance(graph_formats, str):
graph_formats = [graph_formats]
save_coo = (
graph_formats is None and "coo" in graph.formats()["created"]
) or (graph_formats is not None and "coo" in graph_formats)
if save_coo:
edge_attributes[DGL2GB_EID] = torch.argsort(edge_ids)
edge_attributes[GB_DST_ID] = gb.expand_indptr(
indptr, dtype=indices.dtype
)

# Cast various data to minimum dtype.
# Cast 1: indptr.
indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
# Cast 2: indices.
indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
# Cast 3: type_per_edge.
type_per_edge = _cast_to_minimum_dtype(
len(etypes), type_per_edge, field=ETYPE
)
# Cast 4: node/edge_attributes.
predicates = {
NID: part_meta["num_nodes"],
"part_id": num_parts,
NTYPE: len(ntypes),
EID: part_meta["num_edges"],
ETYPE: len(etypes),
DGL2GB_EID: part_meta["num_edges"],
GB_DST_ID: part_meta["num_nodes"],
}
for attributes in [node_attributes, edge_attributes]:
for key in attributes:
if key not in predicates:
continue
attributes[key] = _cast_to_minimum_dtype(
predicates[key], attributes[key], field=key
)

csc_graph = gb.fused_csc_sampling_graph(
indptr,
indices,
node_type_offset=None,
type_per_edge=type_per_edge,
node_attributes=node_attributes,
edge_attributes=edge_attributes,
node_type_to_id=node_type_to_id,
edge_type_to_id=edge_type_to_id,
)
orig_graph_path = os.path.join(
os.path.dirname(part_config),
part_meta[f"part-{part_id}"]["part_graph"],
)
csc_graph_path = os.path.join(
os.path.dirname(orig_graph_path), "fused_csc_sampling_graph.pt"
)
torch.save(csc_graph, csc_graph_path)

return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
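
The comment above notes that FusedCSCSamplingGraph re-orders edges, so the partition stores `torch.argsort(edge_ids)` plus the per-edge destination IDs expanded from the CSC `indptr`, costing roughly `2 * E` extra entries. A tiny self-contained sketch of what that indptr expansion amounts to, in plain PyTorch on a toy graph (illustrative only, not the DGL code path):

import torch

# CSC layout of a toy graph with 3 destination nodes and 4 edges:
# node 0 has two in-edges, node 1 has none, node 2 has two.
indptr = torch.tensor([0, 2, 2, 4])
indices = torch.tensor([1, 2, 0, 1])  # source node of each edge

# One destination node ID per edge, mirroring what
# gb.expand_indptr(indptr, dtype=indices.dtype) is used for above.
dst_ids = torch.repeat_interleave(
    torch.arange(indptr.numel() - 1), torch.diff(indptr)
)
print(dst_ids)  # tensor([0, 0, 2, 2])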


def dgl_partition_to_graphbolt(
part_config,
*,
store_eids=True,
store_inner_node=False,
store_inner_edge=False,
graph_formats=None,
n_jobs=1,
):
"""Convert partitions of dgl to FusedCSCSamplingGraph of GraphBolt.
@@ -1411,6 +1588,9 @@ def dgl_partition_to_graphbolt(
specifying `coo` format to save edge ID mapping and destination node
IDs. If not specified, whether to save `coo` format is determined by
the availability of the format in DGL partitions. Default: None.
n_jobs : int, optional
Number of parallel jobs to run during partition conversion. The maximum
parallelism is capped by the number of partitions. Default: 1.
"""
debug_mode = "DGL_DIST_DEBUG" in os.environ
if debug_mode:
@@ -1422,14 +1602,6 @@ def dgl_partition_to_graphbolt(
new_part_meta = copy.deepcopy(part_meta)
num_parts = part_meta["num_parts"]

# Utility functions.
def is_homogeneous(ntypes, etypes):
return len(ntypes) == 1 and len(etypes) == 1

def init_type_per_edge(graph, gpb):
etype_ids = gpb.map_to_per_etype(graph.edata[EID])[0]
return etype_ids

# [Rui] DGL partitions are always saved as homogeneous graphs even though
# the original graph is heterogeneous. But heterogeneous information like
# node/edge types are saved as node/edge data alongside with partitions.
@@ -1440,134 +1612,40 @@ def init_type_per_edge(graph, gpb):
# We can simply pass None to it.

# Iterate over partitions.
for part_id in range(num_parts):
graph, _, _, gpb, _, _, _ = load_partition(
part_config, part_id, load_feats=False
)
_, _, ntypes, etypes = load_partition_book(part_config, part_id)
is_homo = is_homogeneous(ntypes, etypes)
node_type_to_id = (
None
if is_homo
else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
)
edge_type_to_id = (
None
if is_homo
else {
gb.etype_tuple_to_str(etype): etid
for etype, etid in etypes.items()
}
)
# Obtain CSC indptr and indices.
indptr, indices, edge_ids = graph.adj_tensors("csc")

# Save node attributes. Detailed attributes are shown below.
# DGL_GB\Attributes dgl.NID("_ID") dgl.NTYPE("_TYPE") "inner_node" "part_id"
# DGL_Homograph ✅ 🚫 ✅ ✅
# GB_Homograph ✅ 🚫 optional 🚫
# DGL_Heterograph ✅ ✅ ✅ ✅
# GB_Heterograph ✅ 🚫 optional 🚫
required_node_attrs = [NID]
if store_inner_node:
required_node_attrs.append("inner_node")
if debug_mode:
required_node_attrs = list(graph.ndata.keys())
node_attributes = {
attr: graph.ndata[attr] for attr in required_node_attrs
}

# Save edge attributes. Detailed attributes are shown below.
# DGL_GB\Attributes dgl.EID("_ID") dgl.ETYPE("_TYPE") "inner_edge"
# DGL_Homograph ✅ 🚫 ✅
# GB_Homograph optional 🚫 optional
# DGL_Heterograph ✅ ✅ ✅
# GB_Heterograph optional ✅ optional
type_per_edge = None
if not is_homo:
type_per_edge = init_type_per_edge(graph, gpb)[edge_ids]
type_per_edge = type_per_edge.to(RESERVED_FIELD_DTYPE[ETYPE])
required_edge_attrs = []
if store_eids:
required_edge_attrs.append(EID)
if store_inner_edge:
required_edge_attrs.append("inner_edge")
if debug_mode:
required_edge_attrs = list(graph.edata.keys())
edge_attributes = {
attr: graph.edata[attr][edge_ids] for attr in required_edge_attrs
}
# When converting DGLGraph to FusedCSCSamplingGraph, edge IDs are
# re-ordered(actually FusedCSCSamplingGraph does not have edge IDs
# in nature). So we need to save such re-order info for any
# operations that uses original local edge IDs. For now, this is
# required by `DistGraph.find_edges()` for link prediction tasks.
#
# What's more, in order to find the dst nodes efficiently, we save
# dst nodes directly in the edge attributes.
#
# So we require additional `(2 * E) * dtype` space in total.
if graph_formats is not None and isinstance(graph_formats, str):
graph_formats = [graph_formats]
save_coo = (
graph_formats is None and "coo" in graph.formats()["created"]
) or (graph_formats is not None and "coo" in graph_formats)
if save_coo:
edge_attributes[DGL2GB_EID] = torch.argsort(edge_ids)
edge_attributes[GB_DST_ID] = gb.expand_indptr(
indptr, dtype=indices.dtype
)

# Cast various data to minimum dtype.
# Cast 1: indptr.
indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
# Cast 2: indices.
indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
# Cast 3: type_per_edge.
type_per_edge = _cast_to_minimum_dtype(
len(etypes), type_per_edge, field=ETYPE
)
# Cast 4: node/edge_attributes.
predicates = {
NID: part_meta["num_nodes"],
"part_id": num_parts,
NTYPE: len(ntypes),
EID: part_meta["num_edges"],
ETYPE: len(etypes),
DGL2GB_EID: part_meta["num_edges"],
GB_DST_ID: part_meta["num_nodes"],
}
for attributes in [node_attributes, edge_attributes]:
for key in attributes:
if key not in predicates:
continue
attributes[key] = _cast_to_minimum_dtype(
predicates[key], attributes[key], field=key
)

csc_graph = gb.fused_csc_sampling_graph(
indptr,
indices,
node_type_offset=None,
type_per_edge=type_per_edge,
node_attributes=node_attributes,
edge_attributes=edge_attributes,
node_type_to_id=node_type_to_id,
edge_type_to_id=edge_type_to_id,
)
orig_graph_path = os.path.join(
os.path.dirname(part_config),
part_meta[f"part-{part_id}"]["part_graph"],
)
csc_graph_path = os.path.join(
os.path.dirname(orig_graph_path), "fused_csc_sampling_graph.pt"
)
torch.save(csc_graph, csc_graph_path)
convert_with_format = partial(
gb_convert_single_dgl_partition,
graph_formats=graph_formats,
part_config=part_config,
store_eids=store_eids,
store_inner_node=store_inner_node,
store_inner_edge=store_inner_edge,
)
# Need to create entirely new interpreters, because we call C++ downstream
# See https://docs.python.org/3.12/library/multiprocessing.html#contexts-and-start-methods
# and https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
rel_path_results = []
if n_jobs > 1 and num_parts > 1:
mp_ctx = mp.get_context("spawn")
with concurrent.futures.ProcessPoolExecutor( # pylint: disable=unexpected-keyword-arg
max_workers=min(num_parts, n_jobs),
mp_context=mp_ctx,
) as executor:
futures = []
for part_id in range(num_parts):
futures.append(executor.submit(convert_with_format, part_id))

for part_id in range(num_parts):
rel_path_results.append(futures[part_id].result())
else:
# If running with a single job, avoid spawning a new interpreter, which is slow
for part_id in range(num_parts):
rel_path_results.append(convert_with_format(part_id))

for part_id in range(num_parts):
# Update graph path.
new_part_meta[f"part-{part_id}"][
"part_graph_graphbolt"
] = os.path.relpath(csc_graph_path, os.path.dirname(part_config))
] = rel_path_results[part_id]

# Save dtype info into partition config.
# [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more
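
The parallel path above deliberately uses the "spawn" start method so every worker gets a fresh interpreter, because the conversion calls into C++ through pybind11. A stripped-down, self-contained sketch of that pattern (the `convert_one` worker and its arguments are hypothetical stand-ins, not the DGL code):

import concurrent.futures
import multiprocessing as mp
from functools import partial


def convert_one(part_id, part_config):
    # Stand-in for per-partition work that may call into C++ extensions.
    return f"converted part {part_id} of {part_config}"


def convert_all(part_config, num_parts, n_jobs=1):
    worker = partial(convert_one, part_config=part_config)
    if n_jobs > 1 and num_parts > 1:
        # Spawned interpreters avoid inheriting forked state (GIL, native
        # threads) that can break pybind11/C++ code in the workers.
        ctx = mp.get_context("spawn")
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=min(num_parts, n_jobs), mp_context=ctx
        ) as executor:
            futures = [executor.submit(worker, pid) for pid in range(num_parts)]
            return [f.result() for f in futures]
    # With a single job, skip process creation entirely.
    return [worker(pid) for pid in range(num_parts)]


if __name__ == "__main__":
    print(convert_all("part.json", num_parts=2, n_jobs=2))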
