
Merge branch 'master' into DiskBasedFeature_dglexample
Liu-rj committed Aug 2, 2024
2 parents 8b23f19 + 1a8cf7e commit d177173
Showing 25 changed files with 236 additions and 179 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -34,3 +34,6 @@
[submodule "third_party/GKlib"]
path = third_party/GKlib
url = https://github.com/KarypisLab/GKlib.git
[submodule "third_party/taskflow"]
path = third_party/taskflow
url = https://github.com/taskflow/taskflow.git
3 changes: 1 addition & 2 deletions Jenkinsfile
@@ -157,8 +157,7 @@ def is_authorized(name) {
'rudongyu', 'classicsong', 'HuXiangkun', 'hetong007', 'kylasa',
'frozenbugs', 'peizhou001', 'zheng-da', 'czkkkkkk', 'thvasilo',
// Intern:
'keli-wen', 'caojy1998', 'RamonZhou', 'xiangyuzhi', 'Skeleton003', 'yxy235',
'hutiechuan', 'pyynb', 'az15240', 'BowenYao18', 'kec020',
'pyynb', 'az15240', 'BowenYao18', 'kec020', 'Liu-rj',
// Friends:
'nv-dlasalle', 'yaox12', 'chang-l', 'Kh4L', 'VibhuJawa', 'kkranen',
'TristonC', 'mfbalin',
4 changes: 3 additions & 1 deletion docs/source/api/python/dgl.graphbolt.rst
@@ -5,7 +5,7 @@

.. currentmodule:: dgl.graphbolt

**dgl.graphbolt** is a dataloading framework for GNN that provides well-defined
**dgl.graphbolt** is a dataloading framework for GNNs that provides well-defined
APIs for each stage of the data pipeline and multiple standard implementations.

Dataset
@@ -55,6 +55,8 @@ collection of features.
BasicFeatureStore
TorchBasedFeature
TorchBasedFeatureStore
DiskBasedFeature
CPUCachedFeature
GPUCachedFeature


5 changes: 5 additions & 0 deletions graphbolt/CMakeLists.txt
@@ -45,6 +45,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TORCH_CXX_FLAGS}")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g3 -ggdb")

set(LIB_GRAPHBOLT_NAME "graphbolt_pytorch_${TORCH_VER}")
option(BUILD_WITH_TASKFLOW "Use taskflow as parallel backend" ON)

set(BOLT_DIR "${CMAKE_CURRENT_SOURCE_DIR}/src")
set(BOLT_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/include")
@@ -79,6 +80,10 @@ include_directories(BEFORE ${BOLT_DIR}
"../third_party/pcg/include"
"../third_party/phmap")
target_link_libraries(${LIB_GRAPHBOLT_NAME} "${TORCH_LIBRARIES}")
if(BUILD_WITH_TASKFLOW)
target_include_directories(${LIB_GRAPHBOLT_NAME} PRIVATE "../third_party/taskflow")
target_compile_definitions(${LIB_GRAPHBOLT_NAME} PRIVATE BUILD_WITH_TASKFLOW=1)
endif()

if(CMAKE_SYSTEM_NAME MATCHES "Linux")
if(USE_LIBURING)
113 changes: 97 additions & 16 deletions graphbolt/include/graphbolt/async.h
@@ -27,10 +27,62 @@
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <type_traits>

#ifdef BUILD_WITH_TASKFLOW
#include <taskflow/algorithm/for_each.hpp>
#include <taskflow/taskflow.hpp>
#endif

namespace graphbolt {

enum ThreadPool { intraop, interop };

#ifdef BUILD_WITH_TASKFLOW

template <ThreadPool pool_type>
inline tf::Executor& _get_thread_pool() {
static std::unique_ptr<tf::Executor> pool;
static std::once_flag flag;
std::call_once(flag, [&] {
const int num_threads = pool_type == ThreadPool::intraop
? torch::get_num_threads()
: torch::get_num_interop_threads();
pool = std::make_unique<tf::Executor>(num_threads);
});
return *pool.get();
}

inline tf::Executor& intraop_pool() {
return _get_thread_pool<ThreadPool::intraop>();
}

inline tf::Executor& interop_pool() {
return _get_thread_pool<ThreadPool::interop>();
}

inline tf::Executor& get_thread_pool(ThreadPool pool_type) {
return pool_type == ThreadPool::intraop ? intraop_pool() : interop_pool();
}
#endif // BUILD_WITH_TASKFLOW

inline int get_num_threads() {
#ifdef BUILD_WITH_TASKFLOW
return intraop_pool().num_workers();
#else
return torch::get_num_threads();
#endif
}

inline int get_num_interop_threads() {
#ifdef BUILD_WITH_TASKFLOW
return interop_pool().num_workers();
#else
return torch::get_num_interop_threads();
#endif
}

template <typename T>
class Future : public torch::CustomClassHolder {
public:
@@ -52,6 +104,9 @@ class Future : public torch::CustomClassHolder {
template <typename F>
inline auto async(F function) {
using T = decltype(function());
#ifdef BUILD_WITH_TASKFLOW
auto future = interop_pool().async(function);
#else
auto promise = std::make_shared<std::promise<T>>();
auto future = promise->get_future();
at::launch([=]() {
@@ -61,27 +116,16 @@ inline auto async(F function) {
} else
promise->set_value(function());
});
#endif
return c10::make_intrusive<Future<T>>(std::move(future));
}

/**
* @brief GraphBolt's version of torch::parallel_for. Since torch::parallel_for
* uses OpenMP threadpool, async tasks can not make use of it due to multiple
* OpenMP threadpools being created for each async thread. Moreover, inside
* graphbolt::parallel_for, we should not make use of any native CPU torch ops
* as they will spawn an OpenMP threadpool.
*/
template <typename F>
inline void parallel_for(
template <ThreadPool pool_type, typename F>
inline void _parallel_for(
const int64_t begin, const int64_t end, const int64_t grain_size,
const F& f) {
if (begin >= end) return;
std::promise<void> promise;
std::future<void> future;
std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
std::exception_ptr eptr;

int64_t num_threads = torch::get_num_threads();
int64_t num_threads = get_num_threads();
const auto num_iter = end - begin;
const bool use_parallel =
(num_iter > grain_size && num_iter > 1 && num_threads > 1);
@@ -93,12 +137,27 @@ inline void parallel_for(
num_threads = std::min(num_threads, at::divup(end - begin, grain_size));
}
int64_t chunk_size = at::divup((end - begin), num_threads);
#ifdef BUILD_WITH_TASKFLOW
tf::Taskflow flow;
flow.for_each_index(int64_t{0}, num_threads, int64_t{1}, [=](int64_t tid) {
const int64_t begin_tid = begin + tid * chunk_size;
if (begin_tid < end) {
const int64_t end_tid = std::min(end, begin_tid + chunk_size);
f(begin_tid, end_tid);
}
});
_get_thread_pool<pool_type>().run(flow).wait();
#else
std::promise<void> promise;
std::future<void> future;
std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
std::exception_ptr eptr;
int num_launched = 0;
std::atomic<int> num_finished = 0;
for (int tid = num_threads - 1; tid >= 0; tid--) {
const int64_t begin_tid = begin + tid * chunk_size;
const int64_t end_tid = std::min(end, begin_tid + chunk_size);
if (begin_tid < end) {
const int64_t end_tid = std::min(end, begin_tid + chunk_size);
if (tid == 0) {
// Launch the thread 0's work inline.
f(begin_tid, end_tid);
@@ -132,6 +191,28 @@ inline void parallel_for(
std::rethrow_exception(eptr);
}
}
#endif
}

/**
* @brief GraphBolt's version of torch::parallel_for. Since torch::parallel_for
* uses OpenMP threadpool, async tasks can not make use of it due to multiple
* OpenMP threadpools being created for each async thread. Moreover, inside
* graphbolt::parallel_for, we should not make use of any native CPU torch ops
* as they will spawn an OpenMP threadpool.
*/
template <typename F>
inline void parallel_for(
const int64_t begin, const int64_t end, const int64_t grain_size,
const F& f) {
_parallel_for<ThreadPool::intraop>(begin, end, grain_size, f);
}

template <typename F>
inline void parallel_for_interop(
const int64_t begin, const int64_t end, const int64_t grain_size,
const F& f) {
_parallel_for<ThreadPool::interop>(begin, end, grain_size, f);
}

} // namespace graphbolt
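
For orientation, the following is a minimal, hypothetical usage sketch of the helpers introduced above. It is caller code written for this description, not part of the commit; the Future access pattern is an assumption based on the class shown earlier.

#include <graphbolt/async.h>

#include <vector>

void scale(std::vector<float>& data, float factor) {
  // The callable receives a [begin, end) chunk; chunks are distributed over the
  // intraop pool (a taskflow executor when BUILD_WITH_TASKFLOW is defined,
  // at::launch-based threads otherwise).
  graphbolt::parallel_for(
      0, static_cast<int64_t>(data.size()), /*grain_size=*/1024,
      [&](int64_t begin, int64_t end) {
        for (int64_t i = begin; i < end; i++) data[i] *= factor;
      });
}

void offload_example() {
  // Run a blocking task on the interop pool; async returns an intrusive
  // pointer to Future<int> whose value can be collected once the task
  // finishes (exact accessor name is assumed, not shown in this hunk).
  auto future = graphbolt::async([]() { return 42; });
}

Note the design choice documented in the comment above parallel_for: the fallback path launches its own workers instead of torch::parallel_for, so it stays usable from async interop threads without spawning one OpenMP pool per thread.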
75 changes: 34 additions & 41 deletions graphbolt/src/cache_policy.cc
@@ -75,8 +75,7 @@ BaseCachePolicy::QueryImpl(CachePolicy& policy, torch::Tensor keys) {

template <typename CachePolicy>
std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
BaseCachePolicy::QueryAndThenReplaceImpl(
CachePolicy& policy, torch::Tensor keys) {
BaseCachePolicy::QueryAndReplaceImpl(CachePolicy& policy, torch::Tensor keys) {
auto positions = torch::empty_like(
keys, keys.options()
.dtype(torch::kInt64)
@@ -100,9 +99,9 @@ BaseCachePolicy::QueryAndThenReplaceImpl(
auto pointers_ptr =
reinterpret_cast<CacheKey**>(pointers.data_ptr<int64_t>());
auto missing_keys_ptr = missing_keys.data_ptr<index_t>();
auto iterators = std::unique_ptr<typename CachePolicy::map_iterator[]>(
new typename CachePolicy::map_iterator[keys.size(0)]);
// QueryImpl here.
set_t<int64_t> position_set;
position_set.reserve(keys.size(0));
// Query and Replace combined.
for (int64_t i = 0; i < keys.size(0); i++) {
const auto key = keys_ptr[i];
const auto [it, can_read] = policy.Emplace(key);
@@ -114,28 +113,20 @@ BaseCachePolicy::QueryAndThenReplaceImpl(
} else {
indices_ptr[--missing_cnt] = i;
missing_keys_ptr[missing_cnt] = key;
iterators[missing_cnt] = it;
}
}
// ReplaceImpl here.
set_t<int64_t> position_set;
position_set.reserve(keys.size(0));
for (int64_t i = missing_cnt; i < missing_keys.size(0); i++) {
auto it = iterators[i];
if (it->second == policy.getMapSentinelValue()) {
policy.Insert(it);
// After Insert, it->second is not nullptr anymore.
TORCH_CHECK(
// If there are duplicate values and the key was just inserted,
// we do not have to check for the uniqueness of the positions.
std::get<1>(position_set.insert(it->second->getPos())),
"Can't insert all, larger cache capacity is needed.");
} else {
policy.MarkExistingWriting(it);
CacheKey* cache_key_ptr;
if (it->second == policy.getMapSentinelValue()) {
cache_key_ptr = policy.Insert(it);
TORCH_CHECK(
// We check for the uniqueness of the positions.
std::get<1>(position_set.insert(cache_key_ptr->getPos())),
"Can't insert all, larger cache capacity is needed.");
} else {
cache_key_ptr = &*it->second;
policy.MarkExistingWriting(it);
}
positions_ptr[missing_cnt] = cache_key_ptr->getPos();
pointers_ptr[missing_cnt] = cache_key_ptr;
}
auto& cache_key = *it->second;
positions_ptr[i] = cache_key.getPos();
pointers_ptr[i] = &cache_key;
}
}));
return {positions, indices, pointers, missing_keys.slice(0, found_cnt)};
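
As a reading aid for the combined path above, a hypothetical caller might look like the following. The meaning of the returned tensors is inferred from the loop above, and the unqualified class name and constructor usage are assumptions rather than part of this diff.

// Hypothetical illustration only; identifiers follow the code above.
S3FifoCachePolicy policy(/*capacity=*/1024);
torch::Tensor keys = torch::tensor({5, 7, 7, 9}, torch::kInt64);
auto [positions, indices, pointers, missing_keys] = policy.QueryAndReplace(keys);
// positions    : cache slot assigned to every key; hit entries are written from
//                the front and missed entries from the back, mirroring
//                found_cnt/missing_cnt in the loop above.
// indices      : permutation mapping those entries back to positions in `keys`.
// pointers     : CacheKey handles, later used to mark reads/writes completed.
// missing_keys : keys whose payload must still be fetched and written into the
//                assigned slots; the policy admits readers only after the
//                corresponding WritingCompleted call.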
@@ -192,15 +183,16 @@ void BaseCachePolicy::ReadingWritingCompletedImpl(
}

S3FifoCachePolicy::S3FifoCachePolicy(int64_t capacity)
: small_queue_(capacity),
main_queue_(capacity),
// We sometimes first insert and then evict. + 1 is to compensate for that.
: small_queue_(capacity + 1),
main_queue_(capacity + 1),
ghost_queue_(capacity - capacity / 10),
capacity_(capacity),
cache_usage_(0),
small_queue_size_target_(capacity / 10) {
TORCH_CHECK(small_queue_size_target_ > 0, "Capacity is not large enough.");
ghost_set_.reserve(ghost_queue_.Capacity());
key_to_cache_key_.reserve(kCapacityFactor * capacity);
key_to_cache_key_.reserve(kCapacityFactor * (capacity + 1));
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
@@ -209,8 +201,8 @@ S3FifoCachePolicy::Query(torch::Tensor keys) {
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
S3FifoCachePolicy::QueryAndThenReplace(torch::Tensor keys) {
return QueryAndThenReplaceImpl(*this, keys);
S3FifoCachePolicy::QueryAndReplace(torch::Tensor keys) {
return QueryAndReplaceImpl(*this, keys);
}

std::tuple<torch::Tensor, torch::Tensor> S3FifoCachePolicy::Replace(
@@ -230,7 +222,7 @@ SieveCachePolicy::SieveCachePolicy(int64_t capacity)
// Ensure that queue_ is constructed first before accessing its `.end()`.
: queue_(), hand_(queue_.end()), capacity_(capacity), cache_usage_(0) {
TORCH_CHECK(capacity > 0, "Capacity needs to be positive.");
key_to_cache_key_.reserve(kCapacityFactor * capacity);
key_to_cache_key_.reserve(kCapacityFactor * (capacity + 1));
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
@@ -239,8 +231,8 @@ SieveCachePolicy::Query(torch::Tensor keys) {
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
SieveCachePolicy::QueryAndThenReplace(torch::Tensor keys) {
return QueryAndThenReplaceImpl(*this, keys);
SieveCachePolicy::QueryAndReplace(torch::Tensor keys) {
return QueryAndReplaceImpl(*this, keys);
}

std::tuple<torch::Tensor, torch::Tensor> SieveCachePolicy::Replace(
@@ -259,7 +251,7 @@ void SieveCachePolicy::WritingCompleted(torch::Tensor keys) {
LruCachePolicy::LruCachePolicy(int64_t capacity)
: capacity_(capacity), cache_usage_(0) {
TORCH_CHECK(capacity > 0, "Capacity needs to be positive.");
key_to_cache_key_.reserve(kCapacityFactor * capacity);
key_to_cache_key_.reserve(kCapacityFactor * (capacity + 1));
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
@@ -268,8 +260,8 @@ LruCachePolicy::Query(torch::Tensor keys) {
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
LruCachePolicy::QueryAndThenReplace(torch::Tensor keys) {
return QueryAndThenReplaceImpl(*this, keys);
LruCachePolicy::QueryAndReplace(torch::Tensor keys) {
return QueryAndReplaceImpl(*this, keys);
}

std::tuple<torch::Tensor, torch::Tensor> LruCachePolicy::Replace(
@@ -286,9 +278,10 @@ void LruCachePolicy::WritingCompleted(torch::Tensor keys) {
}

ClockCachePolicy::ClockCachePolicy(int64_t capacity)
: queue_(capacity), capacity_(capacity), cache_usage_(0) {
// We sometimes first insert and then evict. + 1 is to compensate for that.
: queue_(capacity + 1), capacity_(capacity), cache_usage_(0) {
TORCH_CHECK(capacity > 0, "Capacity needs to be positive.");
key_to_cache_key_.reserve(kCapacityFactor * capacity);
key_to_cache_key_.reserve(kCapacityFactor * (capacity + 1));
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
@@ -297,8 +290,8 @@ ClockCachePolicy::Query(torch::Tensor keys) {
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>
ClockCachePolicy::QueryAndThenReplace(torch::Tensor keys) {
return QueryAndThenReplaceImpl(*this, keys);
ClockCachePolicy::QueryAndReplace(torch::Tensor keys) {
return QueryAndReplaceImpl(*this, keys);
}

std::tuple<torch::Tensor, torch::Tensor> ClockCachePolicy::Replace(
(Diffs for the remaining changed files are not shown here.)
