[GraphBolt][io_uring] Improve detection and simplify code. #7515

Merged · 5 commits · Jul 11, 2024
133 changes: 55 additions & 78 deletions graphbolt/src/cnumpy.cc
@@ -18,11 +18,12 @@ namespace graphbolt {
namespace storage {

static constexpr int kDiskAlignmentSize = 4096;
static constexpr int kGroupSize = 512;

OnDiskNpyArray::OnDiskNpyArray(
std::string filename, torch::ScalarType dtype, torch::Tensor shape,
torch::optional<int64_t> num_threads)
: filename_(filename), dtype_(dtype), group_size_(512) {
: filename_(filename), dtype_(dtype) {
#ifndef __linux__
throw std::runtime_error(
"OnDiskNpyArray is not supported on non-Linux systems.");
@@ -44,8 +45,24 @@ OnDiskNpyArray::OnDiskNpyArray(

// Init io_uring queue.
for (int64_t t = 0; t < num_thread_; t++) {
io_uring_queue_init(group_size_, &io_uring_queue_[t], 0);
io_uring_queue_init(kGroupSize, &io_uring_queue_[t], 0);
}

// The minimum page size to contain one feature.
aligned_length_ = (feature_size_ + kDiskAlignmentSize - 1) &
(long)~(kDiskAlignmentSize - 1);

const size_t read_buffer_intended_size =
(aligned_length_ + kDiskAlignmentSize) * kGroupSize * num_thread_;
size_t read_buffer_size = read_buffer_intended_size + kDiskAlignmentSize - 1;
read_tensor_ = torch::empty(
read_buffer_size,
torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
auto read_buffer_void_ptr = read_tensor_.data_ptr();
read_buffer_ = reinterpret_cast<char *>(std::align(
kDiskAlignmentSize, read_buffer_intended_size, read_buffer_void_ptr,
read_buffer_size));
TORCH_CHECK(read_buffer_, "read_buffer allocation failed!");
#else
throw std::runtime_error("DiskBasedFeature is not available now.");
#endif // HAVE_LIBRARY_LIBURING
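
For reference, a minimal standalone sketch of the alignment arithmetic introduced above: round the feature size up to the 4 KiB disk block size, over-allocate, and carve out an aligned pointer with std::align. The feature size and buffer count below are assumed example values, not taken from the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

int main() {
  constexpr std::size_t kDiskAlignmentSize = 4096;
  const std::size_t feature_size = 6000;  // hypothetical feature byte size

  // Round up to the next multiple of the block size (same bit trick as
  // aligned_length_ in the constructor).
  const std::size_t aligned_length =
      (feature_size + kDiskAlignmentSize - 1) & ~(kDiskAlignmentSize - 1);
  assert(aligned_length == 8192);

  // Over-allocate by alignment - 1 bytes so std::align can always find a
  // block-aligned start address inside the buffer.
  const std::size_t intended_size = 4 * (aligned_length + kDiskAlignmentSize);
  std::size_t space = intended_size + kDiskAlignmentSize - 1;
  std::unique_ptr<char[]> storage(new char[space]);
  void* ptr = storage.get();
  char* aligned = static_cast<char*>(
      std::align(kDiskAlignmentSize, intended_size, ptr, space));
  assert(aligned != nullptr);
  assert(reinterpret_cast<std::uintptr_t>(aligned) % kDiskAlignmentSize == 0);
  return 0;
}
```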
@@ -110,43 +127,24 @@ c10::intrusive_ptr<Future<torch::Tensor>> OnDiskNpyArray::IndexSelect(
#ifdef HAVE_LIBRARY_LIBURING
void OnDiskNpyArray::IndexSelectIOUringImpl(
torch::Tensor index, torch::Tensor result) {
const int64_t num_index = index.numel();
// The minimum page size to contain one feature.
const int64_t aligned_length = (feature_size_ + kDiskAlignmentSize - 1) &
(long)~(kDiskAlignmentSize - 1);

const size_t read_buffer_intended_size =
(aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_;
size_t read_buffer_size = read_buffer_intended_size + kDiskAlignmentSize - 1;
auto read_tensor = torch::empty(
read_buffer_size,
torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
auto read_buffer_void_ptr = read_tensor.data_ptr();
auto read_buffer = reinterpret_cast<char *>(std::align(
kDiskAlignmentSize, read_buffer_intended_size, read_buffer_void_ptr,
read_buffer_size));
TORCH_CHECK(read_buffer, "read_buffer allocation failed!");

auto result_buffer = reinterpret_cast<char *>(result.data_ptr());

// Record the inside offsets of feteched features.
auto residual =
std::unique_ptr<int64_t[]>(new int64_t[group_size_ * num_thread_]);
// Indicator for index error.
std::atomic<bool> error_flag{};
std::atomic<int64_t> work_queue{};
torch::parallel_for(0, num_thread_, 1, [&](int64_t begin, int64_t end) {
const auto thread_id = begin;
// Record the inside offsets of fetched features.
int64_t residual[kGroupSize];
while (true) {
begin = work_queue.fetch_add(group_size_, std::memory_order_relaxed);
if (begin >= num_index || error_flag.load()) break;
end = std::min(begin + group_size_, num_index);
int64_t batch_offset = begin;
begin = work_queue.fetch_add(kGroupSize, std::memory_order_relaxed);
if (begin >= index.numel() || error_flag.load()) break;
end = std::min(begin + kGroupSize, index.numel());
AT_DISPATCH_INDEX_TYPES(
index.scalar_type(), "IndexSelectIOUring", ([&] {
auto index_data = index.data_ptr<index_t>();
for (int64_t i = begin; i < end; ++i) {
int64_t group_id = i - begin;
int64_t local_id = i - begin;
int64_t feature_id = index_data[i]; // Feature id.
if (feature_id >= feature_dim_[0]) {
error_flag.store(true);
@@ -156,72 +154,51 @@ void OnDiskNpyArray::IndexSelectIOUringImpl(
int64_t offset = feature_id * feature_size_ + prefix_len_;
int64_t aligned_offset = offset & (long)~(kDiskAlignmentSize - 1);
// put offset of the feature into array.
const auto local_residual_offset =
thread_id * group_size_ + (group_id % group_size_);
residual[local_residual_offset] = offset - aligned_offset;
residual[local_id] = offset - aligned_offset;
// If the tail of the feature extends into another block,
// read an additional block.
int64_t read_size;
if (residual[thread_id * group_size_ + (group_id % group_size_)] +
feature_size_ >
kDiskAlignmentSize) {
read_size = aligned_length + kDiskAlignmentSize;
if (residual[local_id] + feature_size_ > kDiskAlignmentSize) {
read_size = aligned_length_ + kDiskAlignmentSize;
} else {
read_size = aligned_length;
read_size = aligned_length_;
}
const auto local_read_buffer_offset =
(aligned_length + kDiskAlignmentSize) * group_size_ *
(aligned_length_ + kDiskAlignmentSize) * kGroupSize *
thread_id;
// Put requests into io_uring queue.
struct io_uring_sqe *submit_queue =
io_uring_get_sqe(&io_uring_queue_[thread_id]);
io_uring_prep_read(
submit_queue, file_description_,
read_buffer + local_read_buffer_offset +
((aligned_length + kDiskAlignmentSize) *
(group_id % group_size_)),
read_buffer_ + local_read_buffer_offset +
((aligned_length_ + kDiskAlignmentSize) * local_id),
read_size, aligned_offset);

if (((group_id + 1) % group_size_ == 0) || i == end - 1) {
if (!error_flag.load()) {
io_uring_submit(&io_uring_queue_[thread_id]);
// Wait for completion of I/O requests.
int64_t num_finish = 0;
// Wait until all the disk blocks are loaded in current
// group.
while (num_finish < (group_id % group_size_ + 1)) {
struct io_uring_cqe *complete_queue;
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
std::exit(EXIT_FAILURE);
}
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
if (i + 1 == end && !error_flag.load()) {
io_uring_submit(&io_uring_queue_[thread_id]);
// Wait for completion of I/O requests.
struct io_uring_cqe *complete_queues[kGroupSize];
// Wait for submitted end - begin reads to finish.
if (io_uring_wait_cqe_nr(
&io_uring_queue_[thread_id], complete_queues,
group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
std::exit(EXIT_FAILURE);
}
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
num_finish += cqe_count;
}
// Copy results into result_buffer.
for (int64_t batch_id = batch_offset; batch_id <= i;
batch_id++) {
const auto local_id = (batch_id - begin) % group_size_;
const auto batch_offset =
(aligned_length + kDiskAlignmentSize) * local_id;
std::memcpy(
result_buffer + feature_size_ * batch_id,
read_buffer + local_read_buffer_offset +
(batch_offset +
residual[thread_id * group_size_ + local_id]),
feature_size_);
}
batch_offset += group_size_;
end - begin) < 0) {
perror("io_uring_wait_cqe_nr error\n");
std::exit(EXIT_FAILURE);
}
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], end - begin);

// Copy results into result_buffer.
for (int64_t j = begin; j < end; j++) {
const auto local_id = j - begin;
const auto batch_offset =
(aligned_length_ + kDiskAlignmentSize) * local_id;
std::memcpy(
result_buffer + feature_size_ * j,
read_buffer_ + local_read_buffer_offset +
(batch_offset + residual[local_id]),
feature_size_);
}
}
}
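
For reference, a simplified, self-contained sketch of the batched io_uring read pattern the rewritten loop relies on: queue several reads, submit them with one call, then block for all completions with io_uring_wait_cqe_nr and retire them with io_uring_cq_advance. The file path, queue depth, and read size are illustrative assumptions, not values from the PR.

```cpp
#include <fcntl.h>
#include <liburing.h>
#include <unistd.h>

#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <vector>

int main() {
  constexpr unsigned kQueueDepth = 8;  // assumed small batch size
  constexpr std::size_t kReadSize = 4096;

  const int fd = open("/etc/hostname", O_RDONLY);  // any readable file works
  if (fd < 0) {
    std::perror("open");
    return EXIT_FAILURE;
  }

  io_uring ring;
  if (io_uring_queue_init(kQueueDepth, &ring, 0) < 0) {
    std::perror("io_uring_queue_init");
    return EXIT_FAILURE;
  }

  std::vector<std::vector<char>> buffers(kQueueDepth,
                                         std::vector<char>(kReadSize));
  // Queue one read per buffer; all at offset 0 to keep the sketch short.
  for (unsigned i = 0; i < kQueueDepth; ++i) {
    io_uring_sqe* sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, buffers[i].data(), kReadSize, /*offset=*/0);
  }

  io_uring_submit(&ring);  // one syscall submits the whole batch

  // Block until all kQueueDepth completions are available, then retire them
  // in bulk instead of consuming them one by one.
  io_uring_cqe* cqe = nullptr;
  if (io_uring_wait_cqe_nr(&ring, &cqe, kQueueDepth) < 0) {
    std::fprintf(stderr, "io_uring_wait_cqe_nr failed\n");
    return EXIT_FAILURE;
  }
  io_uring_cq_advance(&ring, kQueueDepth);

  io_uring_queue_exit(&ring);
  close(fd);
  std::puts("all reads completed");
  return EXIT_SUCCESS;
}
```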
4 changes: 3 additions & 1 deletion graphbolt/src/cnumpy.h
@@ -110,8 +110,10 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
std::vector<int64_t> feature_dim_; // Shape of features, e.g. {N,M,K,L}.
const torch::ScalarType dtype_; // Feature data type.
int64_t feature_size_; // Number of bytes of feature size.
int64_t aligned_length_; // Aligned feature_size.
int num_thread_; // Default thread number.
const int64_t group_size_; // Default group size.
torch::Tensor read_tensor_; // Provides temporary read buffer.
char* read_buffer_; // Aligned pointer to read_tensor.

#ifdef HAVE_LIBRARY_LIBURING
std::unique_ptr<io_uring[]> io_uring_queue_; // io_uring queue.
21 changes: 21 additions & 0 deletions graphbolt/src/detect_io_uring.cc
@@ -21,12 +21,20 @@
#include "./detect_io_uring.h"

#include <errno.h>
#include <liburing.h>
#include <linux/io_uring.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <memory>
#include <mutex>

struct io_uring_probe_destroyer {
void operator()(struct io_uring_probe* p) {
if (p) io_uring_free_probe(p);
}
};
#endif

namespace graphbolt {
@@ -46,6 +54,19 @@ bool IsAvailable() {
!(syscall(
__NR_io_uring_register, 0, IORING_UNREGISTER_BUFFERS, NULL, 0) &&
errno == ENOSYS);

std::unique_ptr<struct io_uring_probe, io_uring_probe_destroyer> probe(
io_uring_get_probe(), io_uring_probe_destroyer());
if (probe.get()) {
cached_is_available =
cached_is_available &&
io_uring_opcode_supported(probe.get(), IORING_OP_READ);
cached_is_available =
cached_is_available &&
io_uring_opcode_supported(probe.get(), IORING_OP_READV);
} else {
cached_is_available = false;
}
});

return cached_is_available;
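
For reference, a standalone sketch of the probe-based capability check added above: ask the kernel which io_uring opcodes it supports instead of relying on the ENOSYS test alone. This is a separate illustrative program, not the IsAvailable() wiring itself.

```cpp
#include <liburing.h>

#include <cstdio>
#include <memory>

struct ProbeDeleter {
  void operator()(io_uring_probe* p) const {
    if (p) io_uring_free_probe(p);
  }
};

int main() {
  // io_uring_get_probe() returns nullptr on kernels without probe support.
  std::unique_ptr<io_uring_probe, ProbeDeleter> probe(io_uring_get_probe());
  if (!probe) {
    std::puts("io_uring probe unavailable; treating io_uring as unsupported");
    return 1;
  }
  const bool ok = io_uring_opcode_supported(probe.get(), IORING_OP_READ) &&
                  io_uring_opcode_supported(probe.get(), IORING_OP_READV);
  std::puts(ok ? "IORING_OP_READ and IORING_OP_READV are supported"
               : "required opcodes are missing");
  return ok ? 0 : 1;
}
```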