From f5a37caaf9c3cd81e423602c88198d3f8dc8809c Mon Sep 17 00:00:00 2001 From: humingqing Date: Tue, 7 Feb 2023 18:42:02 +0800 Subject: [PATCH] add graph hard split --- .../ps/table/common_graph_table.cc | 178 +++++++++++++----- .../distributed/ps/table/common_graph_table.h | 12 +- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 17 +- 3 files changed, 155 insertions(+), 52 deletions(-) diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc index 62c929ed6798c..80c4a05cd95f7 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.cc +++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc @@ -26,18 +26,22 @@ #include "paddle/fluid/distributed/ps/table/graph/graph_node.h" #include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h" +#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/framework/generator.h" #include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/platform/timer.h" #include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/string_helper.h" - DECLARE_bool(graph_load_in_parallel); DECLARE_bool(graph_get_neighbor_id); DECLARE_int32(gpugraph_storage_mode); DECLARE_uint64(gpugraph_slot_feasign_max_num); DECLARE_bool(graph_metapath_split_opt); +PADDLE_DEFINE_EXPORTED_bool(graph_edges_split_only_by_src_id, + false, + "multi-node split edges only by src id"); + namespace paddle { namespace distributed { @@ -1211,7 +1215,8 @@ int32_t GraphTable::Load(const std::string &path, const std::string ¶m) { std::string edge_type = param.substr(2); int ret = this->load_edges(path, reverse_edge, edge_type); if (ret != 0) { - VLOG(0) << "Fail to load edges, path[" << path << "] edge_type[" << edge_type << "]"; + VLOG(0) << "Fail to load edges, path[" << path << "] edge_type[" + << edge_type << "]"; return -1; } } @@ -1219,7 +1224,8 @@ int32_t GraphTable::Load(const std::string 
&path, const std::string ¶m) { std::string node_type = param.substr(1); int ret = this->load_nodes(path, node_type); if (ret != 0) { - VLOG(0) << "Fail to load nodes, path[" << path << "] node_type[" << node_type << "]"; + VLOG(0) << "Fail to load nodes, path[" << path << "] node_type[" + << node_type << "]"; return -1; } } @@ -1258,11 +1264,12 @@ int32_t GraphTable::parse_type_to_typepath( return 0; } -int32_t GraphTable::parse_edge_and_load(std::string etype2files, - std::string graph_data_local_path, - int part_num, - bool reverse, - const std::vector& is_reverse_edge_map) { +int32_t GraphTable::parse_edge_and_load( + std::string etype2files, + std::string graph_data_local_path, + int part_num, + bool reverse, + const std::vector &is_reverse_edge_map) { std::vector etypes; std::unordered_map edge_to_edgedir; int res = parse_type_to_typepath( @@ -1287,9 +1294,11 @@ int32_t GraphTable::parse_edge_and_load(std::string etype2files, only_load_reverse_edge = is_reverse_edge_map[i]; } if (only_load_reverse_edge) { - VLOG(1) << "only_load_reverse_edge is True, etype[" << etypes[i] << "], file_path[" << etype_path << "]"; + VLOG(1) << "only_load_reverse_edge is True, etype[" << etypes[i] + << "], file_path[" << etype_path << "]"; } else { - VLOG(1) << "only_load_reverse_edge is False, etype[" << etypes[i] << "], file_path[" << etype_path << "]"; + VLOG(1) << "only_load_reverse_edge is False, etype[" << etypes[i] + << "], file_path[" << etype_path << "]"; } auto etype_path_list = paddle::framework::localfs_list(etype_path); std::string etype_path_str; @@ -1355,7 +1364,8 @@ int32_t GraphTable::parse_node_and_load(std::string ntype2files, for (size_t j = 0; j < ntypes.size(); j++) { int ret = this->load_nodes(npath_str, ntypes[j]); if (ret != 0) { - VLOG(0) << "Fail to load nodes, path[" << npath << "], ntypes[" << ntypes[j] << "]"; + VLOG(0) << "Fail to load nodes, path[" << npath << "], ntypes[" + << ntypes[j] << "]"; return -1; } } @@ -1363,12 +1373,13 @@ int32_t 
GraphTable::parse_node_and_load(std::string ntype2files, return 0; } -int32_t GraphTable::load_node_and_edge_file(std::string etype2files, - std::string ntype2files, - std::string graph_data_local_path, - int part_num, - bool reverse, - const std::vector& is_reverse_edge_map) { +int32_t GraphTable::load_node_and_edge_file( + std::string etype2files, + std::string ntype2files, + std::string graph_data_local_path, + int part_num, + bool reverse, + const std::vector &is_reverse_edge_map) { std::vector etypes; std::unordered_map edge_to_edgedir; int res = parse_type_to_typepath( @@ -1403,9 +1414,11 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files, only_load_reverse_edge = is_reverse_edge_map[i]; } if (only_load_reverse_edge) { - VLOG(1) << "only_load_reverse_edge is True, etype[" << etypes[i] << "], file_path[" << etype_path << "]"; + VLOG(1) << "only_load_reverse_edge is True, etype[" << etypes[i] + << "], file_path[" << etype_path << "]"; } else { - VLOG(1) << "only_load_reverse_edge is False, etype[" << etypes[i] << "], file_path[" << etype_path << "]"; + VLOG(1) << "only_load_reverse_edge is False, etype[" << etypes[i] + << "], file_path[" << etype_path << "]"; } auto etype_path_list = paddle::framework::localfs_list(etype_path); std::string etype_path_str; @@ -1453,7 +1466,8 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files, for (size_t j = 0; j < ntypes.size(); j++) { int ret = this->load_nodes(npath_str, ntypes[j]); if (ret != 0) { - VLOG(0) << "Fail to load nodes, path[" << npath_str << "], ntypes[" << ntypes[j] << "]"; + VLOG(0) << "Fail to load nodes, path[" << npath_str + << "], ntypes[" << ntypes[j] << "]"; return -1; } } @@ -1478,7 +1492,8 @@ int32_t GraphTable::get_nodes_ids_by_ranges( std::mutex mutex; int start = 0, end, index = 0, total_size = 0; res.clear(); - auto &shards = table_type == GraphTableType::EDGE_TABLE ? 
edge_shards[idx] : feature_shards[idx]; + auto &shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] + : feature_shards[idx]; std::vector> tasks; for (size_t i = 0; i < shards.size() && index < (int)ranges.size(); i++) { end = total_size + shards[i]->get_size(); @@ -1522,6 +1537,9 @@ int32_t GraphTable::get_nodes_ids_by_ranges( std::pair GraphTable::parse_node_file( const std::string &path, const std::string &node_type, int idx) { +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + auto ps_wrapper = paddle::framework::PSGPUWrapper::GetInstance(); +#endif std::ifstream file(path); std::string line; uint64_t local_count = 0; @@ -1541,6 +1559,13 @@ std::pair GraphTable::parse_node_file( continue; } uint64_t id = std::strtoul(vals[0].ptr, NULL, 10); +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + if (!ps_wrapper->IsKeyForSelfRank(id)) { + VLOG(2) << "id " << id << " not matched, node_id: " << node_id_ + << " , node_num:" << node_num_; + continue; + } +#endif size_t shard_id = id % shard_num; if (shard_id >= shard_end || shard_id < shard_start) { VLOG(4) << "will not load " << id << " from " << path @@ -1572,6 +1597,9 @@ std::pair GraphTable::parse_node_file( std::pair GraphTable::parse_node_file( const std::string &path) { +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + auto ps_wrapper = paddle::framework::PSGPUWrapper::GetInstance(); +#endif std::ifstream file(path); std::string line; uint64_t local_count = 0; @@ -1606,7 +1634,13 @@ std::pair GraphTable::parse_node_file( continue; } local_count++; - +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + if (!ps_wrapper->IsKeyForSelfRank(id)) { + VLOG(2) << "id " << id << " not matched, node_id: " << node_id_ + << " , node_num:" << node_num_; + continue; + } +#endif size_t index = shard_id - shard_start; auto node = feature_shards[idx][index]->add_feature_node(id, false); if (node != NULL) { @@ -1671,7 +1705,7 @@ int32_t 
GraphTable::load_nodes(const std::string &path, std::string node_type) { } if (is_parse_node_fail_) { VLOG(0) << "Fail to load nodes, path[" << paths[0] << ".." - << paths[paths.size() -1] << "] node_type[" << node_type << "]"; + << paths[paths.size() - 1] << "] node_type[" << node_type << "]"; return -1; } @@ -1692,6 +1726,9 @@ int32_t GraphTable::build_sampler(int idx, std::string sample_type) { std::pair GraphTable::parse_edge_file( const std::string &path, int idx, bool reverse) { +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + auto ps_wrapper = paddle::framework::PSGPUWrapper::GetInstance(); +#endif std::string sample_type = "random"; bool is_weighted = false; std::ifstream file(path); @@ -1709,7 +1746,6 @@ std::pair GraphTable::parse_edge_file( while (std::getline(file, line)) { size_t start = line.find_first_of('\t'); if (start == std::string::npos) continue; - local_count++; uint64_t src_id = std::stoull(&line[0]); uint64_t dst_id = std::stoull(&line[start + 1]); if (reverse) { @@ -1721,6 +1757,15 @@ std::pair GraphTable::parse_edge_file( continue; } } + local_count++; +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + if (!ps_wrapper->IsKeyForSelfRank(src_id)) { + VLOG(2) << " node num :" << src_id + << " not split into node_id_:" << node_id_ + << " node_num:" << node_num_; + continue; + } +#endif float weight = 1; size_t last = line.find_last_of('\t'); @@ -1731,7 +1776,7 @@ std::pair GraphTable::parse_edge_file( } if (src_shard_id >= shard_end || src_shard_id < shard_start) { - VLOG(4) << "will not load " << src_id << " from " << path + VLOG(0) << "will not load " << src_id << " from " << path << ", please check id distribution"; continue; } @@ -1739,12 +1784,23 @@ std::pair GraphTable::parse_edge_file( auto node = edge_shards[idx][index]->add_graph_node(src_id); if (node != NULL) { node->build_edges(is_weighted); - node->add_edge(dst_id, weight); +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + if 
(FLAGS_graph_edges_split_only_by_src_id + || ps_wrapper->IsKeyForSelfRank(dst_id)) { +#endif + node->add_edge(dst_id, weight); + local_valid_count++; +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + } else { + VLOG(2) << " dest node num :" << dst_id + << " will not add egde, node_id_:" << node_id_ + << " node_num:" << node_num_; + } +#endif } - - local_valid_count++; } - VLOG(2) << local_valid_count << "/" << local_count << " edges are loaded from filepath->" << path; + VLOG(2) << local_valid_count << "/" << local_count + << " edges are loaded from filepath->" << path; return {local_count, local_valid_count}; } @@ -1835,7 +1891,8 @@ Node *GraphTable::find_node(GraphTableType table_type, uint64_t id) { } Node *node = nullptr; size_t index = shard_id - shard_start; - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; + auto &search_shards = + table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; for (auto &search_shard : search_shards) { PADDLE_ENFORCE_NOT_NULL(search_shard[index], paddle::platform::errors::InvalidArgument( @@ -1854,7 +1911,9 @@ Node *GraphTable::find_node(GraphTableType table_type, int idx, uint64_t id) { return nullptr; } size_t index = shard_id - shard_start; - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? edge_shards[idx] + : feature_shards[idx]; PADDLE_ENFORCE_NOT_NULL(search_shards[index], paddle::platform::errors::InvalidArgument( "search_shard[%d] should not be null.", index)); @@ -1871,7 +1930,9 @@ uint32_t GraphTable::get_thread_pool_index_by_shard_index( } int32_t GraphTable::clear_nodes(GraphTableType table_type, int idx) { - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? 
edge_shards[idx] + : feature_shards[idx]; for (size_t i = 0; i < search_shards.size(); i++) { search_shards[i]->clear(); } @@ -1884,7 +1945,8 @@ int32_t GraphTable::random_sample_nodes(GraphTableType table_type, std::unique_ptr &buffer, int &actual_size) { int total_size = 0; - auto &shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] + : feature_shards[idx]; for (int i = 0; i < (int)shards.size(); i++) { total_size += shards[i]->get_size(); } @@ -2262,7 +2324,8 @@ int GraphTable::get_all_id(GraphTableType table_type, int slice_num, std::vector> *output) { MergeShardVector shard_merge(output, slice_num); - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; + auto &search_shards = + table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; std::vector> tasks; for (size_t idx = 0; idx < search_shards.size(); idx++) { for (size_t j = 0; j < search_shards[idx].size(); j++) { @@ -2284,9 +2347,12 @@ int GraphTable::get_all_id(GraphTableType table_type, } int GraphTable::get_all_neighbor_id( - GraphTableType table_type, int slice_num, std::vector> *output) { + GraphTableType table_type, + int slice_num, + std::vector> *output) { MergeShardVector shard_merge(output, slice_num); - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; + auto &search_shards = + table_type == GraphTableType::EDGE_TABLE ? edge_shards : feature_shards; std::vector> tasks; for (size_t idx = 0; idx < search_shards.size(); idx++) { for (size_t j = 0; j < search_shards[idx].size(); j++) { @@ -2312,7 +2378,9 @@ int GraphTable::get_all_id(GraphTableType table_type, int slice_num, std::vector> *output) { MergeShardVector shard_merge(output, slice_num); - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? 
edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? edge_shards[idx] + : feature_shards[idx]; std::vector> tasks; VLOG(3) << "begin task, task_pool_size_[" << task_pool_size_ << "]"; for (size_t i = 0; i < search_shards.size(); i++) { @@ -2338,7 +2406,9 @@ int GraphTable::get_all_neighbor_id( int slice_num, std::vector> *output) { MergeShardVector shard_merge(output, slice_num); - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? edge_shards[idx] + : feature_shards[idx]; std::vector> tasks; VLOG(3) << "begin task, task_pool_size_[" << task_pool_size_ << "]"; for (size_t i = 0; i < search_shards.size(); i++) { @@ -2365,7 +2435,9 @@ int GraphTable::get_all_feature_ids( int slice_num, std::vector> *output) { MergeShardVector shard_merge(output, slice_num); - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? edge_shards[idx] + : feature_shards[idx]; std::vector> tasks; for (size_t i = 0; i < search_shards.size(); i++) { tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue( @@ -2404,7 +2476,9 @@ int32_t GraphTable::pull_graph_list(GraphTableType table_type, int step) { if (start < 0) start = 0; int size = 0, cur_size; - auto &search_shards = table_type == GraphTableType::EDGE_TABLE ? edge_shards[idx] : feature_shards[idx]; + auto &search_shards = table_type == GraphTableType::EDGE_TABLE + ? 
edge_shards[idx] + : feature_shards[idx]; std::vector>> tasks; for (size_t i = 0; i < search_shards.size() && total_size > 0; i++) { cur_size = search_shards[i]->get_size(); @@ -2460,20 +2534,20 @@ int32_t GraphTable::get_server_index_by_id(uint64_t id) { } int32_t GraphTable::Initialize(const TableParameter &config, const FsClientParameter &fs_config) { - LOG(INFO) << "in graphTable initialize"; + VLOG(1) << "in graphTable initialize"; _config = config; if (InitializeAccessor() != 0) { - LOG(WARNING) << "Table accessor initialize failed"; + VLOG(1) << "Warning: Table accessor initialize failed"; return -1; } if (_afs_client.initialize(fs_config) != 0) { - LOG(WARNING) << "Table fs_client initialize failed"; + VLOG(1) << "Warning: Table fs_client initialize failed"; // return -1; } auto graph = config.graph_parameter(); shard_num = _config.shard_num(); - LOG(INFO) << "in graphTable initialize over"; + VLOG(1) << "in graphTable initialize over"; return Initialize(graph); } @@ -2536,7 +2610,18 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { auto graph_feature = graph.graph_feature(); auto node_types = graph.node_types(); auto edge_types = graph.edge_types(); - VLOG(0) << "got " << edge_types.size() << " edge types in total"; + +#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO) + auto ps_wrapper = paddle::framework::PSGPUWrapper::GetInstance(); + node_num_ = ps_wrapper->GetRankNum(); + node_id_ = ps_wrapper->GetRankId(); +#endif + + VLOG(0) << "got " << edge_types.size() + << " edge types in total, rank id=" << node_id_ + << ", rank size=" << node_num_ + << ", graph_edges_split_only_by_src_id=" + << FLAGS_graph_edges_split_only_by_src_id; feat_id_map.resize(node_types.size()); for (int k = 0; k < edge_types.size(); k++) { VLOG(0) << "in initialize: get a edge_type " << edge_types[k]; @@ -2644,7 +2729,8 @@ void GraphTable::build_graph_type_keys() { for (auto &it : this->feature_to_id) { auto node_idx = it.second; std::vector> keys; - 
this->get_all_feature_ids(GraphTableType::FEATURE_TABLE, node_idx, 1, &keys); + this->get_all_feature_ids( + GraphTableType::FEATURE_TABLE, node_idx, 1, &keys); graph_total_keys_.insert( graph_total_keys_.end(), keys[0].begin(), keys[0].end()); } diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.h b/paddle/fluid/distributed/ps/table/common_graph_table.h index e64d6c047fecd..86eaab28f2f74 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.h +++ b/paddle/fluid/distributed/ps/table/common_graph_table.h @@ -565,12 +565,12 @@ class GraphTable : public Table { std::string graph_data_local_path, int part_num, bool reverse, - const std::vector& is_reverse_edge_map); + const std::vector &is_reverse_edge_map); int32_t parse_edge_and_load(std::string etype2files, std::string graph_data_local_path, int part_num, bool reverse, - const std::vector& is_reverse_edge_map); + const std::vector &is_reverse_edge_map); int32_t parse_node_and_load(std::string ntype2files, std::string graph_data_local_path, int part_num); @@ -629,7 +629,7 @@ class GraphTable : public Table { virtual void Clear() {} virtual int32_t Flush() { return 0; } virtual int32_t Shrink(const std::string &param) { return 0; } - //指定保存路径 + // 指定保存路径 virtual int32_t Save(const std::string &path, const std::string &converter) { return 0; } @@ -743,6 +743,10 @@ class GraphTable : public Table { void build_graph_total_keys(); void build_graph_type_keys(); + void set_node_num(int node_num) { node_num_ = node_num; } + + void set_node_id(int node_id) { node_id_ = node_id; } + std::vector graph_total_keys_; std::vector> graph_type_keys_; std::unordered_map type_to_index_; @@ -790,6 +794,8 @@ class GraphTable : public Table { std::string feature_separator_ = std::string(" "); std::vector slot_feature_num_map_; bool is_parse_node_fail_ = false; + int node_num_ = 1; + int node_id_ = 0; }; /* diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
index ec0cdb27aaf17..89459905dfb27 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -708,8 +708,19 @@ class PSGPUWrapper { #endif // for node rank int PartitionKeyForRank(const uint64_t& key) { - return ((key / device_num_) % node_size_); + return static_cast<int>((key / device_num_) % node_size_); } + // is key for self rank + bool IsKeyForSelfRank(const uint64_t& key) { + if (node_size_ == 1) { + return true; + } + return (static_cast<int>((key / device_num_) % node_size_) == rank_id_); + } + // rank id + int GetRankId(void) { return rank_id_; } + // rank size + int GetRankNum(void) { return node_size_; } private: static std::shared_ptr<PSGPUWrapper> s_instance_; @@ -745,8 +756,8 @@ class PSGPUWrapper { double time_4 = 0.0; int multi_node_{0}; - int rank_id_; - int node_size_; + int rank_id_ = 0; + int node_size_ = 1; int device_num_ = 8; uint64_t table_id_; int gpu_graph_mode_ = 0;