[GraphBolt][CUDA] GPUCachedFeature update fix. #7384

Merged 10 commits on May 9, 2024.
2 changes: 1 addition & 1 deletion examples/multigpu/graphbolt/node_classification.py
@@ -399,7 +399,7 @@ def parse_args():
         "--gpu-cache-size",
         type=int,
         default=0,
-        help="The capacity of the GPU cache, the number of features to store.",
+        help="The capacity of the GPU cache in bytes.",
     )
     parser.add_argument(
         "--dataset",
12 changes: 12 additions & 0 deletions examples/sampling/graphbolt/pyg/node_classification_advanced.py
@@ -350,6 +350,12 @@ def parse_args():
         help="Graph storage - feature storage - Train device: 'cpu' for CPU and RAM,"
         " 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
     )
+    parser.add_argument(
+        "--gpu-cache-size",
+        type=int,
+        default=0,
+        help="The capacity of the GPU cache in bytes.",
+    )
     parser.add_argument(
         "--sample-mode",
         default="sample_neighbor",
@@ -403,6 +409,12 @@ def main():

     num_classes = dataset.tasks[0].metadata["num_classes"]

+    if args.gpu_cache_size > 0 and args.feature_device != "cuda":
+        features._features[("node", None, "feat")] = gb.GPUCachedFeature(
+            features._features[("node", None, "feat")],
+            args.gpu_cache_size,
+        )
+
     train_dataloader, valid_dataloader = (
         create_dataloader(
             graph=graph,
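Since --gpu-cache-size is now interpreted as a byte budget rather than a row count, a caller who thinks in rows has to scale by the per-row size. Below is a minimal sketch of that conversion; the helper name, tensor shape, and row count are made up for illustration and are not part of this PR.

import torch


def rows_to_bytes(feature: torch.Tensor, num_rows: int) -> int:
    """Convert a desired number of cached rows into a byte budget."""
    # One row occupies (elements per row) * (bytes per element) bytes.
    row_bytes = feature[:1].numel() * feature[:1].element_size()
    return num_rows * row_bytes


# Example: budget for 100_000 rows of a float32, 128-dimensional feature.
feat = torch.zeros(10, 128, dtype=torch.float32)
assert rows_to_bytes(feat, 100_000) == 100_000 * 128 * 4  # 51_200_000 bytes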
37 changes: 29 additions & 8 deletions python/dgl/graphbolt/impl/gpu_cached_feature.py
@@ -8,6 +8,22 @@
 __all__ = ["GPUCachedFeature"]


+def nbytes(tensor):
+    """Returns the number of bytes needed to store the given tensor.
+
+    Needs to be defined only for torch versions less than 2.1. In torch >= 2.1,
+    we can simply use "tensor.nbytes".
+    """
+    return tensor.numel() * tensor.element_size()
+
+
+def num_cache_items(cache_capacity_in_bytes, single_item):
+    """Returns the number of rows to be cached."""
+    item_bytes = nbytes(single_item)
+    # Round up so that we never get a size of 0, unless the capacity is 0.
+    return (cache_capacity_in_bytes + item_bytes - 1) // item_bytes
+
+
 class GPUCachedFeature(Feature):
     r"""GPU cached feature wrapping a fallback feature.

@@ -17,8 +33,8 @@ class GPUCachedFeature(Feature):
     ----------
     fallback_feature : Feature
         The fallback feature.
-    cache_size : int
-        The capacity of the GPU cache, the number of features to store.
+    max_cache_size_in_bytes : int
+        The capacity of the GPU cache in bytes.

     Examples
     --------
@@ -42,16 +58,17 @@ class GPUCachedFeature(Feature):
     torch.Size([5])
     """

-    def __init__(self, fallback_feature: Feature, cache_size: int):
+    def __init__(self, fallback_feature: Feature, max_cache_size_in_bytes: int):
         super(GPUCachedFeature, self).__init__()
         assert isinstance(fallback_feature, Feature), (
             f"The fallback_feature must be an instance of Feature, but got "
             f"{type(fallback_feature)}."
         )
         self._fallback_feature = fallback_feature
-        self.cache_size = cache_size
+        self.max_cache_size_in_bytes = max_cache_size_in_bytes
         # Fetching the feature dimension from the underlying feature.
         feat0 = fallback_feature.read(torch.tensor([0]))
+        cache_size = num_cache_items(max_cache_size_in_bytes, feat0)
         self._feature = GPUCache((cache_size,) + feat0.shape[1:], feat0.dtype)

     def read(self, ids: torch.Tensor = None):
@@ -104,11 +121,15 @@ def update(self, value: torch.Tensor, ids: torch.Tensor = None):
             updated.
         """
         if ids is None:
+            feat0 = value[:1]
             self._fallback_feature.update(value)
-            size = min(self.cache_size, value.shape[0])
-            self._feature.replace(
-                torch.arange(0, size, device="cuda"),
-                value[:size].to("cuda"),
+            cache_size = min(
+                num_cache_items(self.max_cache_size_in_bytes, feat0),
+                value.shape[0],
             )
+            self._feature = None  # Destroy the existing cache first.
+            self._feature = GPUCache(
+                (cache_size,) + feat0.shape[1:], feat0.dtype
+            )
         else:
             self._fallback_feature.update(value, ids)
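For concreteness, the ceiling division in num_cache_items behaves as follows. This is a self-contained sketch using the two helpers exactly as defined in this PR; the example row shape and byte budgets are made up.

import torch


def nbytes(tensor):
    # Same definition as in the PR; torch >= 2.1 also offers tensor.nbytes.
    return tensor.numel() * tensor.element_size()


def num_cache_items(cache_capacity_in_bytes, single_item):
    # Same ceiling division as in the PR.
    item_bytes = nbytes(single_item)
    return (cache_capacity_in_bytes + item_bytes - 1) // item_bytes


# A float32 row with 128 elements occupies 128 * 4 = 512 bytes.
row = torch.zeros(1, 128, dtype=torch.float32)
assert nbytes(row) == 512
assert num_cache_items(1024, row) == 2  # exact fit: two rows
assert num_cache_items(1000, row) == 2  # 1.95 rows rounds up to 2
assert num_cache_items(1, row) == 1     # any nonzero budget caches one row
assert num_cache_items(0, row) == 0     # zero budget means an empty cache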
@@ -36,6 +36,9 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
         [[[1, 2], [3, 4]], [[4, 5], [6, 7]]], dtype=dtype, pin_memory=True
     )

+    cache_size_a *= a[:1].element_size() * a[:1].numel()
+    cache_size_b *= b[:1].element_size() * b[:1].numel()
+
     feat_store_a = gb.GPUCachedFeature(gb.TorchBasedFeature(a), cache_size_a)
     feat_store_b = gb.GPUCachedFeature(gb.TorchBasedFeature(b), cache_size_b)

@@ -94,3 +97,7 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
         feat_store_a.read(),
         torch.tensor([[2, 0, 1], [3, 5, 2]], dtype=dtype).to("cuda"),
     )
+
+    # Test with different dimensionality
+    feat_store_a.update(b)
+    assert torch.equal(feat_store_a.read(), b.to("cuda"))
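Putting it together, here is a hypothetical end-to-end use of the updated API. The class and the byte-budget constructor argument are as in this PR, but the tensor values, the two-row budget, and the use of pinned memory and CUDA-resident ids are illustrative assumptions (a CUDA-capable device is required).

import torch
from dgl import graphbolt as gb

# Pinned host memory for the fallback feature, matching the tests above.
feat = torch.arange(0, 12, dtype=torch.float32).reshape(4, 3).pin_memory()

# The second argument is now a byte budget; here, enough for two rows.
row_bytes = feat[:1].numel() * feat[:1].element_size()  # 3 * 4 = 12 bytes
store = gb.GPUCachedFeature(gb.TorchBasedFeature(feat), 2 * row_bytes)

print(store.read(torch.tensor([1, 3], device="cuda")))  # rows 1 and 3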