Merge pull request PaddlePaddle#17 from Thunderbrook/gpugraph_0602
adapt uint64 and iterate edge table
Thunderbrook committed Jun 2, 2022
2 parents d01a280 + 4fb5c13 commit 20a1aa3
Showing 5 changed files with 62 additions and 39 deletions.
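Note on the key-type change: the diffs below switch graph node keys from int64_t to uint64_t throughout the data feed, dataset, and PS-GPU wrapper, and additionally iterate the edge table when collecting keys. A minimal sketch (not part of this commit; names are illustrative) of one practical reason for the unsigned type: shard selection is done with key % shard_num, and a signed key whose high bit is set would produce a negative shard index.

```cpp
// Minimal sketch, assuming shard selection by modulo as in
// ps_gpu_wrapper.cc (cur_key % thread_keys_shard_num_). Not Paddle code;
// it only illustrates why an unsigned 64-bit key type is the safer choice
// when node ids may use the full 64-bit range.
#include <cstdint>
#include <iostream>

int main() {
  const int shard_num = 37;
  // A node id with the most significant bit set, e.g. a 64-bit hash.
  uint64_t key_u = 0x9000000000000001ULL;
  int64_t key_s = static_cast<int64_t>(key_u);  // same bits, signed view

  std::cout << "unsigned shard: " << key_u % shard_num << '\n';  // in [0, 36]
  std::cout << "signed shard:   " << key_s % shard_num << '\n';  // negative or zero, unusable as an index
  return 0;
}
```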
26 changes: 13 additions & 13 deletions paddle/fluid/framework/data_feed.cu
@@ -359,7 +359,7 @@ __global__ void GraphFillFirstStepKernel(int *prefix_sum, int *sampleidx2row,
}

// Fill sample_res to the stepth column of walk
void GraphDataGenerator::FillOneStep(int64_t *d_start_ids, int64_t *walk,
void GraphDataGenerator::FillOneStep(uint64_t *d_start_ids, uint64_t *walk,
int len, NeighborSampleResult &sample_res,
int cur_degree, int step,
int *len_per_row) {
@@ -469,8 +469,8 @@ int GraphDataGenerator::FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk) {
size_t device_key_size = h_device_keys_[type_index]->size();
VLOG(2) << "type: " << node_type << " size: " << device_key_size
<< " start: " << start;
int64_t *d_type_keys =
reinterpret_cast<int64_t *>(d_device_keys_[type_index]->ptr());
uint64_t *d_type_keys =
reinterpret_cast<uint64_t *>(d_device_keys_[type_index]->ptr());
int tmp_len = start + once_sample_startid_len_ > device_key_size
? device_key_size - start
: once_sample_startid_len_;
@@ -492,7 +492,7 @@ int GraphDataGenerator::FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk) {
uint64_t *cur_walk = walk + i;

NeighborSampleQuery q;
q.initialize(gpuid_, path[0], (int64_t)(d_type_keys + start), walk_degree_,
q.initialize(gpuid_, path[0], (uint64_t)(d_type_keys + start), walk_degree_,
tmp_len);
auto sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false);

@@ -518,11 +518,11 @@ int GraphDataGenerator::FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk) {
break;
}
auto sample_key_mem = sample_res.actual_val_mem;
int64_t *sample_keys_ptr =
reinterpret_cast<int64_t *>(sample_key_mem->ptr());
uint64_t *sample_keys_ptr =
reinterpret_cast<uint64_t *>(sample_key_mem->ptr());
int edge_type_id = path[(step - 1) % path_len];
VLOG(2) << "sample edge type: " << edge_type_id << " step: " << step;
q.initialize(gpuid_, edge_type_id, (int64_t)sample_keys_ptr, 1,
q.initialize(gpuid_, edge_type_id, (uint64_t)sample_keys_ptr, 1,
sample_res.total_sample_size);
sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false);

@@ -588,11 +588,11 @@ void GraphDataGenerator::AllocResource(const paddle::platform::Place &place,
VLOG(3) << "h_device_keys_[" << i << "][" << j
<< "] = " << (*(h_device_keys_[i]))[j];
}
auto buf = memory::AllocShared(place_,
h_device_keys_[i]->size() * sizeof(int64_t));
auto buf = memory::AllocShared(
place_, h_device_keys_[i]->size() * sizeof(uint64_t));
d_device_keys_.push_back(buf);
CUDA_CHECK(cudaMemcpyAsync(buf->ptr(), h_device_keys_[i]->data(),
h_device_keys_[i]->size() * sizeof(int64_t),
h_device_keys_[i]->size() * sizeof(uint64_t),
cudaMemcpyHostToDevice, stream_));
}
// h_device_keys_ = h_device_keys;
@@ -610,10 +610,10 @@ void GraphDataGenerator::AllocResource(const paddle::platform::Place &place,
(once_max_sample_keynum + 1) * sizeof(int), stream_);
cursor_ = 0;
jump_rows_ = 0;
d_walk_ = memory::AllocShared(place_, buf_size_ * sizeof(int64_t));
cudaMemsetAsync(d_walk_->ptr(), 0, buf_size_ * sizeof(int64_t), stream_);
d_walk_ = memory::AllocShared(place_, buf_size_ * sizeof(uint64_t));
cudaMemsetAsync(d_walk_->ptr(), 0, buf_size_ * sizeof(uint64_t), stream_);
d_sample_keys_ =
memory::AllocShared(place_, once_max_sample_keynum * sizeof(int64_t));
memory::AllocShared(place_, once_max_sample_keynum * sizeof(uint64_t));

d_sampleidx2rows_.push_back(
memory::AllocShared(place_, once_max_sample_keynum * sizeof(int)));
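The updated calls in FillWalkBuf hand the device key pointer to NeighborSampleQuery::initialize as a 64-bit integer, e.g. (uint64_t)(d_type_keys + start). A hedged, standalone sketch of that pointer-as-integer convention (not Paddle code; the consumer function is a stand-in), showing the round trip is lossless on 64-bit platforms:

```cpp
// Sketch only: mirrors the pointer-as-uint64_t hand-off used by
// q.initialize(...) in data_feed.cu, with a stand-in consumer function.
// The static_assert guards the assumption that pointers fit in 64 bits.
#include <cassert>
#include <cstdint>

static_assert(sizeof(void*) <= sizeof(uint64_t), "pointer must fit in 64 bits");

void consume_keys(uint64_t key_addr, int len) {
  // The receiving side reinterprets the integer back into a key pointer.
  const uint64_t* keys = reinterpret_cast<const uint64_t*>(key_addr);
  for (int i = 0; i < len; ++i) {
    (void)keys[i];
  }
}

int main() {
  uint64_t keys[4] = {11, 22, 33, 44};
  uint64_t addr = reinterpret_cast<uint64_t>(keys + 1);   // like (uint64_t)(d_type_keys + start)
  assert(reinterpret_cast<uint64_t*>(addr) == keys + 1);  // lossless round trip
  consume_keys(addr, 3);
  return 0;
}
```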
8 changes: 4 additions & 4 deletions paddle/fluid/framework/data_feed.h
@@ -895,11 +895,11 @@ class GraphDataGenerator {
int AcquireInstance(BufState* state);
int GenerateBatch();
int FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk);
void FillOneStep(int64_t* start_ids, int64_t* walk, int len,
void FillOneStep(uint64_t* start_ids, uint64_t* walk, int len,
NeighborSampleResult& sample_res, int cur_degree, int step,
int* len_per_row);
int FillInsBuf();
void SetDeviceKeys(std::vector<int64_t>* device_keys, int type) {
void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
type_to_index_[type] = h_device_keys_.size();
h_device_keys_.push_back(device_keys);
}
@@ -913,7 +913,7 @@ class GraphDataGenerator {
// start ids
// int64_t* device_keys_;
// size_t device_key_size_;
std::vector<std::vector<int64_t>*> h_device_keys_;
std::vector<std::vector<uint64_t>*> h_device_keys_;
std::unordered_map<int, int> type_to_index_;
// point to device_keys_
size_t cursor_;
@@ -1018,7 +1018,7 @@ class DataFeed {
virtual void SetParseLogKey(bool parse_logkey) {}
virtual void SetEnablePvMerge(bool enable_pv_merge) {}
virtual void SetCurrentPhase(int current_phase) {}
virtual void SetDeviceKeys(std::vector<int64_t>* device_keys, int type) {
virtual void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
gpu_graph_data_generator_.SetDeviceKeys(device_keys, type);
}
virtual void SetGpuGraphMode(int gpu_graph_mode) {
16 changes: 15 additions & 1 deletion paddle/fluid/framework/data_set.cc
@@ -429,7 +429,7 @@ void MultiSlotDataset::PrepareTrain() {

template <typename T>
void DatasetImpl<T>::SetGraphDeviceKeys(
const std::vector<int64_t>& h_device_keys) {
const std::vector<uint64_t>& h_device_keys) {
// for (size_t i = 0; i < gpu_graph_device_keys_.size(); i++) {
// gpu_graph_device_keys_[i].clear();
// }
@@ -452,6 +452,7 @@ void DatasetImpl<T>::LoadIntoMemory() {
graph_all_type_total_keys_.clear();
auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
auto node_to_id = gpu_graph_ptr->feature_to_id;
auto edge_to_id = gpu_graph_ptr->edge_to_id;
graph_all_type_total_keys_.resize(node_to_id.size());
int cnt = 0;
for (auto& iter : node_to_id) {
@@ -474,6 +475,19 @@ }
}
cnt++;
}
// FIX: trick for iterate edge table
for (auto& iter : edge_to_id) {
int edge_idx = iter.second;
auto gpu_graph_device_keys =
gpu_graph_ptr->get_all_id(0, edge_idx, thread_num_);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(1) << "edge type: " << edge_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
}
}

} else {
for (int64_t i = 0; i < thread_num_; ++i) {
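The block added above under "FIX: trick for iterate edge table" walks every edge type and appends its per-thread key buckets to gpu_graph_total_keys_, so edge-side node ids also reach the GPU parameter server. A standalone sketch of that accumulation pattern, with GraphGpuWrapper mocked and get_all_id's return shape assumed (from its use in the diff) to be one key bucket per loader thread:

```cpp
// Standalone sketch of the edge-table iteration added to
// DatasetImpl<T>::LoadIntoMemory(). GraphGpuWrapper is mocked; only the
// accumulation pattern is illustrated.
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct MockGraphGpuWrapper {
  std::map<std::string, int> edge_to_id{{"u2i", 0}, {"i2i", 1}};
  // Assumed shape: one vector of keys per loader thread.
  std::vector<std::vector<uint64_t>> get_all_id(int /*table*/, int edge_idx,
                                                int thread_num) {
    return std::vector<std::vector<uint64_t>>(
        thread_num,
        std::vector<uint64_t>{static_cast<uint64_t>(edge_idx) * 1000 + 7});
  }
};

int main() {
  MockGraphGpuWrapper gpu_graph;
  const int thread_num = 4;
  std::vector<uint64_t> gpu_graph_total_keys;

  for (auto& iter : gpu_graph.edge_to_id) {
    int edge_idx = iter.second;
    auto buckets = gpu_graph.get_all_id(0, edge_idx, thread_num);
    for (auto& bucket : buckets) {
      // Same effect as the inner push_back loop in the diff.
      gpu_graph_total_keys.insert(gpu_graph_total_keys.end(),
                                  bucket.begin(), bucket.end());
    }
  }
  return 0;  // gpu_graph_total_keys now also contains edge-table keys
}
```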
8 changes: 5 additions & 3 deletions paddle/fluid/framework/data_set.h
@@ -269,7 +269,9 @@ class DatasetImpl : public Dataset {
return multi_consume_channel_;
}
}
std::vector<int64_t>& GetGpuGraphTotalKeys() { return gpu_graph_total_keys_; }
std::vector<uint64_t>& GetGpuGraphTotalKeys() {
return gpu_graph_total_keys_;
}
Channel<T>& GetInputChannelRef() { return input_channel_; }

protected:
@@ -331,8 +333,8 @@ class DatasetImpl : public Dataset {
bool enable_heterps_ = false;
int gpu_graph_mode_ = 1;
// std::vector<std::vector<int64_t>> gpu_graph_device_keys_;
std::vector<std::vector<std::vector<int64_t>>> graph_all_type_total_keys_;
std::vector<int64_t> gpu_graph_total_keys_;
std::vector<std::vector<std::vector<uint64_t>>> graph_all_type_total_keys_;
std::vector<uint64_t> gpu_graph_total_keys_;
};

// use std::vector<MultiSlotType> or Record as data type
43 changes: 25 additions & 18 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -115,7 +115,7 @@ void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {

std::vector<std::thread> threads;
// data should be in input channel

thread_dim_keys_.resize(thread_keys_thread_num_);
for (int i = 0; i < thread_keys_thread_num_; i++) {
thread_dim_keys_[i].resize(thread_keys_shard_num_);
@@ -130,21 +130,23 @@ void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {
size_t begin = 0;

std::string data_set_name = std::string(typeid(*dataset_).name());
VLOG(0) <<"gpu_graph_mode_:" << gpu_graph_mode_;

VLOG(0) << "gpu_graph_mode_:" << gpu_graph_mode_;
if (!gpu_graph_mode_) {
if (data_set_name.find("SlotRecordDataset") != std::string::npos) {
VLOG(0) << "ps_gpu_wrapper use SlotRecordDataset";
SlotRecordDataset* dataset = dynamic_cast<SlotRecordDataset*>(dataset_);
auto input_channel = dataset->GetInputChannel();
VLOG(0) << "psgpu wrapperinputslotchannle size: " << input_channel->Size();
VLOG(0) << "psgpu wrapperinputslotchannle size: "
<< input_channel->Size();
const std::deque<SlotRecord>& vec_data = input_channel->GetData();
total_len = vec_data.size();
len_per_thread = total_len / thread_keys_thread_num_;
remain = total_len % thread_keys_thread_num_;
VLOG(0) << "total len: " << total_len;
auto gen_dynamic_mf_func = [this](const std::deque<SlotRecord>& total_data,
int begin_index, int end_index, int i) {
auto gen_dynamic_mf_func = [this](
const std::deque<SlotRecord>& total_data, int begin_index,
int end_index, int i) {
for (auto iter = total_data.begin() + begin_index;
iter != total_data.begin() + end_index; iter++) {
const auto& ins = *iter;
@@ -157,24 +159,26 @@ void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {
int shard_id = feasign_v[j] % thread_keys_shard_num_;
int dim_id = slot_index_vec_[slot_idx];
if (feasign_v[j] != 0) {
this->thread_dim_keys_[i][shard_id][dim_id].insert(feasign_v[j]);
this->thread_dim_keys_[i][shard_id][dim_id].insert(
feasign_v[j]);
}
}
}
}
};
for (int i = 0; i < thread_keys_thread_num_; i++) {
threads.push_back(
std::thread(gen_dynamic_mf_func, std::ref(vec_data), begin,
begin + len_per_thread + (i < remain ? 1 : 0), i));
std::thread(gen_dynamic_mf_func, std::ref(vec_data), begin,
begin + len_per_thread + (i < remain ? 1 : 0), i));

begin += len_per_thread + (i < remain ? 1 : 0);
}
for (std::thread& t : threads) {
t.join();
}
timeline.Pause();
VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec()
<< " seconds.";
} else {
CHECK(data_set_name.find("MultiSlotDataset") != std::string::npos);
VLOG(0) << "ps_gpu_wrapper use MultiSlotDataset";
@@ -208,29 +212,31 @@ void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {
t.join();
}
timeline.Pause();
VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec()
<< " seconds.";
}
} else {
VLOG(0) << "PreBuild in GpuGraph mode";
SlotRecordDataset* dataset = dynamic_cast<SlotRecordDataset*>(dataset_);
const std::vector<int64_t>& vec_data = dataset->GetGpuGraphTotalKeys();
const std::vector<uint64_t>& vec_data = dataset->GetGpuGraphTotalKeys();
total_len = vec_data.size();
len_per_thread = total_len / thread_keys_thread_num_;
VLOG(0) << "GpuGraphTotalKeys: " << total_len;
remain = total_len % thread_keys_thread_num_;
auto gen_graph_data_func = [this](const std::vector<int64_t>& total_data,
int begin_index, int end_index, int i) {
auto gen_graph_data_func = [this](const std::vector<uint64_t>& total_data,
int begin_index, int end_index, int i) {
for (auto iter = total_data.begin() + begin_index;
iter != total_data.begin() + end_index; iter++) {
uint64_t cur_key = *iter;
int shard_id = cur_key % thread_keys_shard_num_;
this->thread_keys_[i][shard_id].insert(cur_key);
}
};
auto gen_graph_dynamic_mf_func = [this](const std::vector<int64_t>& total_data,
int begin_index, int end_index, int i) {
auto gen_graph_dynamic_mf_func = [this](
const std::vector<uint64_t>& total_data, int begin_index, int end_index,
int i) {
for (auto iter = total_data.begin() + begin_index;
iter != total_data.begin() + end_index; iter++) {
iter != total_data.begin() + end_index; iter++) {
uint64_t cur_key = *iter;
int shard_id = cur_key % thread_keys_shard_num_;
// int dim_id = slot_index_vec_[slot_idx];
Expand Down Expand Up @@ -895,7 +901,8 @@ void PSGPUWrapper::EndPass() {
auto& device_keys = this->current_task_->device_dim_keys_[i][j];
size_t len = device_keys.size();
int mf_dim = this->index_dim_vec_[j];
VLOG(0) << "dump pool to cpu table: " << i << "with mf dim: " << mf_dim << " key_len :" << len;
VLOG(0) << "dump pool to cpu table: " << i << "with mf dim: " << mf_dim
<< " key_len :" << len;
size_t feature_value_size =
TYPEALIGN(8, sizeof(FeatureValue) + ((mf_dim + 1) * sizeof(float)));

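In GpuGraph mode, PreBuildTask above slices the flat uint64_t key list from GetGpuGraphTotalKeys() across worker threads and deduplicates each slice into per-shard key sets. A self-contained sketch of that sharding pattern (local containers stand in for the wrapper's thread_keys_ member; key values are synthetic):

```cpp
// Sketch of the gen_graph_data_func sharding pattern in
// PSGPUWrapper::PreBuildTask(). The split logic (len_per_thread / remain)
// matches the diff; everything else is a local stand-in.
#include <cstdint>
#include <set>
#include <thread>
#include <vector>

int main() {
  const int thread_num = 4;
  const int shard_num = 8;

  std::vector<uint64_t> total_keys;  // like dataset->GetGpuGraphTotalKeys()
  for (uint64_t k = 1; k <= 1000; ++k) total_keys.push_back(k * 2654435761ULL);

  // thread_keys[i][shard] mirrors thread_keys_[i][shard_id] in the wrapper.
  std::vector<std::vector<std::set<uint64_t>>> thread_keys(
      thread_num, std::vector<std::set<uint64_t>>(shard_num));

  size_t len_per_thread = total_keys.size() / thread_num;
  size_t remain = total_keys.size() % thread_num;

  std::vector<std::thread> threads;
  size_t begin = 0;
  for (int i = 0; i < thread_num; ++i) {
    size_t end =
        begin + len_per_thread + (static_cast<size_t>(i) < remain ? 1 : 0);
    threads.emplace_back([&thread_keys, &total_keys, begin, end, i, shard_num]() {
      for (size_t p = begin; p < end; ++p) {
        uint64_t cur_key = total_keys[p];
        thread_keys[i][cur_key % shard_num].insert(cur_key);  // dedup per shard
      }
    });
    begin = end;
  }
  for (auto& t : threads) t.join();
  return 0;
}
```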