Skip to content

Commit

Permalink
[GraphBolt] CPUCachedFeature.read_async tests [9]
Browse files Browse the repository at this point in the history
  • Loading branch information
mfbalin committed Jul 20, 2024
1 parent 8d770b6 commit 2b52591
Showing 1 changed file with 86 additions and 1 deletion.
87 changes: 86 additions & 1 deletion tests/python/pytorch/graphbolt/impl/test_cpu_cached_feature.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import backend as F
import os
import tempfile
import unittest

import backend as F
import numpy as np
import pytest
import torch

from dgl import graphbolt as gb


def to_on_disk_numpy(test_dir, name, t):
path = os.path.join(test_dir, name + ".npy")
np.save(path, t.numpy())
return path


@pytest.mark.parametrize(
"dtype",
[
Expand Down Expand Up @@ -93,3 +103,78 @@ def test_cpu_cached_feature(dtype, policy):
# Test with different dimensionality
feat_store_a.update(b)
assert torch.equal(feat_store_a.read(), b)


@pytest.mark.parametrize(
"dtype",
[
torch.bool,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.float16,
torch.bfloat16,
torch.float32,
torch.float64,
],
)
def test_cpu_cached_feature_read_async(dtype):
a = torch.randint(0, 2, [1000, 13], dtype=dtype)

cache_size = 256 * a[:1].nbytes

feat_store = gb.CPUCachedFeature(gb.TorchBasedFeature(a), cache_size)

# Test read with ids.
ids1 = torch.tensor([0, 15, 71, 101])
ids2 = torch.tensor([71, 101, 202, 303])
for ids in [ids1, ids2]:
reader = feat_store.read_async(ids)
for _ in range(feat_store.read_async_num_stages(ids.device)):
values = next(reader)
assert torch.equal(values.wait(), a[ids])


@unittest.skipIf(
not torch.ops.graphbolt.detect_io_uring(),
reason="DiskBasedFeature is not available on this system.",
)
@pytest.mark.parametrize(
"dtype",
[
torch.bool,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.float16,
torch.float32,
torch.float64,
],
)
def test_cpu_cached_disk_feature_read_async(dtype):
a = torch.randint(0, 2, [1000, 13], dtype=dtype)

cache_size = 256 * a[:1].nbytes

ids1 = torch.tensor([0, 15, 71, 101])
ids2 = torch.tensor([71, 101, 202, 303])

with tempfile.TemporaryDirectory() as test_dir:
path = to_on_disk_numpy(test_dir, "tensor", a)

feat_store = gb.CPUCachedFeature(
gb.DiskBasedFeature(path=path), cache_size
)

# Test read feature.
for ids in [ids1, ids2]:
reader = feat_store.read_async(ids)
for _ in range(feat_store.read_async_num_stages(ids.device)):
values = next(reader)
assert torch.equal(values.wait(), a[ids])

feat_store = None

0 comments on commit 2b52591

Please sign in to comment.