simplify waiting for requests.
mfbalin committed Jul 10, 2024
1 parent 0db100c commit 2c652e2
Showing 2 changed files with 19 additions and 30 deletions.
33 changes: 11 additions & 22 deletions graphbolt/src/cnumpy.cc
@@ -22,7 +22,7 @@ static constexpr int kDiskAlignmentSize = 4096;
 OnDiskNpyArray::OnDiskNpyArray(
     std::string filename, torch::ScalarType dtype, torch::Tensor shape,
     torch::optional<int64_t> num_threads)
-    : filename_(filename), dtype_(dtype), group_size_(512) {
+    : filename_(filename), dtype_(dtype) {
 #ifndef __linux__
   throw std::runtime_error(
       "OnDiskNpyArray is not supported on non-Linux systems.");
@@ -185,28 +185,17 @@ void OnDiskNpyArray::IndexSelectIOUringImpl(
           if (i + 1 == end && !error_flag.load()) {
             io_uring_submit(&io_uring_queue_[thread_id]);
             // Wait for completion of I/O requests.
-            int64_t num_finish = 0;
-            // Wait until all the disk blocks are loaded in current
-            // group.
-            while (num_finish < (group_id % group_size_ + 1)) {
-              struct io_uring_cqe *complete_queue;
-              if (io_uring_wait_cqe(
-                      &io_uring_queue_[thread_id], &complete_queue) < 0) {
-                perror("io_uring_wait_cqe");
-                std::exit(EXIT_FAILURE);
-              }
-              struct io_uring_cqe *complete_queues[group_size_];
-              int cqe_count = io_uring_peek_batch_cqe(
-                  &io_uring_queue_[thread_id], complete_queues,
-                  group_size_);
-              if (cqe_count == -1) {
-                perror("io_uring_peek_batch error\n");
-                std::exit(EXIT_FAILURE);
-              }
-              // Move the head pointer of completion queue.
-              io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
-              num_finish += cqe_count;
-            }
+            struct io_uring_cqe *complete_queues[group_size_];
+            // Wait for submitted end - begin reads to finish.
+            if (io_uring_wait_cqe_nr(
+                    &io_uring_queue_[thread_id], complete_queues,
+                    end - begin) < 0) {
+              perror("io_uring_wait_cqe_nr error\n");
+              std::exit(EXIT_FAILURE);
+            }
+            // Move the head pointer of completion queue.
+            io_uring_cq_advance(&io_uring_queue_[thread_id], end - begin);

             // Copy results into result_buffer.
             for (int64_t batch_id = batch_offset; batch_id <= i;
                  batch_id++) {
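For context, here is a minimal standalone sketch of the waiting pattern the commit switches to: submit a known number of reads, block once with io_uring_wait_cqe_nr until that many completions are available, then release them all with io_uring_cq_advance. This is an illustration only, not code from the repository; the read count (kNumReads), the /etc/hostname path, and the 64-byte buffers are made up.

// Sketch: batch-wait with io_uring_wait_cqe_nr (assumed file path and sizes).
#include <liburing.h>

#include <fcntl.h>
#include <unistd.h>

#include <cstdio>
#include <cstdlib>
#include <vector>

int main() {
  constexpr unsigned kNumReads = 8;
  io_uring ring;
  if (io_uring_queue_init(kNumReads, &ring, 0) < 0) {
    perror("io_uring_queue_init");
    return EXIT_FAILURE;
  }
  const int fd = open("/etc/hostname", O_RDONLY);  // hypothetical input file
  if (fd < 0) {
    perror("open");
    return EXIT_FAILURE;
  }
  // Queue kNumReads reads; each gets its own buffer.
  std::vector<std::vector<char>> buffers(kNumReads, std::vector<char>(64));
  for (unsigned j = 0; j < kNumReads; j++) {
    io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, buffers[j].data(), buffers[j].size(), 0);
  }
  io_uring_submit(&ring);
  // One call replaces the old wait_cqe + peek_batch_cqe loop: block until
  // kNumReads completions sit in the completion queue.
  io_uring_cqe *cqe;
  if (io_uring_wait_cqe_nr(&ring, &cqe, kNumReads) < 0) {
    perror("io_uring_wait_cqe_nr");
    return EXIT_FAILURE;
  }
  // Consume all kNumReads entries in one step, like the commit does with
  // end - begin.
  io_uring_cq_advance(&ring, kNumReads);
  close(fd);
  io_uring_queue_exit(&ring);
  return EXIT_SUCCESS;
}

In the commit itself each worker thread keeps its own ring (io_uring_queue_[thread_id]) and, per the new comment, waits for the end - begin requests it just submitted, so the separate num_finish counter and the wait_cqe/peek_batch_cqe loop are no longer needed.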
16 changes: 8 additions & 8 deletions graphbolt/src/cnumpy.h
@@ -104,14 +104,14 @@ class OnDiskNpyArray : public torch::CustomClassHolder {

 #endif  // HAVE_LIBRARY_LIBURING
  private:
-  const std::string filename_;        // Path to numpy file.
-  int file_description_;              // File description.
-  size_t prefix_len_;                 // Length of head data in numpy file.
-  std::vector<int64_t> feature_dim_;  // Shape of features, e.g. {N,M,K,L}.
-  const torch::ScalarType dtype_;     // Feature data type.
-  int64_t feature_size_;              // Number of bytes of feature size.
-  int num_thread_;                    // Default thread number.
-  const int64_t group_size_;          // Default group size.
+  const std::string filename_;             // Path to numpy file.
+  int file_description_;                   // File description.
+  size_t prefix_len_;                      // Length of head data in numpy file.
+  std::vector<int64_t> feature_dim_;       // Shape of features, e.g. {N,M,K,L}.
+  const torch::ScalarType dtype_;          // Feature data type.
+  int64_t feature_size_;                   // Number of bytes of feature size.
+  int num_thread_;                         // Default thread number.
+  static constexpr int group_size_ = 512;  // Default group size.

 #ifdef HAVE_LIBRARY_LIBURING
   std::unique_ptr<io_uring[]> io_uring_queue_;  // io_uring queue.
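A note on what the static constexpr change means: a const data member initialized in the constructor is not a compile-time constant, so using it as the bound of the on-stack complete_queues array relies on the variable-length-array extension, whereas a static constexpr member is a genuine constant expression and also drops out of the constructor's initializer list (see the first hunk of cnumpy.cc). The sketch below illustrates the difference with a hypothetical class; it is not code from the repository.

#include <liburing.h>

class CqeDrainer {  // hypothetical name, for illustration only
  // Old style: a per-instance constant, only known at run time, so
  // `io_uring_cqe *cqes[group_size_]` would be a non-standard VLA.
  // const int64_t group_size_;  // would need group_size_(512) in the ctor

  // New style: a compile-time constant that is a valid array bound and does
  // not appear in the constructor's initializer list.
  static constexpr int group_size_ = 512;

 public:
  void Drain(io_uring &ring) {
    io_uring_cqe *cqes[group_size_];  // fixed-size array, standard C++
    const unsigned n = io_uring_peek_batch_cqe(&ring, cqes, group_size_);
    io_uring_cq_advance(&ring, n);  // release whatever was ready
  }
};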
