reconstruct table, reduce hbm 20.8G #46

Merged: 4 commits, Jun 23, 2022
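Editor's summary, inferred from the diff below: the GPU PS graph builders previously produced one array of fat node structs (GpuPsGraphFeaNode / GpuPsGraphNode), each entry bundling the node id with its size/offset metadata. This PR splits each of those into two parallel arrays, a bare uint64_t id list plus a compact info list (GpuPsFeaInfo / GpuPsNodeInfo), which appears to be the source of the ~20.8 GB HBM saving named in the title. A minimal sketch of the layout change; the member names appear in the diff, but the field types here are assumptions:

```cpp
#include <cstdint>
#include <vector>

// Before (sketch): one array of fat nodes, id and metadata interleaved.
struct OldStyleGraphNode {    // hypothetical stand-in for GpuPsGraphNode
  uint64_t node_id;
  int64_t neighbor_size;
  int64_t neighbor_offset;
};

// After (sketch): two parallel arrays, ids separated from compact metadata.
struct NodeInfoSketch {       // hypothetical stand-in for GpuPsNodeInfo
  uint32_t neighbor_size;
  uint32_t neighbor_offset;
};

struct CommGraphSketch {      // mirrors the node_list / node_info_list split
  std::vector<uint64_t> node_list;             // ids only
  std::vector<NodeInfoSketch> node_info_list;  // metadata only
  std::vector<uint64_t> neighbor_list;         // flat CSR-style adjacency
};
```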
87 changes: 55 additions & 32 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -54,28 +54,36 @@ int32_t GraphTable::Load_to_ssd(const std::string &path,
 paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
     std::vector<uint64_t> &node_ids, int slot_num) {
   std::vector<std::vector<uint64_t>> bags(task_pool_size_);
+  for (int i = 0; i < task_pool_size_; i++) {
+    auto predsize = node_ids.size() / task_pool_size_;
+    bags[i].reserve(predsize * 1.2);
+  }
+
   for (auto x : node_ids) {
     int location = x % shard_num % task_pool_size_;
     bags[location].push_back(x);
   }
 
   std::vector<std::future<int>> tasks;
   std::vector<uint64_t> feature_array[task_pool_size_];
   std::vector<uint8_t> slot_id_array[task_pool_size_];
-  std::vector<paddle::framework::GpuPsGraphFeaNode>
-      node_fea_array[task_pool_size_];
+  std::vector<uint64_t> node_id_array[task_pool_size_];
+  std::vector<paddle::framework::GpuPsFeaInfo>
+      node_fea_info_array[task_pool_size_];
   for (size_t i = 0; i < bags.size(); i++) {
     if (bags[i].size() > 0) {
       tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
-        paddle::framework::GpuPsGraphFeaNode x;
+        uint64_t node_id;
+        paddle::framework::GpuPsFeaInfo x;
         std::vector<uint64_t> feature_ids;
         for (size_t j = 0; j < bags[i].size(); j++) {
           // TODO use FEATURE_TABLE instead
           Node *v = find_node(1, bags[i][j]);
-          x.node_id = bags[i][j];
+          node_id = bags[i][j];
           if (v == NULL) {
             x.feature_size = 0;
             x.feature_offset = 0;
-            node_fea_array[i].push_back(x);
+            node_fea_info_array[i].push_back(x);
           } else {
             // x <- v
             x.feature_offset = feature_array[i].size();
@@ -91,8 +99,9 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
             }
           }
           x.feature_size = total_feature_size;
-          node_fea_array[i].push_back(x);
+          node_fea_info_array[i].push_back(x);
         }
+        node_id_array[i].push_back(node_id);
       }
       return 0;
     }));
@@ -109,9 +118,10 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
   res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num);
   unsigned int offset = 0, ind = 0;
   for (int i = 0; i < task_pool_size_; i++) {
-    for (int j = 0; j < (int)node_fea_array[i].size(); j++) {
-      res.node_list[ind] = node_fea_array[i][j];
-      res.node_list[ind++].feature_offset += offset;
+    for (int j = 0; j < (int)node_id_array[i].size(); j++) {
+      res.node_list[ind] = node_id_array[i][j];
+      res.fea_info_list[ind] = node_fea_info_array[i][j];
+      res.fea_info_list[ind++].feature_offset += offset;
     }
     for (size_t j = 0; j < feature_array[i].size(); j++) {
       res.feature_list[offset + j] = feature_array[i][j];
@@ -125,49 +135,62 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
 paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
     int idx, std::vector<uint64_t> ids) {
   std::vector<std::vector<uint64_t>> bags(task_pool_size_);
+  for (int i = 0; i < task_pool_size_; i++) {
+    auto predsize = ids.size() / task_pool_size_;
+    bags[i].reserve(predsize * 1.2);
+  }
   for (auto x : ids) {
     int location = x % shard_num % task_pool_size_;
     bags[location].push_back(x);
   }
 
   std::vector<std::future<int>> tasks;
-  std::vector<uint64_t> edge_array[task_pool_size_];
-  std::vector<paddle::framework::GpuPsGraphNode> node_array[task_pool_size_];
+  std::vector<uint64_t> node_array[task_pool_size_];  // node id list
+  std::vector<paddle::framework::GpuPsNodeInfo> info_array[task_pool_size_];
+  std::vector<uint64_t> edge_array[task_pool_size_];  // edge id list
+
   for (size_t i = 0; i < bags.size(); i++) {
     if (bags[i].size() > 0) {
       tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
-        paddle::framework::GpuPsGraphNode x;
+        node_array[i].resize(bags[i].size());
+        info_array[i].resize(bags[i].size());
+        edge_array[i].reserve(bags[i].size());
+
         for (size_t j = 0; j < bags[i].size(); j++) {
-          Node *v = find_node(0, idx, bags[i][j]);
-          x.node_id = bags[i][j];
-          if (v == NULL) {
-            x.neighbor_size = 0;
-            x.neighbor_offset = 0;
-            node_array[i].push_back(x);
-          } else {
-            x.neighbor_size = v->get_neighbor_size();
-            x.neighbor_offset = edge_array[i].size();
-            node_array[i].push_back(x);
-            for (size_t k = 0; k < (size_t)x.neighbor_size; k++) {
-              edge_array[i].push_back(v->get_neighbor_id(k));
-            }
-          }
+          auto node_id = bags[i][j];
+          node_array[i][j] = node_id;
+          Node *v = find_node(0, idx, node_id);
+          if (v != nullptr) {
+            info_array[i][j].neighbor_offset = edge_array[i].size();
+            info_array[i][j].neighbor_size = v->get_neighbor_size();
+            for (size_t k = 0; k < v->get_neighbor_size(); k++) {
+              edge_array[i].push_back(v->get_neighbor_id(k));
+            }
+          }
+          else {
+            info_array[i][j].neighbor_offset = 0;
+            info_array[i][j].neighbor_size = 0;
+          }
         }
         return 0;
       }));
     }
   }
   for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
-  paddle::framework::GpuPsCommGraph res;
+
   int64_t tot_len = 0;
   for (int i = 0; i < task_pool_size_; i++) {
     tot_len += edge_array[i].size();
   }
+
+  paddle::framework::GpuPsCommGraph res;
   res.init_on_cpu(tot_len, ids.size());
   int64_t offset = 0, ind = 0;
   for (int i = 0; i < task_pool_size_; i++) {
     for (int j = 0; j < (int)node_array[i].size(); j++) {
       res.node_list[ind] = node_array[i][j];
-      res.node_list[ind++].neighbor_offset += offset;
+      res.node_info_list[ind] = info_array[i][j];
+      res.node_info_list[ind++].neighbor_offset += offset;
     }
     for (size_t j = 0; j < edge_array[i].size(); j++) {
       res.neighbor_list[offset + j] = edge_array[i][j];
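The merge loops above stitch the per-thread results into one flat CSR layout: thread-local neighbor lists are concatenated, and each node's neighbor_offset is rebased by the number of edges already written. A self-contained sketch of that rebasing step, using stand-in types rather than the Paddle ones:

```cpp
// Minimal sketch of the per-thread CSR merge used above (hypothetical types).
#include <cstdint>
#include <cstdio>
#include <vector>

struct NodeInfo { uint64_t neighbor_offset; uint64_t neighbor_size; };

int main() {
  // Two "threads", each with local node metadata and a local edge list.
  std::vector<std::vector<uint64_t>> edges = {{10, 11, 12}, {20, 21}};
  std::vector<std::vector<NodeInfo>> infos = {
      {{0, 2}, {2, 1}},  // thread 0: node A owns edges [0,2), node B owns [2,3)
      {{0, 2}}};         // thread 1: node C owns local edges [0,2)
  std::vector<uint64_t> neighbor_list;
  std::vector<NodeInfo> node_info_list;
  uint64_t offset = 0;
  for (size_t i = 0; i < edges.size(); i++) {
    for (NodeInfo x : infos[i]) {
      x.neighbor_offset += offset;  // rebase into the global edge array
      node_info_list.push_back(x);
    }
    neighbor_list.insert(neighbor_list.end(), edges[i].begin(), edges[i].end());
    offset += edges[i].size();
  }
  // Node C's edges now start at global offset 3 (after thread 0's 3 edges).
  printf("node C offset = %llu\n",
         (unsigned long long)node_info_list[2].neighbor_offset);  // prints 3
}
```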
@@ -275,7 +298,7 @@ int64_t GraphTable::load_graph_to_memory_from_ssd(int idx,
       std::string str;
       if (_db->get(i, ch, sizeof(int) * 2 + sizeof(uint64_t), str) == 0) {
         count[i] += (int64_t)str.size();
-        for (int j = 0; j < (int)str.size(); j += sizeof(uint64_t)) {
+        for (size_t j = 0; j < (int)str.size(); j += sizeof(uint64_t)) {
           uint64_t id = *(uint64_t *)(str.c_str() + j);
           add_comm_edge(idx, v, id);
         }
@@ -345,7 +368,7 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
         score[i] = 0;
       }
     }
-    for (int j = 0; j < (int)value.size(); j += sizeof(uint64_t)) {
+    for (size_t j = 0; j < (int)value.size(); j += sizeof(uint64_t)) {
       uint64_t v = *((uint64_t *)(value.c_str() + j));
       int index = -1;
       if (id_map.find(v) != id_map.end()) {
@@ -438,7 +461,7 @@ void GraphTable::clear_graph(int idx) {
   }
 }
 int32_t GraphTable::load_next_partition(int idx) {
-  if (next_partition >= partitions[idx].size()) {
+  if (next_partition >= (int)partitions[idx].size()) {
     VLOG(0) << "partition iteration is done";
     return -1;
   }
@@ -500,7 +523,7 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
     std::vector<Node *> &v = shards[i]->get_bucket();
     for (size_t j = 0; j < v.size(); j++) {
       std::vector<uint64_t> s;
-      for (int k = 0; k < (int)v[j]->get_neighbor_size(); k++) {
+      for (size_t k = 0; k < (int)v[j]->get_neighbor_size(); k++) {
         s.push_back(v[j]->get_neighbor_id(k));
       }
       cost += v[j]->get_neighbor_size() * sizeof(uint64_t);
@@ -1794,7 +1817,7 @@ std::vector<std::vector<uint64_t>> GraphTable::get_all_id(int type_id, int idx,
   auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx];
   std::vector<std::future<std::vector<uint64_t>>> tasks;
   VLOG(0) << "begin task, task_pool_size_[" << task_pool_size_ << "]";
-  for (int i = 0; i < search_shards.size(); i++) {
+  for (size_t i = 0; i < search_shards.size(); i++) {
     tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
         [&search_shards, i]() -> std::vector<uint64_t> {
           return search_shards[i]->get_all_id();
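The remaining hunks in common_graph_table.cc are loop-index type fixes: `int` counters compared against unsigned `size()` results become `size_t`, which silences -Wsign-compare and avoids signed-integer overflow on very large containers. A minimal before/after illustration (hypothetical function, not from the PR):

```cpp
#include <cstddef>
#include <vector>

long long sum(const std::vector<int>& v) {
  long long total = 0;
  // Before: for (int i = 0; i < v.size(); i++) ...
  //   warns under -Wsign-compare, and int i can overflow (undefined
  //   behavior) if v.size() ever exceeds INT_MAX.
  // After: the index type matches the container's size type.
  for (size_t i = 0; i < v.size(); i++) {
    total += v[i];
  }
  return total;
}
```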
2 changes: 1 addition & 1 deletion paddle/fluid/framework/data_feed.h
@@ -45,6 +45,7 @@ limitations under the License. */
 #if defined(PADDLE_WITH_CUDA)
 #include "paddle/fluid/platform/cuda_device_guard.h"
 #include "paddle/fluid/platform/device/gpu/gpu_info.h"
+#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h"

Collaborator commented inline on the added include: "Remove it." (original: 去掉吧。)

 #endif
 
 DECLARE_int32(record_pool_max_size);
@@ -422,7 +423,6 @@ struct UsedSlotGpuType {
 };
 
 #if defined(PADDLE_WITH_CUDA) && defined(PADDLE_WITH_HETERPS)
-#define CUDA_CHECK(val) CHECK(val == gpuSuccess)
 template <typename T>
 struct CudaBuffer {
   T* cu_buffer;
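This hunk drops the file-local CUDA_CHECK macro from data_feed.h, presumably deduplicating it into a shared header (possibly the gpu_graph_utils.h included above, though the reviewer asked to drop that include again). The removed definition was a bare glog CHECK against gpuSuccess; a sketch of an equivalent, slightly more verbose form (an assumption, not Paddle's actual replacement):

```cpp
#include <cuda_runtime.h>
#include "glog/logging.h"  // provides CHECK()

// Sketch only: expands the removed one-liner
//   #define CUDA_CHECK(val) CHECK(val == gpuSuccess)
// into a form that also reports the CUDA error string on failure.
#define CUDA_CHECK(call)                                                  \
  do {                                                                    \
    cudaError_t err_ = (call);                                            \
    CHECK(err_ == cudaSuccess)                                            \
        << "CUDA error " << err_ << ": " << cudaGetErrorString(err_);     \
  } while (0)
```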