
[GraphBolt] Improve liburing code
mfbalin committed Jul 5, 2024
1 parent fb0547d commit 67a6dd4
Showing 5 changed files with 61 additions and 63 deletions.
27 changes: 1 addition & 26 deletions CMakeLists.txt
@@ -350,27 +350,6 @@ endif(EXTERNAL_PHMAP_PATH)
target_include_directories(dgl PRIVATE "tensoradapter/include")
target_include_directories(dgl PRIVATE "third_party/pcg/include")

if(USE_LIBURING)
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
add_definitions(-DHAVE_LIBRARY_LIBURING)
include(ExternalProject)
set(LIBURING_INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/liburing)
ExternalProject_Add(
liburing
SOURCE_DIR ${CMAKE_SOURCE_DIR}/third_party/liburing
CONFIGURE_COMMAND <SOURCE_DIR>/configure --prefix=/
BUILD_COMMAND bash -c "make -j 4"
BUILD_IN_SOURCE ON
INSTALL_COMMAND make install DESTDIR=${LIBURING_INSTALL_DIR}
BUILD_BYPRODUCTS ${LIBURING_INSTALL_DIR}/lib/liburing.a
BUILD_BYPRODUCTS ${LIBURING_INSTALL_DIR}/include
DOWNLOAD_EXTRACT_TIMESTAMP true
)
set(LIBURING_INCLUDE ${LIBURING_INSTALL_DIR}/include)
set(LIBURING ${LIBURING_INSTALL_DIR}/lib/liburing.a)
endif()
endif(USE_LIBURING)

if(EXTERNAL_NANOFLANN_PATH)
include_directories(SYSTEM ${EXTERNAL_NANOFLANN_PATH})
else(EXTERNAL_NANOFLANN_PATH)
@@ -580,6 +559,7 @@ if(BUILD_GRAPHBOLT)
CMAKE_COMMAND=${CMAKE_CMD}
CUDA_TOOLKIT_ROOT_DIR=${CUDA_TOOLKIT_ROOT_DIR}
USE_CUDA=${USE_CUDA}
USE_LIBURING=${USE_LIBURING}
BINDIR=${CMAKE_CURRENT_BINARY_DIR}
GPU_CACHE_INCLUDE_DIRS="${GPU_CACHE_INCLUDE_DIRS_ESCAPED}"
CFLAGS=${CMAKE_C_FLAGS}
@@ -593,9 +573,4 @@ if(BUILD_GRAPHBOLT)
if(USE_CUDA)
add_dependencies(graphbolt gpu_cache)
endif(USE_CUDA)
if(USE_LIBURING)
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
add_dependencies(graphbolt liburing)
endif()
endif(USE_LIBURING)
endif(BUILD_GRAPHBOLT)
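Note that the liburing ExternalProject block is removed from the top-level build entirely; the top-level CMakeLists.txt now only forwards USE_LIBURING to the graphbolt sub-build (next file), which defines HAVE_LIBRARY_LIBURING for its own sources. A minimal C++ sketch of how such a compile definition typically gates io_uring-specific code paths; the function name below is hypothetical and not part of the commit:

#ifdef HAVE_LIBRARY_LIBURING
#include <liburing.h>
#endif

// Returns whether the io_uring-backed read path was compiled in.
bool DiskFetchAvailable() {
#ifdef HAVE_LIBRARY_LIBURING
  return true;   // IndexSelectIOUring-style reads can be used.
#else
  return false;  // Caller must fall back to an ordinary read path.
#endif
}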
29 changes: 25 additions & 4 deletions graphbolt/CMakeLists.txt
@@ -1,6 +1,7 @@
cmake_minimum_required(VERSION 3.18)
project(graphbolt C CXX)
set (CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

if(USE_CUDA)
message(STATUS "Build graphbolt with CUDA support")
@@ -75,11 +76,31 @@ include_directories(BEFORE ${BOLT_DIR}
"../third_party/dmlc-core/include"
"../third_party/pcg/include")
target_link_libraries(${LIB_GRAPHBOLT_NAME} "${TORCH_LIBRARIES}")

if(CMAKE_SYSTEM_NAME MATCHES "Linux")
if(USE_LIBURING)
target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE "../third_party/liburing/src/include")
get_filename_component(PARENT_DIR "${CMAKE_SOURCE_DIR}" DIRECTORY)
target_link_libraries(${LIB_GRAPHBOLT_NAME} ${PARENT_DIR}/build/third_party/liburing/lib/liburing.a)
add_definitions(-DHAVE_LIBRARY_LIBURING)
include(ExternalProject)
set(LIBURING_INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}/third_party/liburing)
set(LIBURING_C_COMPILER "${CMAKE_C_COMPILER} -w")
ExternalProject_Add(
liburing
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/liburing
CONFIGURE_COMMAND <SOURCE_DIR>/configure --cc=${LIBURING_C_COMPILER} --cxx=${CMAKE_CXX_COMPILER} --prefix=/
BUILD_COMMAND bash -c "make -j 4"
BUILD_IN_SOURCE ON
INSTALL_COMMAND make install DESTDIR=${LIBURING_INSTALL_DIR}
BUILD_BYPRODUCTS ${LIBURING_INSTALL_DIR}/lib/liburing.a
BUILD_BYPRODUCTS ${LIBURING_INSTALL_DIR}/include
DOWNLOAD_EXTRACT_TIMESTAMP true
)
set(LIBURING_INCLUDE ${LIBURING_INSTALL_DIR}/include)
set(LIBURING ${LIBURING_INSTALL_DIR}/lib/liburing.a)

target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE ${LIBURING_INCLUDE})
add_dependencies(${LIB_GRAPHBOLT_NAME} liburing)
target_link_libraries(${LIB_GRAPHBOLT_NAME} ${CMAKE_CURRENT_BINARY_DIR}/third_party/liburing/lib/liburing.a)
message(STATUS "Build graphbolt with liburing.")
endif(USE_LIBURING)
endif()

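With this change, graphbolt configures and builds the vendored liburing itself via ExternalProject_Add, passing the parent project's compilers (with -w appended to silence warnings from the third-party sources) and linking the resulting static archive into the graphbolt target. For orientation, a minimal sketch of the liburing submit/wait cycle that cnumpy.cc builds on, using standard liburing calls; the file name is hypothetical and error handling is trimmed:

#include <liburing.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>

int main() {
  struct io_uring ring;
  if (io_uring_queue_init(8, &ring, 0) < 0) return 1;  // 8 submission slots.

  int fd = open("features.npy", O_RDONLY);  // hypothetical feature file
  if (fd < 0) return 1;

  char buf[4096];
  struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  io_uring_prep_read(sqe, fd, buf, sizeof(buf), /*offset=*/0);
  io_uring_submit(&ring);

  struct io_uring_cqe *cqe;
  if (io_uring_wait_cqe(&ring, &cqe) < 0) {
    perror("io_uring_wait_cqe");
    std::exit(EXIT_FAILURE);
  }
  std::printf("read %d bytes\n", cqe->res);  // cqe->res is the byte count.
  io_uring_cqe_seen(&ring, cqe);             // Consume the completion.

  close(fd);
  io_uring_queue_exit(&ring);
  return 0;
}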
4 changes: 2 additions & 2 deletions graphbolt/build.sh
@@ -28,8 +28,8 @@ if ! [[ -z "${CUDAARCHS}" ]]; then
TORCH_CUDA_ARCH_LIST=${LAST_ARCHITECTURE:0:-1}'.'${LAST_ARCHITECTURE: -1}
fi
fi
CMAKE_FLAGS="-DCUDA_TOOLKIT_ROOT_DIR=$CUDA_TOOLKIT_ROOT_DIR -DUSE_CUDA=$USE_CUDA -DGPU_CACHE_BUILD_DIR=$BINDIR -DTORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST"
echo $CMAKE_FLAGS
CMAKE_FLAGS="-DCUDA_TOOLKIT_ROOT_DIR=$CUDA_TOOLKIT_ROOT_DIR -DUSE_CUDA=$USE_CUDA -DGPU_CACHE_BUILD_DIR=$BINDIR -DTORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST -DUSE_LIBURING=$USE_LIBURING"
echo "graphbolt cmake flags: $CMAKE_FLAGS"

if [ $# -eq 0 ]; then
$CMAKE_COMMAND $CMAKE_FLAGS ..
62 changes: 32 additions & 30 deletions graphbolt/src/cnumpy.cc
@@ -9,7 +9,7 @@
#include <torch/torch.h>

#include <cstring>
#include <regex>
#include <memory>
#include <stdexcept>
namespace graphbolt {
namespace storage {
@@ -101,23 +101,40 @@ torch::Tensor OnDiskNpyArray::IndexSelect(torch::Tensor index) {
torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
index = index.to(torch::kLong);
// The minimum aligned length that can contain one feature.
int64_t aligned_length =
(feature_size_ + kDiskAlignmentSize) & (long)~(kDiskAlignmentSize - 1);
int64_t num_index = index.numel();

char *read_buffer = (char *)aligned_alloc(
kDiskAlignmentSize,
(aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_);
char *result_buffer =
(char *)aligned_alloc(kDiskAlignmentSize, feature_size_ * num_index);
const int64_t aligned_length = (feature_size_ + kDiskAlignmentSize - 1) &
(long)~(kDiskAlignmentSize - 1);
const int64_t num_index = index.numel();

const size_t read_buffer_intended_size =
(aligned_length + kDiskAlignmentSize) * group_size_ * num_thread_;
size_t read_buffer_size = read_buffer_intended_size + kDiskAlignmentSize - 1;
auto read_tensor = torch::empty(
read_buffer_size,
torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
auto read_buffer_void_ptr = read_tensor.data_ptr();
auto read_buffer = reinterpret_cast<char *>(std::align(
kDiskAlignmentSize, read_buffer_intended_size, read_buffer_void_ptr,
read_buffer_size));
TORCH_CHECK(read_buffer, "read_buffer allocation failed!");

std::vector<int64_t> shape;
shape.push_back(num_index);
shape.insert(shape.end(), feature_dim_.begin() + 1, feature_dim_.end());
auto result = torch::empty(
shape, torch::TensorOptions()
.dtype(dtype_)
.layout(torch::kStrided)
.device(torch::kCPU)
.requires_grad(false));
auto result_buffer = reinterpret_cast<char *>(result.data_ptr());
auto index_data = index.data_ptr<int64_t>();

// Record the inside offsets of fetched features.
int64_t residual[group_size_ * num_thread_];
// Indicator for index error.
std::atomic<bool> error_flag{};
TORCH_CHECK(
num_thread_ >= torch::get_num_threads(),
torch::get_num_threads() <= num_thread_,
"The number of threads can not be changed to larger than the number of "
"threads when a disk feature fetcher is constructed.");
torch::parallel_for(
@@ -171,15 +188,15 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
if (io_uring_wait_cqe(
&io_uring_queue_[thread_id], &complete_queue) < 0) {
perror("io_uring_wait_cqe");
abort();
std::exit(EXIT_FAILURE);
}
struct io_uring_cqe *complete_queues[group_size_];
int cqe_count = io_uring_peek_batch_cqe(
&io_uring_queue_[thread_id], complete_queues,
group_size_);
if (cqe_count == -1) {
perror("io_uring_peek_batch error\n");
abort();
std::exit(EXIT_FAILURE);
}
// Move the head pointer of the completion queue.
io_uring_cq_advance(&io_uring_queue_[thread_id], cqe_count);
@@ -189,7 +206,7 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {

for (int64_t batch_id = batch_offset; batch_id <= i;
batch_id++) {
memcpy(
std::memcpy(
result_buffer + feature_size_ * (batch_id),
read_buffer +
((aligned_length + kDiskAlignmentSize) * group_size_ *
@@ -207,25 +224,10 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUring(torch::Tensor index) {
}
}
});
auto result = torch::empty({0});
if (!error_flag.load()) {
auto options = torch::TensorOptions()
.dtype(dtype_)
.layout(torch::kStrided)
.device(torch::kCPU)
.requires_grad(false);

std::vector<int64_t> shape;
shape.push_back(num_index);
shape.insert(shape.end(), feature_dim_.begin() + 1, feature_dim_.end());
result = torch::from_blob(result_buffer, torch::IntArrayRef(shape), options)
.clone();
} else {
if (error_flag.load()) {
throw std::runtime_error("IndexError: Index out of range.");
}

free(read_buffer);
free(result_buffer);
return result;
}
#endif // HAVE_LIBRARY_LIBURING
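The rewritten IndexSelectIOUring no longer pairs aligned_alloc with free: the result tensor is allocated up front with torch::empty and written in place, and the read buffer lives inside a torch-owned int8 tensor that is over-allocated by one alignment's worth of bytes and then aligned with std::align. A minimal standalone sketch of that pattern, with illustrative sizes (kAlign and feature_size below are stand-ins for kDiskAlignmentSize and feature_size_):

#include <torch/torch.h>
#include <memory>

int main() {
  constexpr int64_t kAlign = 4096;   // stand-in for kDiskAlignmentSize
  const int64_t feature_size = 100;  // illustrative size in bytes

  // Round up to the next multiple of kAlign. The commit also fixes this
  // expression: adding (kAlign - 1) instead of kAlign leaves an already
  // aligned size unchanged, e.g. 4096 stays 4096 instead of becoming 8192.
  const int64_t aligned_length = (feature_size + kAlign - 1) & ~(kAlign - 1);

  // Over-allocate by kAlign - 1 bytes inside a tensor, then let std::align
  // advance the pointer to the first suitably aligned address. The tensor
  // owns the memory, so no manual free() is needed and early exits are safe.
  size_t space = aligned_length + kAlign - 1;
  auto backing = torch::empty(
      {static_cast<int64_t>(space)},
      torch::TensorOptions().dtype(torch::kInt8).device(torch::kCPU));
  void *ptr = backing.data_ptr();
  char *aligned = reinterpret_cast<char *>(
      std::align(kAlign, aligned_length, ptr, space));
  TORCH_CHECK(aligned != nullptr, "read buffer alignment failed!");
  return 0;
}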
