[GraphBolt] Add read_async to gb.Feature. [1] #7546

Merged, 3 commits, Jul 20, 2024
40 changes: 40 additions & 0 deletions python/dgl/graphbolt/feature_store.py
@@ -26,6 +26,46 @@ def read(self, ids: torch.Tensor = None):
"""
raise NotImplementedError

def read_async(self, ids: torch.Tensor):
"""Read the feature by index asynchronously.
Parameters
----------
ids : torch.Tensor
The index of the feature. Only the specified indices of the
feature are read.
Returns
-------
A generator object.
The returned generator object yields a future on the
`read_async_num_stages(ids.device)`-th invocation. The read result
can be accessed by calling `.wait()` on the returned future object.
It is undefined behavior to call `.wait()` more than once.
Example Usage
-------------
>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> async_handle = feature.read_async(ids)
>>> for _ in range(feature.read_async_num_stages(ids.device)):
... future = next(async_handle)
>>> result = future.wait() # result contains the read values.
"""
raise NotImplementedError

def read_async_num_stages(self, ids_device: torch.device):
Collaborator:
If you already returned an async_handle, why do you need to define read_async_num_stages? Consider merging?

Collaborator Author:
Because read_async returns a generator object. There is no way to also return how many stages are in it.

Collaborator:
You can always override the generator object to provide an extra field?

Collaborator:
e.g. implement the len function of the object?

Collaborator Author:
How can I do that?
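
One way to do what the reviewer suggests, shown here only as an illustrative sketch (it is not part of this PR), is to wrap the generator in a small handle object whose `__len__` reports the stage count:

```python
class _AsyncReadHandle:
    """Sketch only: a generator wrapper that also exposes the stage count."""

    def __init__(self, generator, num_stages: int):
        self._generator = generator
        self._num_stages = num_stages

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._generator)

    def __len__(self):
        # How many times __next__ must be called until the future is yielded.
        return self._num_stages
```

As merged, the PR instead keeps `read_async_num_stages` as a separate method, so the stage count can be queried from the ids device alone, before any generator is created.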

"""The number of stages of the read_async operation. See read_async
function for directions on its use.
Parameters
----------
ids_device : torch.device
The device of the ids parameter passed into read_async.
Returns
-------
int
The number of stages of the read_async operation.
"""
raise NotImplementedError

def size(self):
"""Get the size of the feature.

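All of the new methods in this PR are stubs that raise NotImplementedError; concrete implementations follow in later parts of the series. As a rough illustration of the protocol the docstring describes, a hypothetical single-stage subclass and a caller driving it could look like the sketch below (names and structure are assumptions, not code from this PR):

```python
import torch
from concurrent.futures import ThreadPoolExecutor


class _HypotheticalFeature:
    """Sketch of a Feature-like class following the read_async protocol."""

    def __init__(self, tensor: torch.Tensor):
        self._tensor = tensor
        self._executor = ThreadPoolExecutor(max_workers=1)

    def read_async(self, ids: torch.Tensor):
        # Single stage: run the gather on a worker thread and yield a
        # .wait()-able object wrapping the pending result.
        pending = self._executor.submit(torch.index_select, self._tensor, 0, ids)

        class _Waiter:
            def wait(self):
                return pending.result()

        yield _Waiter()

    def read_async_num_stages(self, ids_device: torch.device):
        # This sketch always uses exactly one stage.
        return 1


# Driving the generator exactly as the docstring's example does.
feature = _HypotheticalFeature(torch.arange(20.0).reshape(10, 2))
ids = torch.tensor([0, 2])
handle = feature.read_async(ids)
for _ in range(feature.read_async_num_stages(ids.device)):
    future = next(handle)
result = future.wait()  # rows 0 and 2 of the feature tensor
```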
40 changes: 40 additions & 0 deletions python/dgl/graphbolt/impl/cpu_cached_feature.py
@@ -79,6 +79,46 @@ def read(self, ids: torch.Tensor = None):
self._feature.replace(missing_keys, missing_values)
return values

def read_async(self, ids: torch.Tensor):
"""Read the feature by index asynchronously.
Parameters
----------
ids : torch.Tensor
The index of the feature. Only the specified indices of the
feature are read.
Returns
-------
A generator object.
The returned generator object yields a future on the
`read_async_num_stages(ids.device)`-th invocation. The read result
can be accessed by calling `.wait()` on the returned future object.
It is undefined behavior to call `.wait()` more than once.
Example Usage
-------------
>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> async_handle = feature.read_async(ids)
>>> for _ in range(feature.read_async_num_stages(ids.device)):
... future = next(async_handle)
>>> result = future.wait() # result contains the read values.
"""
raise NotImplementedError

def read_async_num_stages(self, ids_device: torch.device):
"""The number of stages of the read_async operation. See read_async
function for directions on its use.
Parameters
----------
ids_device : torch.device
The device of the ids parameter passed into read_async.
Returns
-------
int
The number of stages of the read_async operation.
"""
raise NotImplementedError

def size(self):
"""Get the size of the feature.

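Here too both methods are stubs; the synchronous `read` above (query the cache, fetch the missing rows, then `replace` them) hints at how a staged asynchronous version could be composed. The following is only a sketch under assumed interfaces — `cache.query(ids)` returning `(values, missing_index, missing_keys)` and a wrapped `fallback` Feature that itself supports `read_async` — not the implementation this series ends up with:

```python
import torch


class _HypotheticalCPUCachedFeature:
    """Sketch only: staged read_async for a cache in front of another Feature."""

    def __init__(self, cache, fallback):
        self._cache = cache        # assumed: .query(ids), .replace(keys, values)
        self._fallback = fallback  # any Feature with read_async support

    def read_async(self, ids: torch.Tensor):
        # Early stages: probe the cache, then drive the fallback feature's own
        # asynchronous read of the missing rows, forwarding its intermediate
        # futures one per stage.
        values, missing_index, missing_keys = self._cache.query(ids)
        reader = self._fallback.read_async(missing_keys)
        for _ in range(self._fallback.read_async_num_stages(ids.device) - 1):
            yield next(reader)
        missing_future = next(reader)

        cache, final_values = self._cache, values

        class _Waiter:
            def wait(self):
                # Final stage: merge freshly read rows into the cached rows
                # and refresh the cache, mirroring the synchronous read().
                missing_values = missing_future.wait()
                final_values[missing_index] = missing_values
                cache.replace(missing_keys, missing_values)
                return final_values

        yield _Waiter()

    def read_async_num_stages(self, ids_device: torch.device):
        # In this sketch the staging simply mirrors the wrapped feature's.
        return self._fallback.read_async_num_stages(ids_device)
```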
40 changes: 40 additions & 0 deletions python/dgl/graphbolt/impl/gpu_cached_feature.py
@@ -88,6 +88,46 @@ def read(self, ids: torch.Tensor = None):
self._feature.replace(missing_keys, missing_values)
return values

def read_async(self, ids: torch.Tensor):
"""Read the feature by index asynchronously.
Parameters
----------
ids : torch.Tensor
The index of the feature. Only the specified indices of the
feature are read.
Returns
-------
A generator object.
The returned generator object yields a future on the
`read_async_num_stages(ids.device)`-th invocation. The read result
can be accessed by calling `.wait()` on the returned future object.
It is undefined behavior to call `.wait()` more than once.
Example Usage
-------------
>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> async_handle = feature.read_async(ids)
>>> for _ in range(feature.read_async_num_stages(ids.device)):
... future = next(async_handle)
>>> result = future.wait() # result contains the read values.
"""
raise NotImplementedError

def read_async_num_stages(self, ids_device: torch.device):
"""The number of stages of the read_async operation. See read_async
function for directions on its use.
Parameters
----------
ids_device : torch.device
The device of the ids parameter passed into read_async.
Returns
-------
int
The number of stages of the read_async operation.
"""
raise NotImplementedError

def size(self):
"""Get the size of the feature.

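For the GPU-resident cache, the future returned on the last stage has to stand for work queued on a CUDA stream rather than on a host thread. One possible shape for such a future, sketched with standard torch.cuda primitives (an assumption about the approach, not the code this PR series adds):

```python
import torch


class _CUDAFuture:
    """Sketch only: a .wait()-able handle for a tensor made on a side stream."""

    def __init__(self, result: torch.Tensor, event: torch.cuda.Event):
        self._result = result
        self._event = event

    def wait(self) -> torch.Tensor:
        # Make the caller's current stream wait for the producing work, and
        # tell the caching allocator the tensor is now used on that stream.
        self._event.wait()
        self._result.record_stream(torch.cuda.current_stream())
        return self._result


def _gather_on_side_stream(tensor: torch.Tensor, ids: torch.Tensor) -> _CUDAFuture:
    """Gather rows on a dedicated stream and return a future for the result."""
    side_stream = torch.cuda.Stream()
    side_stream.wait_stream(torch.cuda.current_stream())  # inputs are ready
    with torch.cuda.stream(side_stream):
        result = tensor.index_select(0, ids)
        event = torch.cuda.Event()
        event.record(side_stream)
    return _CUDAFuture(result, event)
```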
80 changes: 80 additions & 0 deletions python/dgl/graphbolt/impl/torch_based_feature_store.py
@@ -121,6 +121,46 @@ def read(self, ids: torch.Tensor = None):
return self._tensor
return index_select(self._tensor, ids)

def read_async(self, ids: torch.Tensor):
"""Read the feature by index asynchronously.
Parameters
----------
ids : torch.Tensor
The index of the feature. Only the specified indices of the
feature are read.
Returns
-------
A generator object.
The returned generator object yields a future on the
`read_async_num_stages(ids.device)`-th invocation. The read result
can be accessed by calling `.wait()` on the returned future object.
It is undefined behavior to call `.wait()` more than once.
Example Usage
-------------
>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> async_handle = feature.read_async(ids)
>>> for _ in range(feature.read_async_num_stages(ids.device)):
... future = next(async_handle)
>>> result = future.wait() # result contains the read values.
"""
raise NotImplementedError

def read_async_num_stages(self, ids_device: torch.device):
"""The number of stages of the read_async operation. See read_async
function for directions on its use.
Parameters
----------
ids_device : torch.device
The device of the ids parameter passed into read_async.
Returns
-------
int
The number of stages of the read_async operation.
"""
raise NotImplementedError

def size(self):
"""Get the size of the feature.

@@ -303,6 +343,46 @@ def read(self, ids: torch.Tensor = None):
else:
return index_select(self._tensor, ids)

def read_async(self, ids: torch.Tensor):
"""Read the feature by index asynchronously.
Parameters
----------
ids : torch.Tensor
The index of the feature. Only the specified indices of the
feature are read.
Returns
-------
A generator object.
The returned generator object yields a future on the
`read_async_num_stages(ids.device)`-th invocation. The read result
can be accessed by calling `.wait()` on the returned future object.
It is undefined behavior to call `.wait()` more than once.
Example Usage
-------------
>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> async_handle = feature.read_async(ids)
>>> for _ in range(feature.read_async_num_stages(ids.device)):
... future = next(async_handle)
>>> result = future.wait() # result contains the read values.
"""
raise NotImplementedError

def read_async_num_stages(self, ids_device: torch.device):
"""The number of stages of the read_async operation. See read_async
function for directions on its use.
Parameters
----------
ids_device : torch.device
The device of the ids parameter passed into read_async.
Returns
-------
int
The number of stages of the read_async operation.
"""
raise NotImplementedError

def size(self):
"""Get the size of the feature.
Returns
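
One detail worth noting across all of these classes: the stage count may depend on where the ids live, which is why `read_async_num_stages` takes `ids_device` rather than nothing. A toy illustration — the concrete numbers here are made up, not the values the follow-up implementations use:

```python
import torch


def _num_read_stages(ids_device: torch.device) -> int:
    """Illustrative stage count for an in-memory feature tensor.

    A CPU-resident tensor indexed by CPU ids can complete in one stage, while
    GPU ids typically need extra stages (e.g. moving the ids, gathering on
    pinned memory, and copying the result back to the GPU).
    """
    return 1 if ids_device.type == "cpu" else 3


assert _num_read_stages(torch.device("cpu")) == 1
assert _num_read_stages(torch.device("cuda:0")) == 3
```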