diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index 0766a3151c8e5..f8dfccf58ff96 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -40,8 +40,7 @@ namespace framework {
 std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
 bool PSGPUWrapper::is_initialized_ = false;
 
-void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
-                             uint64_t table_id, int feature_dim) {
+void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
   platform::Timer timeline;
   timeline.Start();
@@ -137,17 +136,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
     local_ptr[i].resize(local_keys[i].size());
   }
   timeline.Start();
-  auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
-                   &fleet_ptr](int i) {
+  auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
     size_t key_size = local_keys[i].size();
 #ifdef PADDLE_WITH_PSLIB
     auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), table_id,
+        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
         local_keys[i].data(), key_size);
 #endif
 #ifdef PADDLE_WITH_PSCORE
     auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), table_id,
+        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
         local_keys[i].data(), key_size);
 #endif
     tt.wait();
@@ -270,11 +268,8 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
           << " seconds.";
 }
 
-void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
+void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
   int device_num = heter_devices_.size();
-  std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
-  gpu_task->Reset();
-  BuildTask(gpu_task, table_id, feature_dim);
   platform::Timer timeline;
   timeline.Start();
 
@@ -289,6 +284,10 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
     delete HeterPs_;
     HeterPs_ = nullptr;
   }
+  if (size_max <= 0) {
+    VLOG(1) << "Skip build gpu ps cause feasign nums = " << size_max;
+    return;
+  }
   std::vector<std::thread> threads(device_num);
   HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
   HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
@@ -297,7 +296,9 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
     this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
                              gpu_task->device_values_[i].data(),
                              feature_keys_count[i], 500000, 2);
-    HeterPs_->show_one_table(i);
+    if (feature_keys_count[i] > 0) {
+      HeterPs_->show_one_table(i);
+    }
   };
   for (size_t i = 0; i < threads.size(); i++) {
     threads[i] = std::thread(build_func, i);
@@ -308,7 +309,109 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
   timeline.Pause();
   VLOG(1) << "GpuPs build table total costs: " << timeline.ElapsedSec()
           << " s.";
-  gpu_task_pool_.Push(gpu_task);
+}
+
+void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
+  platform::Timer timer;
+  VLOG(3) << "Begin LoadIntoMemory(), dataset[" << dataset_ << "]";
+  timer.Start();
+  dataset_->LoadIntoMemory();
+  timer.Pause();
+  VLOG(0) << "LoadIntoMemory cost: " << timer.ElapsedSec() << "s";
+
+  // local shuffle
+  if (is_shuffle) {
+    dataset_->LocalShuffle();
+  }
+
+  std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
+  gpu_task->Reset();
+  data_ready_channel_->Put(gpu_task);
+  VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]";
+}
+
+void PSGPUWrapper::start_build_thread() {
+  running_ = true;
+  VLOG(3) << "start build CPU&GPU ps thread.";
+  build_cpu_threads_ = std::thread([this] { build_cpu_thread(); });
+  build_gpu_threads_ = std::thread([this] { build_gpu_thread(); });
+}
+
+void PSGPUWrapper::build_cpu_thread() {
+  while (running_) {
+    std::shared_ptr<HeterContext> gpu_task = nullptr;
+    if (!data_ready_channel_->Get(gpu_task)) {
+      continue;
+    }
+    VLOG(3) << "thread BuildTask start.";
+    platform::Timer timer;
+    timer.Start();
+    // build cpu ps data process
+    BuildTask(gpu_task);
+    timer.Pause();
+    VLOG(1) << "thread BuildTask end, cost time: " << timer.ElapsedSec() << "s";
+    buildcpu_ready_channel_->Put(gpu_task);
+  }
+  VLOG(3) << "build cpu thread end";
+}
+
+void PSGPUWrapper::build_gpu_thread() {
+  while (running_) {
+    std::shared_ptr<HeterContext> gpu_task = nullptr;
+    if (!gpu_free_channel_->Get(gpu_task)) {
+      continue;
+    }
+    if (!buildcpu_ready_channel_->Get(gpu_task)) {
+      continue;
+    }
+    VLOG(3) << "thread BuildGPUTask start.";
+    platform::Timer timer;
+    timer.Start();
+    BuildGPUTask(gpu_task);
+    timer.Pause();
+    VLOG(1) << "thread BuildGPUTask end, cost time: " << timer.ElapsedSec()
+            << "s";
+
+    gpu_task_pool_.Push(gpu_task);
+    train_ready_channel_->Put(gpu_task);
+  }
+  VLOG(3) << "build gpu thread end";
+}
+
+void PSGPUWrapper::BeginPass() {
+  platform::Timer timer;
+  timer.Start();
+  if (current_task_) {
+    PADDLE_THROW(
+        platform::errors::Fatal("[BeginPass] current task is not ended."));
+  }
+  // load+build done
+  if (!train_ready_channel_->Get(current_task_)) {
+    PADDLE_THROW(platform::errors::Fatal("train_ready_channel_ failed."));
+  }
+  timer.Pause();
+  VLOG(1) << "BeginPass end, cost time: " << timer.ElapsedSec() << "s";
+}
+
+void PSGPUWrapper::EndPass() {
+  if (!current_task_) {
+    PADDLE_THROW(
+        platform::errors::Fatal("[EndPass] current task has been ended."));
+  }
+  platform::Timer timer;
+  timer.Start();
+  size_t keysize_max = 0;
+  // in case of feasign_num = 0, skip dump_to_cpu
+  for (size_t i = 0; i < heter_devices_.size(); i++) {
+    keysize_max = std::max(keysize_max, current_task_->device_keys_[i].size());
+  }
+  if (keysize_max != 0) {
+    HeterPs_->end_pass();
+  }
+  current_task_ = nullptr;
+  gpu_free_channel_->Put(current_task_);
+  timer.Pause();
+  VLOG(1) << "EndPass end, cost time: " << timer.ElapsedSec() << "s";
 }
 
 void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
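Note: the functions added above form a two-stage producer/consumer pipeline. LoadIntoMemory() publishes a fresh HeterContext on data_ready_channel_; build_cpu_thread() consumes it, runs the CPU-side BuildTask(), and forwards it on buildcpu_ready_channel_; build_gpu_thread() waits for both a free slot on gpu_free_channel_ (released by EndPass()) and a CPU-built task, runs BuildGPUTask(), and publishes the result on train_ready_channel_ for BeginPass() to collect. Below is a minimal Python sketch of the same handshake, with queue.Queue standing in for paddle::framework::ChannelObject; the capacities mirror InitializeGPU() in the header diff, and build_task/build_gpu_task are illustrative placeholders, not Paddle API.

    import queue
    import threading

    # Stand-ins for the four channels on PSGPUWrapper; capacities mirror
    # InitializeGPU(): 3, 3, 1, 1.
    data_ready = queue.Queue(maxsize=3)      # LoadIntoMemory -> build_cpu_thread
    buildcpu_ready = queue.Queue(maxsize=3)  # build_cpu_thread -> build_gpu_thread
    gpu_free = queue.Queue(maxsize=1)        # EndPass -> build_gpu_thread
    train_ready = queue.Queue(maxsize=1)     # build_gpu_thread -> BeginPass

    def build_task(ctx):      # placeholder for PSGPUWrapper::BuildTask
        pass

    def build_gpu_task(ctx):  # placeholder for PSGPUWrapper::BuildGPUTask
        pass

    def build_cpu_thread():
        while True:
            ctx = data_ready.get()   # like data_ready_channel_->Get(gpu_task)
            build_task(ctx)          # CPU-side shard/dedup/pull work
            buildcpu_ready.put(ctx)

    def build_gpu_thread():
        while True:
            gpu_free.get()           # wait until the previous pass has ended
            ctx = buildcpu_ready.get()
            build_gpu_task(ctx)      # move the built tables onto the GPUs
            train_ready.put(ctx)     # BeginPass() blocks on this

    threading.Thread(target=build_cpu_thread, daemon=True).start()
    threading.Thread(target=build_gpu_thread, daemon=True).start()
    gpu_free.put(None)               # initial free slot, as in InitializeGPU()
    data_ready.put(object())         # what LoadIntoMemory() does once per pass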
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
index 81b2b0a12b2c3..2bbe595419094 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -82,9 +82,33 @@ class PSGPUWrapper {
                 const int hidden_size, const int64_t total_length,
                 const int batch_size);
 
-  void BuildGPUPS(const uint64_t table_id, int feature_dim);
-  void BuildTask(std::shared_ptr<HeterContext> gpu_task, uint64_t table_id,
-                 int feature_dim);
+  void BuildGPUTask(std::shared_ptr<HeterContext> gpu_task);
+  void BuildTask(std::shared_ptr<HeterContext> gpu_task);
+  void LoadIntoMemory(bool is_shuffle);
+  void BeginPass();
+  void EndPass();
+  void start_build_thread();
+  void build_cpu_thread();
+  void build_gpu_thread();
+
+  void Finalize() {
+    VLOG(3) << "PSGPUWrapper Begin Finalize.";
+    if (s_instance_ == nullptr) {
+      return;
+    }
+    data_ready_channel_->Close();
+    buildcpu_ready_channel_->Close();
+    gpu_free_channel_->Close();
+    train_ready_channel_->Close();
+    running_ = false;
+    VLOG(3) << "begin stop build_cpu_threads_";
+    build_cpu_threads_.join();
+    VLOG(3) << "begin stop build_gpu_threads_";
+    build_gpu_threads_.join();
+    s_instance_ = nullptr;
+    VLOG(3) << "PSGPUWrapper Finalize Finished.";
+  }
+
   void InitializeGPU(const std::vector<int>& dev_ids) {
     if (s_instance_ != NULL && is_initialized_ == false) {
       VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
@@ -129,6 +153,24 @@ class PSGPUWrapper {
 #endif
       }
       heter_devices_ = dev_ids;
+      data_ready_channel_->Open();
+      data_ready_channel_->SetCapacity(3);
+      buildcpu_ready_channel_->Open();
+      buildcpu_ready_channel_->SetCapacity(3);
+      gpu_free_channel_->Open();
+      gpu_free_channel_->SetCapacity(1);
+      train_ready_channel_->Open();
+      train_ready_channel_->SetCapacity(1);
+
+      current_task_ = nullptr;
+      gpu_free_channel_->Put(current_task_);
+
+      table_id_ = 1;
+#ifdef PADDLE_WITH_PSLIB
+      table_id_ = 0;
+#endif
+      // start build cpu&gpu ps thread
+      start_build_thread();
     }
   }
 
@@ -206,18 +248,8 @@ class PSGPUWrapper {
     slot_vector_ = slot_vector;
   }
 
-  void EndPass() { HeterPs_->end_pass(); }
   void ShowOneTable(int index) { HeterPs_->show_one_table(index); }
 
-  void Finalize() {
-    VLOG(3) << "PSGPUWrapper Begin Finalize.";
-    if (s_instance_ == nullptr) {
-      return;
-    }
-    s_instance_ = nullptr;
-    VLOG(3) << "PSGPUWrapper Finalize Finished.";
-  }
-
  private:
   static std::shared_ptr<PSGPUWrapper> s_instance_;
   Dataset* dataset_;
@@ -231,6 +263,7 @@ class PSGPUWrapper {
   std::vector<int> slot_vector_;
   int multi_node_{0};
   int node_size_;
+  uint64_t table_id_;
   std::vector<ncclComm_t> inner_comms_;
   std::vector<ncclComm_t> inter_comms_;
   std::vector<ncclUniqueId> inter_ncclids_;
@@ -242,6 +275,27 @@ class PSGPUWrapper {
   int thread_keys_shard_num_ = 37;
   uint64_t max_fea_num_per_pass_ = 5000000000;
 
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      data_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      buildcpu_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      gpu_free_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      train_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<HeterContext> current_task_ = nullptr;
+  std::thread build_cpu_threads_;
+  std::thread build_gpu_threads_;
+  bool running_ = false;
+
 protected:
  static bool is_initialized_;
};
diff --git a/paddle/fluid/framework/heter_util.h b/paddle/fluid/framework/heter_util.h
index a08f08428da34..eb9f3040afe25 100644
--- a/paddle/fluid/framework/heter_util.h
+++ b/paddle/fluid/framework/heter_util.h
@@ -36,7 +36,7 @@ enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
 class HeterTask {
  public:
   HeterTask() {}
-  virtual ~HeterTask(){};
+  virtual ~HeterTask() {}
 
   void Update() {
     if (state_ == PULL_SPARSE) {
@@ -111,7 +111,7 @@ template <class T>
 class HeterObjectPool {
  public:
   HeterObjectPool() {}
-  virtual ~HeterObjectPool(){};
+  virtual ~HeterObjectPool() {}
   std::shared_ptr<T> Get() {
     std::lock_guard<std::mutex> lock(mutex_);
     if (pool_.empty()) {
@@ -131,6 +131,10 @@ class HeterObjectPool {
     std::lock_guard<std::mutex> lock(mutex_);
     return pool_.size();
   }
+  bool Empty() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return pool_.empty();
+  }
   std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
 
  private:
@@ -160,7 +164,7 @@ class BtObjectPool {
   virtual ~BtObjectPool() {
     bthread_cond_destroy(&cond_);
     bthread_mutex_destroy(&mutex_);
-  };
+  }
 
   std::shared_ptr<T> Get() {
     BthreadMutextGuard guard(&mutex_);
diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
index bdd7abe1d8332..48365f42b11ba 100644
--- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
+++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
@@ -47,7 +47,9 @@ void BindPSGPUWrapper(py::module* m) {
            py::call_guard<py::gil_scoped_release>())
       .def("end_pass", &framework::PSGPUWrapper::EndPass,
           py::call_guard<py::gil_scoped_release>())
-      .def("build_gpu_ps", &framework::PSGPUWrapper::BuildGPUPS,
+      .def("begin_pass", &framework::PSGPUWrapper::BeginPass,
+           py::call_guard<py::gil_scoped_release>())
+      .def("load_into_memory", &framework::PSGPUWrapper::LoadIntoMemory,
            py::call_guard<py::gil_scoped_release>())
       .def("finalize", &framework::PSGPUWrapper::Finalize,
            py::call_guard<py::gil_scoped_release>());
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index a93ce4ecd4826..5508c516fbbbc 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -185,6 +185,14 @@ bool IsCompiledWithMKLDNN() {
 #endif
 }
 
+bool IsCompiledWithHETERPS() {
+#ifndef PADDLE_WITH_HETERPS
+  return false;
+#else
+  return true;
+#endif
+}
+
 bool SupportsBfloat16() {
 #ifndef PADDLE_WITH_MKLDNN
   return false;
@@ -1910,6 +1918,7 @@ All parameter, weight, gradient are variables in Paddle.
   m.def("is_compiled_with_npu", IsCompiledWithNPU);
   m.def("is_compiled_with_xpu", IsCompiledWithXPU);
   m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN);
+  m.def("_is_compiled_with_heterps", IsCompiledWithHETERPS);
   m.def("supports_bfloat16", SupportsBfloat16);
   m.def("supports_bfloat16_fast_performance", SupportsBfloat16FastPerformance);
   m.def("op_supported_infos", OpSupportedInfos);
diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index dc41e3589812f..8bc16dfbbae30 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -34,6 +34,7 @@ def __init__(self):
         self.thread_num = 1
         self.filelist = []
         self.use_ps_gpu = False
+        self.psgpu = None
 
     def init(self,
              batch_size=1,
@@ -223,6 +224,11 @@ def _set_use_ps_gpu(self, use_ps_gpu):
             use_ps_gpu: bool
         """
         self.use_ps_gpu = use_ps_gpu
+        # if Paddle was not compiled with HeterPS, fall back to the CPU path
+        if not core._is_compiled_with_heterps():
+            self.use_ps_gpu = 0
+        elif self.use_ps_gpu:
+            self.psgpu = core.PSGPU()
 
     def _finish_to_run(self):
         self.dataset.destroy_readers()
@@ -677,12 +683,15 @@ def _generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
 
-    def load_into_memory(self):
+    def load_into_memory(self, is_shuffle=False):
         """
         :api_attr: Static Graph
 
         Load data into memory
 
+        Args:
+            is_shuffle(bool): whether to shuffle data locally after loading, default is False
+
         Examples:
             .. code-block:: python
 
@@ -707,7 +716,11 @@ def load_into_memory(self):
               dataset.load_into_memory()
         """
         self._prepare_to_run()
-        self.dataset.load_into_memory()
+        if not self.use_ps_gpu:
+            self.dataset.load_into_memory()
+        elif core._is_compiled_with_heterps():
+            self.psgpu.set_dataset(self.dataset)
+            self.psgpu.load_into_memory(is_shuffle)
 
     def preload_into_memory(self, thread_num=None):
         """
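Note: with the changes above, the HeterPS path is only taken when the binary was built with PADDLE_WITH_HETERPS and the user opted in; _set_use_ps_gpu() silently downgrades to the plain CPU flow otherwise. A short caller-side sketch of the same probe follows; only core._is_compiled_with_heterps() and core.PSGPU() come from this patch, the surrounding guard is illustrative.

    from paddle.fluid import core

    # _is_compiled_with_heterps() reflects PADDLE_WITH_HETERPS at build time.
    if core._is_compiled_with_heterps():
        psgpu = core.PSGPU()  # GPU parameter-server wrapper is available
    else:
        psgpu = None          # stay on the plain dataset.load_into_memory() path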
diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py
index ce9511a376672..c42580676af59 100644
--- a/python/paddle/fluid/core.py
+++ b/python/paddle/fluid/core.py
@@ -271,6 +271,7 @@ def to_list(s):
         from .core_avx import _set_paddle_lib_path
         from .core_avx import _create_loaded_parameter
         from .core_avx import _cuda_synchronize
+        from .core_avx import _is_compiled_with_heterps
         from .core_avx import _promote_types_if_complex_exists
         if sys.platform != 'win32':
             from .core_avx import _set_process_pids
@@ -318,6 +319,7 @@ def to_list(s):
         from .core_noavx import _set_paddle_lib_path
         from .core_noavx import _create_loaded_parameter
         from .core_noavx import _cuda_synchronize
+        from .core_noavx import _is_compiled_with_heterps
         from .core_noavx import _promote_types_if_complex_exists
         if sys.platform != 'win32':
             from .core_noavx import _set_process_pids
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 2b9d512856005..ea9c2ea7550c9 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -75,6 +75,7 @@ def __init__(self):
         self.thread_num = 1
         self.filelist = []
         self.use_ps_gpu = False
+        self.psgpu = None
 
     def set_pipe_command(self, pipe_command):
         """
@@ -311,6 +312,11 @@ def _set_use_ps_gpu(self, use_ps_gpu):
             use_ps_gpu: bool
         """
         self.use_ps_gpu = use_ps_gpu
+        # if Paddle was not compiled with HeterPS, fall back to the CPU path
+        if not core._is_compiled_with_heterps():
+            self.use_ps_gpu = 0
+        elif self.use_ps_gpu:
+            self.psgpu = core.PSGPU()
 
     def _finish_to_run(self):
         self.dataset.destroy_readers()
@@ -694,10 +700,13 @@ def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
     @deprecated(
         since="2.0.0",
         update_to="paddle.distributed.InMemoryDataset.load_into_memory")
-    def load_into_memory(self):
+    def load_into_memory(self, is_shuffle=False):
         """
         Load data into memory
 
+        Args:
+            is_shuffle(bool): whether to shuffle data locally after loading, default is False
+
         Examples:
             .. code-block:: python
 
@@ -708,7 +717,11 @@ def load_into_memory(self):
               dataset.load_into_memory()
         """
         self._prepare_to_run()
-        self.dataset.load_into_memory()
+        if not self.use_ps_gpu:
+            self.dataset.load_into_memory()
+        elif core._is_compiled_with_heterps():
+            self.psgpu.set_dataset(self.dataset)
+            self.psgpu.load_into_memory(is_shuffle)
 
     @deprecated(
         since="2.0.0",
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
index 0b956d5031fec..6ab8a2c3a4b22 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
@@ -74,7 +74,7 @@ def test_communicator_ps_gpu(self):
             batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
         dataset.set_filelist(["test_communicator_ps_gpu.txt"])
         dataset._set_use_ps_gpu(1)
-        dataset.load_into_memory()
+        dataset.load_into_memory(is_shuffle=True)
 
         os.environ["TEST_MODE"] = "1"
         exe = fluid.Executor(fluid.CPUPlace())
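Note: taken together, a pass-based training loop over these bindings would look roughly like the sketch below. Only _set_use_ps_gpu(), load_into_memory(is_shuffle=...), core.PSGPU() and its begin_pass/end_pass/finalize methods are introduced by this patch; slots_vars, train_program, the executor setup, and the assumption that core.PSGPU() returns the process-wide wrapper used by the dataset are illustrative placeholders, not verified API.

    import paddle.fluid as fluid
    from paddle.fluid import core

    dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_use_var(slots_vars)            # hypothetical feed variables
    dataset.set_filelist(["train_data.txt"])   # hypothetical input file
    dataset._set_use_ps_gpu(1)
    dataset.load_into_memory(is_shuffle=True)  # feeds the async build pipeline

    psgpu = core.PSGPU()
    psgpu.begin_pass()  # blocks on train_ready_channel_ until the GPU table is built
    exe = fluid.Executor(fluid.CPUPlace())
    exe.train_from_dataset(program=train_program, dataset=dataset)  # hypothetical
    psgpu.end_pass()    # frees the slot so the next pass can build
    psgpu.finalize()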