【HETERPS】pipeline adaptive for heterps #33159

Merged · 14 commits · Jul 6, 2021
Changes from all commits
127 changes: 115 additions & 12 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -40,8 +40,7 @@ namespace framework {
std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
bool PSGPUWrapper::is_initialized_ = false;

void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
uint64_t table_id, int feature_dim) {
void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
platform::Timer timeline;
timeline.Start();
@@ -137,17 +136,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
local_ptr[i].resize(local_keys[i].size());
}
timeline.Start();
auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
&fleet_ptr](int i) {
auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
size_t key_size = local_keys[i].size();
#ifdef PADDLE_WITH_PSLIB
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
#endif
#ifdef PADDLE_WITH_PSCORE
auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
#endif
tt.wait();
@@ -270,11 +268,8 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
<< " seconds.";
}

void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
int device_num = heter_devices_.size();
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();
BuildTask(gpu_task, table_id, feature_dim);
platform::Timer timeline;
timeline.Start();

@@ -289,6 +284,10 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
delete HeterPs_;
HeterPs_ = nullptr;
}
if (size_max <= 0) {
VLOG(1) << "Skip build gpu ps cause feasign nums = " << size_max;
return;
}
std::vector<std::thread> threads(device_num);
HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
@@ -297,7 +296,9 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
gpu_task->device_values_[i].data(),
feature_keys_count[i], 500000, 2);
HeterPs_->show_one_table(i);
if (feature_keys_count[i] > 0) {
HeterPs_->show_one_table(i);
}
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(build_func, i);
@@ -308,7 +309,109 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
timeline.Pause();
VLOG(1) << "GpuPs build table total costs: " << timeline.ElapsedSec()
<< " s.";
gpu_task_pool_.Push(gpu_task);
}

void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
platform::Timer timer;
VLOG(3) << "Begin LoadIntoMemory(), dataset[" << dataset_ << "]";
timer.Start();
dataset_->LoadIntoMemory();
timer.Pause();
VLOG(0) << "LoadIntoMemory cost: " << timer.ElapsedSec() << "s";

// local shuffle
if (is_shuffle) {
dataset_->LocalShuffle();
}

std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();
data_ready_channel_->Put(gpu_task);
VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]";
}

void PSGPUWrapper::start_build_thread() {
running_ = true;
VLOG(3) << "start build CPU&GPU ps thread.";
build_cpu_threads_ = std::thread([this] { build_cpu_thread(); });
build_gpu_threads_ = std::thread([this] { build_gpu_thread(); });
}

void PSGPUWrapper::build_cpu_thread() {
while (running_) {
std::shared_ptr<HeterContext> gpu_task = nullptr;
if (!data_ready_channel_->Get(gpu_task)) {
continue;
}
VLOG(3) << "thread BuildTask start.";
platform::Timer timer;
timer.Start();
// build cpu ps data process
BuildTask(gpu_task);
timer.Pause();
VLOG(1) << "thread BuildTask end, cost time: " << timer.ElapsedSec() << "s";
buildcpu_ready_channel_->Put(gpu_task);
}
VLOG(3) << "build cpu thread end";
}

void PSGPUWrapper::build_gpu_thread() {
while (running_) {
std::shared_ptr<HeterContext> gpu_task = nullptr;
if (!gpu_free_channel_->Get(gpu_task)) {
continue;
}
if (!buildcpu_ready_channel_->Get(gpu_task)) {
continue;
}
VLOG(3) << "thread BuildGPUTask start.";
platform::Timer timer;
timer.Start();
BuildGPUTask(gpu_task);
timer.Pause();
VLOG(1) << "thread BuildGPUTask end, cost time: " << timer.ElapsedSec()
<< "s";

gpu_task_pool_.Push(gpu_task);
train_ready_channel_->Put(gpu_task);
}
VLOG(3) << "build gpu thread end";
}

void PSGPUWrapper::BeginPass() {
platform::Timer timer;
timer.Start();
if (current_task_) {
PADDLE_THROW(
platform::errors::Fatal("[BeginPass] current task is not ended."));
}
// load+build done
if (!train_ready_channel_->Get(current_task_)) {
PADDLE_THROW(platform::errors::Fatal("train_ready_channel_ failed."));
}
timer.Pause();
VLOG(1) << "BeginPass end, cost time: " << timer.ElapsedSec() << "s";
}

void PSGPUWrapper::EndPass() {
if (!current_task_) {
PADDLE_THROW(
platform::errors::Fatal("[EndPass] current task has been ended."));
}
platform::Timer timer;
timer.Start();
size_t keysize_max = 0;
// in case of feasign_num = 0, skip dump_to_cpu
for (size_t i = 0; i < heter_devices_.size(); i++) {
keysize_max = std::max(keysize_max, current_task_->device_keys_[i].size());
}
if (keysize_max != 0) {
HeterPs_->end_pass();
}
current_task_ = nullptr;
gpu_free_channel_->Put(current_task_);
timer.Pause();
VLOG(1) << "EndPass end, cost time: " << timer.ElapsedSec() << "s";
}

void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
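
Taken together, the new members in ps_gpu_wrapper.cc turn pass preparation into a pipeline: LoadIntoMemory() puts a fresh HeterContext onto data_ready_channel_, build_cpu_thread() runs BuildTask() and forwards the task to buildcpu_ready_channel_, build_gpu_thread() waits for a slot on gpu_free_channel_, runs BuildGPUTask() and publishes the result on train_ready_channel_, and BeginPass()/EndPass() consume and recycle it. Below is a minimal standalone sketch of that hand-off pattern; the Channel and Task types are illustrative stand-ins (not Paddle's ChannelObject or HeterContext), and gpu_free_channel_ is omitted for brevity.

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Minimal bounded blocking queue standing in for paddle::framework::ChannelObject.
template <class T>
class Channel {
 public:
  explicit Channel(size_t cap) : cap_(cap) {}
  void Put(T v) {
    std::unique_lock<std::mutex> lk(m_);
    not_full_.wait(lk, [&] { return q_.size() < cap_; });
    q_.push(std::move(v));
    not_empty_.notify_one();
  }
  T Get() {
    std::unique_lock<std::mutex> lk(m_);
    not_empty_.wait(lk, [&] { return !q_.empty(); });
    T v = std::move(q_.front());
    q_.pop();
    not_full_.notify_one();
    return v;
  }

 private:
  size_t cap_;
  std::mutex m_;
  std::condition_variable not_empty_, not_full_;
  std::queue<T> q_;
};

struct Task { int pass_id; };

int main() {
  Channel<std::shared_ptr<Task>> data_ready(3), buildcpu_ready(3), train_ready(1);

  // Stage 1: CPU build thread (analogous to build_cpu_thread / BuildTask).
  std::thread cpu([&] {
    for (int i = 0; i < 2; ++i) {
      auto t = data_ready.Get();
      // ... shard keys and pull sparse values here (BuildTask) ...
      buildcpu_ready.Put(t);
    }
  });

  // Stage 2: GPU build thread (analogous to build_gpu_thread / BuildGPUTask).
  std::thread gpu([&] {
    for (int i = 0; i < 2; ++i) {
      auto t = buildcpu_ready.Get();
      // ... build the per-device tables here (BuildGPUTask) ...
      train_ready.Put(t);
    }
  });

  // Driver: load data, then train one pass at a time (LoadIntoMemory / BeginPass).
  for (int pass = 0; pass < 2; ++pass) {
    data_ready.Put(std::make_shared<Task>(Task{pass}));  // LoadIntoMemory
    auto current = train_ready.Get();                     // BeginPass
    std::cout << "training pass " << current->pass_id << "\n";
    // EndPass: in the real code a free-slot token goes back on gpu_free_channel_.
  }
  cpu.join();
  gpu.join();
  return 0;
}

The small capacities set in InitializeGPU() (3 for data_ready_channel_ and buildcpu_ready_channel_, 1 for gpu_free_channel_ and train_ready_channel_) keep only a few passes in flight at a time.
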
80 changes: 67 additions & 13 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -82,9 +82,33 @@ class PSGPUWrapper {
const int hidden_size, const int64_t total_length,
const int batch_size);

void BuildGPUPS(const uint64_t table_id, int feature_dim);
void BuildTask(std::shared_ptr<HeterContext> gpu_task, uint64_t table_id,
int feature_dim);
void BuildGPUTask(std::shared_ptr<HeterContext> gpu_task);
void BuildTask(std::shared_ptr<HeterContext> gpu_task);
void LoadIntoMemory(bool is_shuffle);
void BeginPass();
void EndPass();
void start_build_thread();
void build_cpu_thread();
void build_gpu_thread();

void Finalize() {
VLOG(3) << "PSGPUWrapper Begin Finalize.";
if (s_instance_ == nullptr) {
return;
}
data_ready_channel_->Close();
buildcpu_ready_channel_->Close();
gpu_free_channel_->Close();
train_ready_channel_->Close();
running_ = false;
VLOG(3) << "begin stop build_cpu_threads_";
build_cpu_threads_.join();
VLOG(3) << "begin stop build_gpu_threads_";
build_gpu_threads_.join();
s_instance_ = nullptr;
VLOG(3) << "PSGPUWrapper Finalize Finished.";
}

void InitializeGPU(const std::vector<int>& dev_ids) {
if (s_instance_ != NULL && is_initialized_ == false) {
VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
@@ -129,6 +153,24 @@ class PSGPUWrapper {
#endif
}
heter_devices_ = dev_ids;
data_ready_channel_->Open();
data_ready_channel_->SetCapacity(3);
buildcpu_ready_channel_->Open();
buildcpu_ready_channel_->SetCapacity(3);
gpu_free_channel_->Open();
gpu_free_channel_->SetCapacity(1);
train_ready_channel_->Open();
train_ready_channel_->SetCapacity(1);

current_task_ = nullptr;
gpu_free_channel_->Put(current_task_);

table_id_ = 1;
#ifdef PADDLE_WITH_PSLIB
table_id_ = 0;
#endif
// start build cpu&gpu ps thread
start_build_thread();
}
}

@@ -206,18 +248,8 @@ class PSGPUWrapper {
slot_vector_ = slot_vector;
}

void EndPass() { HeterPs_->end_pass(); }
void ShowOneTable(int index) { HeterPs_->show_one_table(index); }

void Finalize() {
VLOG(3) << "PSGPUWrapper Begin Finalize.";
if (s_instance_ == nullptr) {
return;
}
s_instance_ = nullptr;
VLOG(3) << "PSGPUWrapper Finalize Finished.";
}

private:
static std::shared_ptr<PSGPUWrapper> s_instance_;
Dataset* dataset_;
@@ -231,6 +263,7 @@ class PSGPUWrapper {
std::vector<int> slot_vector_;
int multi_node_{0};
int node_size_;
uint64_t table_id_;
std::vector<ncclComm_t> inner_comms_;
std::vector<ncclComm_t> inter_comms_;
std::vector<ncclUniqueId> inter_ncclids_;
@@ -242,6 +275,27 @@ class PSGPUWrapper {
int thread_keys_shard_num_ = 37;
uint64_t max_fea_num_per_pass_ = 5000000000;

std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
data_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
buildcpu_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
gpu_free_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
train_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<HeterContext> current_task_ = nullptr;
std::thread build_cpu_threads_;
std::thread build_gpu_threads_;
bool running_ = false;

protected:
static bool is_initialized_;
};
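
The relocated Finalize() now closes all four channels before clearing running_ and joining the two build threads, so a thread blocked in Get() wakes up, gets false back, re-checks running_, and leaves its loop. A rough sketch of that shutdown order follows; ClosableChannel only mimics the Close()/Get()-returns-false behaviour assumed here and is not Paddle's ChannelObject API.

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Closable blocking queue: Get() returns false once the queue is closed and drained.
template <class T>
class ClosableChannel {
 public:
  void Put(T v) {
    std::lock_guard<std::mutex> lk(m_);
    q_.push(std::move(v));
    cv_.notify_one();
  }
  bool Get(T* out) {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [&] { return closed_ || !q_.empty(); });
    if (q_.empty()) return false;  // closed and fully drained
    *out = std::move(q_.front());
    q_.pop();
    return true;
  }
  void Close() {
    std::lock_guard<std::mutex> lk(m_);
    closed_ = true;
    cv_.notify_all();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<T> q_;
  bool closed_ = false;
};

int main() {
  ClosableChannel<int> data_ready;
  std::atomic<bool> running{true};  // atomic so the worker observes the flag flip

  // Worker analogous to build_cpu_thread: loops until the channel is closed.
  std::thread worker([&] {
    int task;
    while (running) {
      if (!data_ready.Get(&task)) continue;  // wakes up on Close(), re-checks running
      std::cout << "built task " << task << "\n";
    }
  });

  data_ready.Put(1);
  data_ready.Put(2);

  // Shutdown order used by Finalize(): close the channels first, then stop and join.
  data_ready.Close();
  running = false;
  worker.join();
  return 0;
}
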
10 changes: 7 additions & 3 deletions paddle/fluid/framework/heter_util.h
@@ -36,7 +36,7 @@ enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
class HeterTask {
public:
HeterTask() {}
virtual ~HeterTask(){};
virtual ~HeterTask() {}

void Update() {
if (state_ == PULL_SPARSE) {
@@ -111,7 +111,7 @@ template <class T>
class HeterObjectPool {
public:
HeterObjectPool() {}
virtual ~HeterObjectPool(){};
virtual ~HeterObjectPool() {}
std::shared_ptr<T> Get() {
std::lock_guard<std::mutex> lock(mutex_);
if (pool_.empty()) {
@@ -131,6 +131,10 @@ class HeterObjectPool {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.size();
}
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.empty();
}
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }

private:
@@ -160,7 +164,7 @@ class BtObjectPool {
virtual ~BtObjectPool() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
};
}

std::shared_ptr<T> Get() {
BthreadMutextGuard guard(&mutex_);
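
HeterObjectPool::Get() reuses a pooled object when one is available and constructs a new one otherwise, Push() returns an object for later reuse, and the new Empty() probes the pool under the same lock. A trimmed, self-contained sketch of that pattern, with ObjectPool and HeterContextLike as illustrative names rather than the Paddle types:

#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

template <class T>
class ObjectPool {
 public:
  // Reuse a pooled object if one exists, otherwise construct a fresh one.
  std::shared_ptr<T> Get() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (pool_.empty()) {
      return std::make_shared<T>();
    }
    auto obj = pool_.back();
    pool_.pop_back();
    return obj;
  }
  // Return an object to the pool for later reuse.
  void Push(std::shared_ptr<T> obj) {
    std::lock_guard<std::mutex> lock(mutex_);
    pool_.push_back(std::move(obj));
  }
  bool Empty() {
    std::lock_guard<std::mutex> lock(mutex_);
    return pool_.empty();
  }

 private:
  std::vector<std::shared_ptr<T>> pool_;
  std::mutex mutex_;
};

struct HeterContextLike { int pass_id = 0; };

int main() {
  ObjectPool<HeterContextLike> pool;
  auto task = pool.Get();   // pool was empty, so this constructs a new object
  task->pass_id = 1;
  pool.Push(task);          // recycle it
  std::cout << std::boolalpha << pool.Empty() << "\n";  // false
  return 0;
}
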
4 changes: 3 additions & 1 deletion paddle/fluid/pybind/ps_gpu_wrapper_py.cc
@@ -47,7 +47,9 @@ void BindPSGPUWrapper(py::module* m) {
py::call_guard<py::gil_scoped_release>())
.def("end_pass", &framework::PSGPUWrapper::EndPass,
py::call_guard<py::gil_scoped_release>())
.def("build_gpu_ps", &framework::PSGPUWrapper::BuildGPUPS,
.def("begin_pass", &framework::PSGPUWrapper::BeginPass,
py::call_guard<py::gil_scoped_release>())
.def("load_into_memory", &framework::PSGPUWrapper::LoadIntoMemory,
py::call_guard<py::gil_scoped_release>())
.def("finalize", &framework::PSGPUWrapper::Finalize,
py::call_guard<py::gil_scoped_release>());
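
Each of the new bindings is registered with py::call_guard<py::gil_scoped_release>(), so the long-running native call releases the Python GIL while it runs. A minimal pybind11 sketch of the same registration pattern; DemoWrapper and the demo_heterps module name are made up for illustration.

#include <pybind11/pybind11.h>
#include <chrono>
#include <thread>

namespace py = pybind11;

// Stand-in for a wrapper whose methods do long-running native work.
struct DemoWrapper {
  void load_into_memory(bool shuffle) {
    // Simulate heavy I/O; other Python threads keep running because the
    // GIL is released by the call_guard below.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    (void)shuffle;
  }
  void begin_pass() {}
  void end_pass() {}
};

PYBIND11_MODULE(demo_heterps, m) {
  py::class_<DemoWrapper>(m, "DemoWrapper")
      .def(py::init<>())
      .def("load_into_memory", &DemoWrapper::load_into_memory,
           py::call_guard<py::gil_scoped_release>())
      .def("begin_pass", &DemoWrapper::begin_pass,
           py::call_guard<py::gil_scoped_release>())
      .def("end_pass", &DemoWrapper::end_pass,
           py::call_guard<py::gil_scoped_release>());
}
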
9 changes: 9 additions & 0 deletions paddle/fluid/pybind/pybind.cc
@@ -185,6 +185,14 @@ bool IsCompiledWithMKLDNN() {
#endif
}

bool IsCompiledWithHETERPS() {
#ifndef PADDLE_WITH_HETERPS
return false;
#else
return true;
#endif
}

bool SupportsBfloat16() {
#ifndef PADDLE_WITH_MKLDNN
return false;
@@ -1910,6 +1918,7 @@ All parameter, weight, gradient are variables in Paddle.
m.def("is_compiled_with_npu", IsCompiledWithNPU);
m.def("is_compiled_with_xpu", IsCompiledWithXPU);
m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN);
m.def("_is_compiled_with_heterps", IsCompiledWithHETERPS);
m.def("supports_bfloat16", SupportsBfloat16);
m.def("supports_bfloat16_fast_performance", SupportsBfloat16FastPerformance);
m.def("op_supported_infos", OpSupportedInfos);