[GraphBolt] Fix DiskBasedFeature Memory Leak Problem (#7349)
Co-authored-by: Ubuntu <ubuntu@ip-172-31-2-128.us-west-2.compute.internal>
pyynb and Ubuntu committed Jul 3, 2024
1 parent 6b222af commit 713ffb5
Showing 2 changed files with 70 additions and 50 deletions.
103 changes: 55 additions & 48 deletions graphbolt/src/cnumpy.cc
@@ -11,7 +11,6 @@
#include <cstring>
#include <regex>
#include <stdexcept>

namespace graphbolt {
namespace storage {

@@ -111,12 +110,10 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
(aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_);
char *result_buffer =
(char *)aligned_alloc(kDiskAlignmentSize, feature_size_ * num_index);

auto index_data = index.data_ptr<int64_t>();

// Record the offsets of the fetched features within their disk blocks.
int64_t residual[group_size_ * num_thread_];

// Indicator for index error.
std::atomic<bool> error_flag{};
TORCH_CHECK(
@@ -127,26 +124,30 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
0, num_index, group_size_, [&](int64_t begin, int64_t end) {
auto thread_id = torch::get_thread_num();
if (!error_flag.load()) {
for (int64_t i = begin; i < end; i++) {
int64_t batch_offset = begin;
for (int64_t i = begin; i < end; ++i) {
int64_t group_id = i - begin;
int64_t feature_id = index_data[i]; // Feature id.
if (feature_id >= feature_dim_[0]) {
error_flag.store(true);
break;
}
// Calculate the byte offset of the feature within the file.
int64_t offset = feature_id * feature_size_ + prefix_len_;
int64_t aligned_offset = offset & (long)~(kDiskAlignmentSize - 1);
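// The bitmask rounds the offset down to the previous kDiskAlignmentSize
// boundary; the distance from that aligned start back to the feature's
// first byte is what gets stored as the residual below.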
residual[thread_id * group_size_ + group_id] =
// Record the feature's offset within its aligned block.
residual[thread_id * group_size_ + (group_id % group_size_)] =
offset - aligned_offset;

// If the tail of the feature extends into another block, read an
// additional block.
int64_t read_size;
if (residual[thread_id * group_size_ + group_id] + feature_size_ >
if (residual[thread_id * group_size_ + (group_id % group_size_)] +
feature_size_ >
kDiskAlignmentSize) {
read_size = aligned_length + kDiskAlignmentSize;
} else {
read_size = aligned_length;
}

// Put requests into io_uring queue.
struct io_uring_sqe *submit_queue =
io_uring_get_sqe(&io_uring_queue_[thread_id]);
@@ -155,50 +156,57 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
read_buffer +
((aligned_length + kDiskAlignmentSize) * group_size_ *
thread_id) +
((aligned_length + kDiskAlignmentSize) * group_id),
((aligned_length + kDiskAlignmentSize) *
(group_id % group_size_)),
read_size, aligned_offset);
}
}
if (!error_flag.load()) {
// Submit I/O requests.
io_uring_submit(&io_uring_queue_[thread_id]);

// Wait for completion of I/O requests.
int64_t num_finish = 0;
// Wait until all the disk blocks are loaded in current group.
while (num_finish < end - begin) {
struct io_uring_cqe *complete_queue;
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
abort();
}
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
&io_uring_queue_[thread_id], complete_queues, group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
abort();
}
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
num_finish += cqe_count;
}

// Copy the features in the disk blocks to the result buffer.
for (int64_t group_id = 0; group_id < end - begin; group_id++) {
memcpy(
result_buffer + feature_size_ * (begin + group_id),
read_buffer +
((aligned_length + kDiskAlignmentSize) * group_size_ *
thread_id) +
((aligned_length + kDiskAlignmentSize) * group_id +
residual[thread_id * group_size_ + group_id]),
feature_size_);
if (((group_id + 1) % group_size_ == 0) || i == end - 1) {
if (!error_flag.load()) {
io_uring_submit(&io_uring_queue_[thread_id]);
// Wait for completion of I/O requests.
int64_t num_finish = 0;
// Wait until all the disk blocks are loaded in current group.
while (num_finish < (group_id % group_size_ + 1)) {
struct io_uring_cqe *complete_queue;
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
abort();
}
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
&io_uring_queue_[thread_id], complete_queues,
group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
abort();
}
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
num_finish += cqe_count;
}
// Copy the fetched features of this group into result_buffer.

for (int64_t batch_id = batch_offset; batch_id <= i;
batch_id++) {
memcpy(
result_buffer + feature_size_ * (batch_id),
read_buffer +
((aligned_length + kDiskAlignmentSize) * group_size_ *
thread_id) +
((aligned_length + kDiskAlignmentSize) *
((batch_id - begin) % group_size_) +
residual
[thread_id * group_size_ +
((batch_id - begin) % group_size_)]),
feature_size_);
}
batch_offset += group_size_;
}
}
}
}
});

auto result = torch::empty({0});
if (!error_flag.load()) {
auto options = torch::TensorOptions()
@@ -218,7 +226,6 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {

free(read_buffer);
free(result_buffer);

return result;
}
#endif // HAVE_LIBRARY_LIBURING
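As the hunks above show, I/O requests are now submitted and their completions reaped once every group_size_ entries (and at the end of the chunk), rather than once per parallel_for chunk, so the per-thread read buffer and the ring's queues are recycled group by group instead of accumulating entries. Below is a minimal standalone sketch of that per-group submit/reap pattern with liburing; the file name, block size, group size, and request count are illustrative assumptions, not values taken from GraphBolt.

#include <fcntl.h>
#include <liburing.h>
#include <unistd.h>

#include <cstdio>
#include <vector>

int main() {
  constexpr unsigned kGroupSize = 64;   // ring depth; submit/reap unit (assumed)
  constexpr size_t kBlockSize = 4096;   // one aligned read per request (assumed)
  constexpr size_t kNumReads = 1000;    // total requests, larger than kGroupSize

  int fd = open("feat.bin", O_RDONLY);  // hypothetical input file
  if (fd < 0) { perror("open"); return 1; }

  struct io_uring ring;
  if (io_uring_queue_init(kGroupSize, &ring, 0) < 0) { perror("queue_init"); return 1; }

  std::vector<char> buffer(kGroupSize * kBlockSize);  // reused for every group

  size_t next = 0;
  while (next < kNumReads) {
    // Queue at most kGroupSize reads so neither the submission queue nor the
    // read buffer is outrun.
    unsigned queued = 0;
    for (; queued < kGroupSize && next < kNumReads; ++queued, ++next) {
      struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
      io_uring_prep_read(
          sqe, fd, buffer.data() + queued * kBlockSize, kBlockSize,
          next * kBlockSize);
    }
    io_uring_submit(&ring);

    // Reap exactly this group's completions before queueing the next group,
    // so completion-queue entries never accumulate.
    unsigned finished = 0;
    while (finished < queued) {
      struct io_uring_cqe *cqe;
      if (io_uring_wait_cqe(&ring, &cqe) < 0) { perror("wait_cqe"); return 1; }
      struct io_uring_cqe *cqes[kGroupSize];
      unsigned got = io_uring_peek_batch_cqe(&ring, cqes, kGroupSize);
      io_uring_cq_advance(&ring, got);  // release the reaped entries
      finished += got;
    }
    // ... copy this group's data out of `buffer` here, before it is reused ...
  }

  io_uring_queue_exit(&ring);
  close(fd);
  return 0;
}

In cnumpy.cc the same idea is applied per thread inside the parallel loop, with group_size_ as the submit/reap unit and batch_offset tracking the first index of the current group for the memcpy back into result_buffer.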
@@ -37,12 +37,11 @@ def test_disk_based_feature():
feature_a.read(), torch.tensor([[1, 2, 3], [4, 5, 6]])
)

# Test read the feature with ids.
assert torch.equal(
feature_b.read(), torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]])
)

# Read the feature with ids.
# Test read the feature with ids.
assert torch.equal(
feature_a.read(torch.tensor([0])),
torch.tensor([[1, 2, 3]]),
@@ -52,6 +51,20 @@
torch.tensor([[[4, 5], [6, 7]]]),
)

# Test when the index tensor is large.
torch_based_feature_a = gb.TorchBasedFeature(a)
ind_a = torch.randint(low=0, high=2, size=(1, 4097))[0]
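# Assumption on intent: 4097 indices span more than one I/O group, so this
# exercises the per-group submit/reap path added in cnumpy.cc.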
assert torch.equal(
feature_a.read(ind_a),
torch_based_feature_a.read(ind_a),
)
torch_based_feature_b = gb.TorchBasedFeature(b)
ind_b = torch.randint(low=0, high=2, size=(1, 4097))[0]
assert torch.equal(
feature_b.read(ind_b),