[GraphBolt][CUDA] GPUCachedFeature update fix. (#7384)
mfbalin authored and Rhett-Ying committed May 10, 2024
1 parent bb97946 commit 2beb375
Showing 4 changed files with 49 additions and 9 deletions.
2 changes: 1 addition & 1 deletion examples/multigpu/graphbolt/node_classification.py
@@ -399,7 +399,7 @@ def parse_args():
         "--gpu-cache-size",
         type=int,
         default=0,
-        help="The capacity of the GPU cache, the number of features to store.",
+        help="The capacity of the GPU cache in bytes.",
     )
     parser.add_argument(
         "--dataset",
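Note that the flag keeps its name but changes units: it previously counted
feature rows and now counts bytes, so existing invocations need their values
converted. A minimal sketch of the conversion, with illustrative tensor sizes
not taken from the example script:

    import torch

    # Hypothetical feature matrix: 1000 rows of 128 float32 values each.
    feat = torch.rand(1000, 128)

    # Bytes per cached row: 128 elements * 4 bytes = 512.
    row_bytes = feat[:1].numel() * feat[:1].element_size()

    # The old `--gpu-cache-size 500` cached 500 rows; the equivalent new
    # value is the byte budget for those same rows.
    gpu_cache_size_in_bytes = 500 * row_bytes  # 256000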
12 changes: 12 additions & 0 deletions examples/sampling/graphbolt/pyg/node_classification_advanced.py
@@ -350,6 +350,12 @@ def parse_args():
         help="Graph storage - feature storage - Train device: 'cpu' for CPU and RAM,"
         " 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
     )
+    parser.add_argument(
+        "--gpu-cache-size",
+        type=int,
+        default=0,
+        help="The capacity of the GPU cache in bytes.",
+    )
     parser.add_argument(
         "--sample-mode",
         default="sample_neighbor",
@@ -403,6 +409,12 @@ def main():
 
     num_classes = dataset.tasks[0].metadata["num_classes"]
 
+    if args.gpu_cache_size > 0 and args.feature_device != "cuda":
+        features._features[("node", None, "feat")] = gb.GPUCachedFeature(
+            features._features[("node", None, "feat")],
+            args.gpu_cache_size,
+        )
+
     train_dataloader, valid_dataloader = (
         create_dataloader(
             graph=graph,
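The new block in main() wraps the host-resident "feat" storage in a
GPUCachedFeature only when a cache was requested and the features do not
already live on the GPU (caching would be pointless when feature_device is
"cuda"). A standalone sketch of the same wrapping, assuming a CUDA build of
DGL and an in-memory tensor as the fallback (names here are illustrative):

    import torch
    import dgl.graphbolt as gb

    # Hypothetical node features kept in pinned host memory.
    fallback = gb.TorchBasedFeature(torch.rand(10_000, 64).pin_memory())

    # Budget up to 1 MiB of rows on the GPU; reads are served from the
    # cache when possible and fall back to host memory on a miss.
    cached = gb.GPUCachedFeature(fallback, 1024 * 1024)
    values = cached.read(torch.tensor([1, 5, 42], device="cuda"))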
37 changes: 29 additions & 8 deletions python/dgl/graphbolt/impl/gpu_cached_feature.py
@@ -8,6 +8,22 @@
 __all__ = ["GPUCachedFeature"]
 
 
+def nbytes(tensor):
+    """Returns the number of bytes to store the given tensor.
+
+    Needs to be defined only for torch versions less than 2.1. In torch >= 2.1,
+    we can simply use "tensor.nbytes".
+    """
+    return tensor.numel() * tensor.element_size()
+
+
+def num_cache_items(cache_capacity_in_bytes, single_item):
+    """Returns the number of rows to be cached."""
+    item_bytes = nbytes(single_item)
+    # Round up so that we never get a size of 0, unless bytes is 0.
+    return (cache_capacity_in_bytes + item_bytes - 1) // item_bytes
+
+
 class GPUCachedFeature(Feature):
     r"""GPU cached feature wrapping a fallback feature.
@@ -17,8 +33,8 @@ class GPUCachedFeature(Feature):
     ----------
     fallback_feature : Feature
         The fallback feature.
-    cache_size : int
-        The capacity of the GPU cache, the number of features to store.
+    max_cache_size_in_bytes : int
+        The capacity of the GPU cache in bytes.
 
     Examples
     --------
@@ -42,16 +58,17 @@ class GPUCachedFeature(Feature):
     torch.Size([5])
     """
 
-    def __init__(self, fallback_feature: Feature, cache_size: int):
+    def __init__(self, fallback_feature: Feature, max_cache_size_in_bytes: int):
         super(GPUCachedFeature, self).__init__()
         assert isinstance(fallback_feature, Feature), (
             f"The fallback_feature must be an instance of Feature, but got "
             f"{type(fallback_feature)}."
         )
         self._fallback_feature = fallback_feature
-        self.cache_size = cache_size
+        self.max_cache_size_in_bytes = max_cache_size_in_bytes
         # Fetching the feature dimension from the underlying feature.
         feat0 = fallback_feature.read(torch.tensor([0]))
+        cache_size = num_cache_items(max_cache_size_in_bytes, feat0)
         self._feature = GPUCache((cache_size,) + feat0.shape[1:], feat0.dtype)
 
     def read(self, ids: torch.Tensor = None):
@@ -104,11 +121,15 @@ def update(self, value: torch.Tensor, ids: torch.Tensor = None):
             updated.
         """
         if ids is None:
+            feat0 = value[:1]
             self._fallback_feature.update(value)
-            size = min(self.cache_size, value.shape[0])
-            self._feature.replace(
-                torch.arange(0, size, device="cuda"),
-                value[:size].to("cuda"),
+            cache_size = min(
+                num_cache_items(self.max_cache_size_in_bytes, feat0),
+                value.shape[0],
             )
+            self._feature = None  # Destroy the existing cache first.
+            self._feature = GPUCache(
+                (cache_size,) + feat0.shape[1:], feat0.dtype
+            )
         else:
             self._fallback_feature.update(value, ids)
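The capacity conversion in num_cache_items is a ceiling division, so any
nonzero byte budget yields at least one cache row. A worked example of the
arithmetic (values are illustrative):

    # A float32 row with 128 elements occupies 128 * 4 = 512 bytes.
    item_bytes = 512
    # Ceiling division: (capacity + item_bytes - 1) // item_bytes.
    assert (1024 + item_bytes - 1) // item_bytes == 2  # Two rows fit exactly.
    assert (100 + item_bytes - 1) // item_bytes == 1   # Rounded up to one row.
    assert (0 + item_bytes - 1) // item_bytes == 0     # Zero budget, no rows.

The update() fix follows the same design: a full update may change the row
shape or dtype, which the previous in-place replace() into the old GPUCache
could not accommodate, so the cache is now rebuilt from the new value's first
row instead.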
7 changes: 7 additions & 0 deletions tests/python/pytorch/graphbolt/impl/test_gpu_cached_feature.py
@@ -36,6 +36,9 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
         [[[1, 2], [3, 4]], [[4, 5], [6, 7]]], dtype=dtype, pin_memory=True
     )
 
+    cache_size_a *= a[:1].element_size() * a[:1].numel()
+    cache_size_b *= b[:1].element_size() * b[:1].numel()
+
     feat_store_a = gb.GPUCachedFeature(gb.TorchBasedFeature(a), cache_size_a)
     feat_store_b = gb.GPUCachedFeature(gb.TorchBasedFeature(b), cache_size_b)
 
@@ -94,3 +97,7 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
         feat_store_a.read(),
         torch.tensor([[2, 0, 1], [3, 5, 2]], dtype=dtype).to("cuda"),
     )
+
+    # Test with different dimensionality
+    feat_store_a.update(b)
+    assert torch.equal(feat_store_a.read(), b.to("cuda"))
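The added assertions exercise the fixed path directly: `a` is a 2-D tensor
while `b` is 3-D, so each cached row changes shape, and feat_store_a.update(b)
must rebuild the cache around the new row shape before read() can return `b`
wholesale. A compressed sketch of the same check, assuming a CUDA-enabled
build:

    import torch
    import dgl.graphbolt as gb

    a = torch.tensor([[1, 2, 3], [4, 5, 6]], pin_memory=True)
    b = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]], pin_memory=True)

    # A byte budget worth two rows of `a`.
    budget = 2 * a[:1].numel() * a[:1].element_size()
    store = gb.GPUCachedFeature(gb.TorchBasedFeature(a), budget)

    # A full update with differently shaped rows rebuilds the GPU cache.
    store.update(b)
    assert torch.equal(store.read(), b.to("cuda"))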
