simplify code further and reduce overhead of hot path.
mfbalin committed Jul 11, 2024
1 parent daa2c1e commit 4f09f2e
Showing 2 changed files with 44 additions and 47 deletions.
73 changes: 34 additions & 39 deletions graphbolt/src/cnumpy.cc
@@ -18,6 +18,7 @@ namespace graphbolt {
 namespace storage {
 
 static constexpr int kDiskAlignmentSize = 4096;
+static constexpr int kGroupSize = 512;
 
 OnDiskNpyArray::OnDiskNpyArray(
     std::string filename, torch::ScalarType dtype, torch::Tensor shape,
@@ -44,8 +45,24 @@ OnDiskNpyArray::OnDiskNpyArray(
 
   // Init io_uring queue.
   for (int64_t t = 0; t < num_thread_; t++) {
-    io_uring_queue_init(group_size_, &io_uring_queue_[t], 0);
+    io_uring_queue_init(kGroupSize, &io_uring_queue_[t], 0);
   }
+
+  // The minimum page size to contain one feature.
+  aligned_length_ = (feature_size_ + kDiskAlignmentSize - 1) &
+                    (long)~(kDiskAlignmentSize - 1);
+
+  const size_t read_buffer_intended_size =
+      (aligned_length_ + kDiskAlignmentSize) * kGroupSize * num_thread_;
+  size_t read_buffer_size = read_buffer_intended_size + kDiskAlignmentSize - 1;
+  read_tensor_ = torch::empty(
+      read_buffer_size,
+      torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
+  auto read_buffer_void_ptr = read_tensor_.data_ptr();
+  read_buffer_ = reinterpret_cast<char *>(std::align(
+      kDiskAlignmentSize, read_buffer_intended_size, read_buffer_void_ptr,
+      read_buffer_size));
+  TORCH_CHECK(read_buffer_, "read_buffer allocation failed!");
 #else
   throw std::runtime_error("DiskBasedFeature is not available now.");
 #endif  // HAVE_LIBRARY_LIBURING
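The constructor now performs this buffer setup once instead of on every read call. For readers unfamiliar with the two tricks it relies on, here is a minimal standalone sketch (illustrative names, not from the repository) of rounding a size up to a power-of-two boundary with bit masking, and of over-allocating by alignment - 1 bytes so `std::align` can always carve out an aligned region:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>  // std::align
#include <vector>

constexpr std::size_t kAlign = 4096;  // Stand-in for kDiskAlignmentSize.

// Round n up to the next multiple of kAlign (valid because kAlign is a
// power of two): add kAlign - 1, then mask off the low bits.
std::size_t RoundUp(std::size_t n) { return (n + kAlign - 1) & ~(kAlign - 1); }

int main() {
  assert(RoundUp(1) == 4096);
  assert(RoundUp(4096) == 4096);
  assert(RoundUp(4097) == 8192);

  // Over-allocate by kAlign - 1 bytes so an aligned region of the intended
  // size is guaranteed to exist somewhere inside the backing buffer.
  const std::size_t intended = 3 * kAlign;
  std::size_t space = intended + kAlign - 1;
  std::vector<char> backing(space);
  void* p = backing.data();
  char* aligned = static_cast<char*>(std::align(kAlign, intended, p, space));
  assert(aligned != nullptr);
  assert(reinterpret_cast<std::uintptr_t>(aligned) % kAlign == 0);
  return 0;
}
```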
@@ -110,37 +127,19 @@ c10::intrusive_ptr<Future<torch::Tensor>> OnDiskNpyArray::IndexSelect(
 #ifdef HAVE_LIBRARY_LIBURING
 void OnDiskNpyArray::IndexSelectIOUringImpl(
     torch::Tensor index, torch::Tensor result) {
-  const int64_t num_index = index.numel();
-  // The minimum page size to contain one feature.
-  const int64_t aligned_length = (feature_size_ + kDiskAlignmentSize - 1) &
-                                 (long)~(kDiskAlignmentSize - 1);
-
-  const size_t read_buffer_intended_size =
-      (aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_;
-  size_t read_buffer_size = read_buffer_intended_size + kDiskAlignmentSize - 1;
-  auto read_tensor = torch::empty(
-      read_buffer_size,
-      torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
-  auto read_buffer_void_ptr = read_tensor.data_ptr();
-  auto read_buffer = reinterpret_cast<char *>(std::align(
-      kDiskAlignmentSize, read_buffer_intended_size, read_buffer_void_ptr,
-      read_buffer_size));
-  TORCH_CHECK(read_buffer, "read_buffer allocation failed!");
-
   auto result_buffer = reinterpret_cast<char *>(result.data_ptr());
 
-  // Record the inside offsets of feteched features.
-  auto residual =
-      std::unique_ptr<int64_t[]>(new int64_t[group_size_ * num_thread_]);
   // Indicator for index error.
   std::atomic<bool> error_flag{};
   std::atomic<int64_t> work_queue{};
   torch::parallel_for(0, num_thread_, 1, [&](int64_t begin, int64_t end) {
     const auto thread_id = begin;
+    // Record the inside offsets of fetched features.
+    int64_t residual[kGroupSize];
     while (true) {
-      begin = work_queue.fetch_add(group_size_, std::memory_order_relaxed);
-      if (begin >= num_index || error_flag.load()) break;
-      end = std::min(begin + group_size_, num_index);
+      begin = work_queue.fetch_add(kGroupSize, std::memory_order_relaxed);
+      if (begin >= index.numel() || error_flag.load()) break;
+      end = std::min(begin + kGroupSize, index.numel());
       AT_DISPATCH_INDEX_TYPES(
           index.scalar_type(), "IndexSelectIOUring", ([&] {
             auto index_data = index.data_ptr<index_t>();
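The `work_queue.fetch_add` above is the entire scheduling mechanism: each thread atomically claims the next batch of `kGroupSize` indices, so faster threads simply take more batches and no thread idles waiting on a slow peer. A self-contained sketch of the same pattern, using plain `std::thread` as a stand-in for `torch::parallel_for`:

```cpp
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

constexpr int64_t kGroupSize = 512;  // Batch size per atomic claim.

int main() {
  const int64_t num_index = 10000;
  std::atomic<int64_t> work_queue{0};
  std::atomic<int64_t> processed{0};

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; t++) {
    workers.emplace_back([&] {
      while (true) {
        // Claim the next batch; relaxed ordering suffices for a counter.
        const int64_t begin =
            work_queue.fetch_add(kGroupSize, std::memory_order_relaxed);
        if (begin >= num_index) break;
        const int64_t end = std::min(begin + kGroupSize, num_index);
        processed.fetch_add(end - begin);  // Stand-in for the batched reads.
      }
    });
  }
  for (auto& w : workers) w.join();
  std::cout << processed.load() << " of " << num_index << " items processed\n";
  return 0;
}
```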
@@ -155,34 +154,31 @@ void OnDiskNpyArray::IndexSelectIOUringImpl(
               int64_t offset = feature_id * feature_size_ + prefix_len_;
               int64_t aligned_offset = offset & (long)~(kDiskAlignmentSize - 1);
               // put offset of the feature into array.
-              const auto local_residual_offset =
-                  thread_id * group_size_ + local_id;
-              residual[local_residual_offset] = offset - aligned_offset;
+              residual[local_id] = offset - aligned_offset;
               // If the tail of the feature extends into another block,
               // read an additional block.
               int64_t read_size;
-              if (residual[thread_id * group_size_ + local_id] + feature_size_ >
-                  kDiskAlignmentSize) {
-                read_size = aligned_length + kDiskAlignmentSize;
+              if (residual[local_id] + feature_size_ > kDiskAlignmentSize) {
+                read_size = aligned_length_ + kDiskAlignmentSize;
               } else {
-                read_size = aligned_length;
+                read_size = aligned_length_;
               }
               const auto local_read_buffer_offset =
-                  (aligned_length + kDiskAlignmentSize) * group_size_ *
+                  (aligned_length_ + kDiskAlignmentSize) * kGroupSize *
                   thread_id;
               // Put requests into io_uring queue.
               struct io_uring_sqe *submit_queue =
                   io_uring_get_sqe(&io_uring_queue_[thread_id]);
               io_uring_prep_read(
                   submit_queue, file_description_,
-                  read_buffer + local_read_buffer_offset +
-                      ((aligned_length + kDiskAlignmentSize) * local_id),
+                  read_buffer_ + local_read_buffer_offset +
+                      ((aligned_length_ + kDiskAlignmentSize) * local_id),
                   read_size, aligned_offset);
 
               if (i + 1 == end && !error_flag.load()) {
                 io_uring_submit(&io_uring_queue_[thread_id]);
                 // Wait for completion of I/O requests.
-                struct io_uring_cqe *complete_queues[group_size_];
+                struct io_uring_cqe *complete_queues[kGroupSize];
                 // Wait for submitted end - begin reads to finish.
                 if (io_uring_wait_cqe_nr(
                         &io_uring_queue_[thread_id], complete_queues,
@@ -197,12 +193,11 @@ void OnDiskNpyArray::IndexSelectIOUringImpl(
                 for (int64_t j = begin; j < end; j++) {
                   const auto local_id = j - begin;
                   const auto batch_offset =
-                      (aligned_length + kDiskAlignmentSize) * local_id;
+                      (aligned_length_ + kDiskAlignmentSize) * local_id;
                   std::memcpy(
                       result_buffer + feature_size_ * j,
-                      read_buffer + local_read_buffer_offset +
-                          (batch_offset +
-                           residual[thread_id * group_size_ + local_id]),
+                      read_buffer_ + local_read_buffer_offset +
+                          (batch_offset + residual[local_id]),
                       feature_size_);
                 }
               }
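For context on the liburing calls in the hot path above, this is a minimal single-read example of the same prep/submit/wait sequence (one request rather than a batch, error handling abbreviated, and the file path is only an example):

```cpp
#include <fcntl.h>
#include <liburing.h>
#include <unistd.h>

#include <cstdio>

int main() {
  struct io_uring ring;
  if (io_uring_queue_init(8, &ring, 0) < 0) return 1;  // 8 queue entries.

  int fd = open("/etc/hostname", O_RDONLY);  // Example file only.
  if (fd < 0) return 1;

  char buf[4096];
  // Grab a submission queue entry and describe the read.
  struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
  io_uring_prep_read(sqe, fd, buf, sizeof(buf), /*offset=*/0);
  io_uring_submit(&ring);  // Hand the request to the kernel.

  // Block until the completion arrives, then consume it.
  struct io_uring_cqe* cqe;
  if (io_uring_wait_cqe(&ring, &cqe) == 0) {
    if (cqe->res >= 0) fwrite(buf, 1, cqe->res, stdout);
    io_uring_cqe_seen(&ring, cqe);
  }

  close(fd);
  io_uring_queue_exit(&ring);
  return 0;
}
```

The kernel above batches up to `kGroupSize` such requests per submission and waits for them together with `io_uring_wait_cqe_nr`, which is what amortizes the syscall cost across a whole group of feature reads.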
18 changes: 10 additions & 8 deletions graphbolt/src/cnumpy.h
@@ -104,14 +104,16 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
 
 #endif  // HAVE_LIBRARY_LIBURING
  private:
-  const std::string filename_;        // Path to numpy file.
-  int file_description_;              // File description.
-  size_t prefix_len_;                 // Length of head data in numpy file.
-  std::vector<int64_t> feature_dim_;  // Shape of features, e.g. {N,M,K,L}.
-  const torch::ScalarType dtype_;     // Feature data type.
-  int64_t feature_size_;              // Number of bytes of feature size.
-  int num_thread_;                    // Default thread number.
-  static constexpr int group_size_ = 512;  // Default group size.
+  const std::string filename_;        // Path to numpy file.
+  int file_description_;              // File description.
+  size_t prefix_len_;                 // Length of head data in numpy file.
+  std::vector<int64_t> feature_dim_;  // Shape of features, e.g. {N,M,K,L}.
+  const torch::ScalarType dtype_;     // Feature data type.
+  int64_t feature_size_;              // Number of bytes of feature size.
+  int64_t aligned_length_;            // Aligned feature_size.
+  int num_thread_;                    // Default thread number.
+  torch::Tensor read_tensor_;         // Provides temporary read buffer.
+  char* read_buffer_;                 // Aligned pointer to read_tensor.
 
 #ifdef HAVE_LIBRARY_LIBURING
   std::unique_ptr<io_uring[]> io_uring_queue_;  // io_uring queue.
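The header change captures the commit's main idea: what used to be per-call locals (`aligned_length`, the read tensor, the aligned pointer) become members initialized once in the constructor, so the hot path allocates nothing. A simplified sketch of that pattern, with illustrative names and none of the real class's I/O:

```cpp
#include <cstdint>
#include <vector>

// Resources set up once at construction; the hot path only indexes into them.
class ReaderSketch {
 public:
  ReaderSketch(int64_t feature_size, int num_threads)
      : feature_size_(feature_size),
        // Round the feature size up to the block boundary once.
        aligned_length_((feature_size + kAlign - 1) & ~(kAlign - 1)),
        // One flat buffer partitioned into num_threads * kGroupSize slots.
        buffer_((aligned_length_ + kAlign) * kGroupSize * num_threads) {}

  // Hot path: thread_id picks its private slice, local_id a slot within it.
  // No allocation, no synchronization on the buffer itself.
  char* SlotFor(int thread_id, int local_id) {
    const int64_t slot = aligned_length_ + kAlign;
    return buffer_.data() + slot * (kGroupSize * thread_id + local_id);
  }

 private:
  static constexpr int64_t kAlign = 4096;
  static constexpr int64_t kGroupSize = 512;
  int64_t feature_size_;
  int64_t aligned_length_;
  std::vector<char> buffer_;
};
```

Because each thread only ever touches its own `kGroupSize`-slot slice, the commit can also shrink the `residual` bookkeeping array from a heap allocation of `group_size_ * num_thread_` entries to a per-thread stack array of `kGroupSize` entries.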
