Skip to content

Commit

Permalink
[GraphBolt][io_uring] Improve detection and simplify code.
Browse files Browse the repository at this point in the history
  • Loading branch information
mfbalin committed Jul 10, 2024
1 parent 3a8ce85 commit 85bfcf8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 38 deletions.
74 changes: 36 additions & 38 deletions graphbolt/src/cnumpy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,47 +182,45 @@ void OnDiskNpyArray::IndexSelectIOUringImpl(
(group_id % group_size_)),
read_size, aligned_offset);

if (((group_id + 1) % group_size_ == 0) || i == end - 1) {
if (!error_flag.load()) {
io_uring_submit(&io_uring_queue_[thread_id]);
// Wait for completion of I/O requests.
int64_t num_finish = 0;
// Wait until all the disk blocks are loaded in current
// group.
while (num_finish < (group_id % group_size_ + 1)) {
struct io_uring_cqe *complete_queue;
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
std::exit(EXIT_FAILURE);
}
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
&io_uring_queue_[thread_id], complete_queues,
group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
std::exit(EXIT_FAILURE);
}
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
num_finish += cqe_count;
if (i + 1 == end && !error_flag.load()) {
io_uring_submit(&io_uring_queue_[thread_id]);
// Wait for completion of I/O requests.
int64_t num_finish = 0;
// Wait until all the disk blocks are loaded in current
// group.
while (num_finish < (group_id % group_size_ + 1)) {
struct io_uring_cqe *complete_queue;
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
std::exit(EXIT_FAILURE);
}
// Copy results into result_buffer.
for (int64_t batch_id = batch_offset; batch_id <= i;
batch_id++) {
const auto local_id = (batch_id - begin) % group_size_;
const auto batch_offset =
(aligned_length + kDiskAlignmentSize) * local_id;
std::memcpy(
result_buffer + feature_size_ * batch_id,
read_buffer + local_read_buffer_offset +
(batch_offset +
residual[thread_id * group_size_ + local_id]),
feature_size_);
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
&io_uring_queue_[thread_id], complete_queues,
group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
std::exit(EXIT_FAILURE);
}
batch_offset += group_size_;
// Move the head pointer of completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
num_finish += cqe_count;
}
// Copy results into result_buffer.
for (int64_t batch_id = batch_offset; batch_id <= i;
batch_id++) {
const auto local_id = (batch_id - begin) % group_size_;
const auto batch_offset =
(aligned_length + kDiskAlignmentSize) * local_id;
std::memcpy(
result_buffer + feature_size_ * batch_id,
read_buffer + local_read_buffer_offset +
(batch_offset +
residual[thread_id * group_size_ + local_id]),
feature_size_);
}
batch_offset += group_size_;
}
}
}));
Expand Down
17 changes: 17 additions & 0 deletions graphbolt/src/detect_io_uring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@
#include "./detect_io_uring.h"

#include <errno.h>
#include <liburing.h>
#include <linux/io_uring.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <memory>
#include <mutex>

struct io_uring_probe_destroyer {
void operator()(struct io_uring_probe* p) {
if (p) io_uring_free_probe(p);
}
};
#endif

namespace graphbolt {
Expand All @@ -46,6 +54,15 @@ bool IsAvailable() {
!(syscall(
__NR_io_uring_register, 0, IORING_UNREGISTER_BUFFERS, NULL, 0) &&
errno == ENOSYS);

std::unique_ptr<struct io_uring_probe, io_uring_probe_destroyer> probe(
io_uring_get_probe(), io_uring_probe_destroyer());
cached_is_available =
cached_is_available &&
io_uring_opcode_supported(probe.get(), IORING_OP_READ);
cached_is_available =
cached_is_available &&
io_uring_opcode_supported(probe.get(), IORING_OP_READV);
});

return cached_is_available;
Expand Down

0 comments on commit 85bfcf8

Please sign in to comment.