Skip to content

Commit

Permalink
add send/recv to/from switch module for PrcoessGroupHeter (PaddlePadd…
Browse files Browse the repository at this point in the history
  • Loading branch information
lilong12 authored and wu.zeng committed Apr 10, 2022
1 parent 05a4ce3 commit 24739f9
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 51 deletions.
4 changes: 4 additions & 0 deletions cmake/flags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,7 @@ if(WITH_ROCM)
string (REPLACE "-Werror" "-Wno-error" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
endif()

if(WITH_PSCORE OR WITH_PSLIB)
string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
endif()
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/collective/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ endif()

if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()

if(WITH_ASCEND_CL)
cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {
void ProcessGroup::Task::Synchronize() {}

ProcessGroup::ProcessGroup(int rank, int size, int gid)
: rank_(rank), size_(size) {
: rank_(rank), size_(size), gid_(gid) {
if (gid != IGNORE_ID) {
auto map = ProcessGroupMapFromGid::getInstance();
map->insert(gid, this);
map->insert(gid_, this);
}
}

Expand Down
18 changes: 11 additions & 7 deletions paddle/fluid/distributed/collective/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ class ProcessGroup {
}

virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast for static",
PADDLE_THROW(platform::errors::Fatal(
"ProcessGroup%s does not support broadcast for static mode runtime",
GetBackendName()));
}

Expand Down Expand Up @@ -148,6 +148,7 @@ class ProcessGroup {
protected:
const int rank_;
const int size_;
const int gid_;
};

class ProcessGroupMapFromGid {
Expand All @@ -158,17 +159,20 @@ class ProcessGroupMapFromGid {
}

void insert(int gid, ProcessGroup* pg) {
// TODO(sandyhouse): address ut and uncomment the following codes
// PADDLE_ENFORCE_EQ(has(gid), false,
// platform::errors::PreconditionNotMet(
// "The process group with id %d does exist.", gid));
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
map_[gid] = pg;
}

ProcessGroup* get(int gid) {
// TODO(sandyhouse): address ut and uncomment the following codes
// PADDLE_ENFORCE_EQ(has(gid), true,
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
return map_.find(gid)->second;
}

Expand Down
6 changes: 0 additions & 6 deletions paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {

// bool CheckTensorsInNPUPlace(const std::vector<Tensor>& tensors) {
// return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) {
// return t.place() == platform::DeviceType::NPU;
// });
// }

void SyncDefaultStream(
const std::vector<Place>& places,
std::vector<NPUEventManager>& hcclEvents, // NOLINT
Expand Down
188 changes: 156 additions & 32 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,35 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch) {
with_switch_(with_switch),
switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
IGNORE_ID);
#else
PADDLE_THROW(platform::errors::InvalidArgument(
PADDLE_THROW(platform::errors::Fatal(
"ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
if (with_switch_) {
// TODO(sandyhouse) starts a client to connect the cloud switch module
// std::shared_ptr<HeterClient> client_ =
// HeterClient::GetInstance({switch_endpoint}, {}, 0);
} else if (local_rank_ == 0) {
if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
gloo_size_, IGNORE_ID, opts);
}
}

template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
for (size_t i = 0; i < size; i++) {
*dst += *src;
dst++;
src++;
}
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
std::vector<Tensor>& tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
Expand All @@ -93,33 +99,92 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(

// Step2: copy tensors to CPU
if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
std::vector<Tensor> cpu_tensors;
cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
// Step3: do inter cluster allreduce
if (with_switch_) {
// TODO(sandyhouse) send to and recv from switch, and do add
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
std::vector<int> send_size;
send_size.push_back(dense_cpu_tensor->numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor->name()}, send_size,
dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_cpu_tensor->dtype(), dense_cpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor2 =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor2->ResizeAndAllocate(dense_cpu_tensor->dims());
Tensor cpu_tensor_temp =
paddle::experimental::Tensor(dense_cpu_tensor2);
ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor2->data(),
dense_cpu_tensor2->numel() *
framework::DataTypeSize(dense_cpu_tensor2->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Recv from the switch module error."));

switch (dense_cpu_tensor->dtype()) {
case DataType::FLOAT32:
_do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor->data()),
reinterpret_cast<float*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
case DataType::FLOAT64:
_do_add<double>(
reinterpret_cast<double*>(dense_cpu_tensor->data()),
reinterpret_cast<double*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
case DataType::INT32:
_do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor->data()),
reinterpret_cast<int*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
default:
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Unsupported data type (%s) to do add.",
framework::DataType2String(dense_cpu_tensor->dtype())));
}
}
} else {
auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts);
gloo_task->Wait();
}
// Step4: copy cpu tensors to gpu
// TODO(sandyhouse)
// copy cpu tensors to gpu
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
Expand Down Expand Up @@ -147,18 +212,57 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
inner_pg_->Broadcast(tensors, b_opts);

if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
std::vector<Tensor> cpu_tensors;
cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
if (with_switch_) {
// TODO(sandyhouse) send to and recv
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
if (gloo_rank_ == 0) {
std::vector<int> send_size;
send_size.push_back(dense_cpu_tensor->numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor->name()}, send_size,
dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
} else {
int ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
}
}
} else {
auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts);
gloo_task->Wait();
Expand All @@ -168,8 +272,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
Expand All @@ -185,22 +287,44 @@ void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in,
inner_pg_->Broadcast(in, out);

if (local_rank_ == 0) {
Tensor cpu_tensor;
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensor.impl());
dense_cpu_tensor->Resize(in->dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(in->dtype(), in->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(in->dims());
Tensor cpu_tensor = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*in, platform::CPUPlace(),
dense_cpu_tensor.get());
if (with_switch_) {
// TODO(sandyhouse) send to and recv
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
if (gloo_rank_ == 0) {
std::vector<int> send_size;
send_size.push_back(in->numel());
int ret = client_->Send(
gid_, {in->name()}, send_size, dense_cpu_tensor->data(),
in->numel() * framework::DataTypeSize(in->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
} else {
int ret =
client_->Recv(gid_, {in->name()}, dense_cpu_tensor->data(),
in->numel() * framework::DataTypeSize(in->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
}
}
} else {
std::vector<Tensor> cpu_tensors = {cpu_tensor};
// auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
// gloo_task->Wait();
inter_pg_->Broadcast(cpu_tensors);
auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
gloo_task->Wait();
}
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
out);
framework::TensorCopySync(*dense_cpu_tensor, out->place(), out);
}
inner_pg_->Broadcast(out, out);
}
Expand Down
7 changes: 6 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroupHeter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
// #include "paddle/fluid/distributed/ps/service/heter_client.h"
#include "paddle/fluid/platform/device_context.h"

#ifdef PADDLE_WITH_GLOO
Expand All @@ -48,6 +47,11 @@
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif

#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
#include "paddle/fluid/distributed/ps/service/heter_client.h"
#endif

#include "paddle/fluid/distributed/collective/Common.h"

constexpr const char* HETER_BACKEND_NAME = "HETER_BACKEND";
Expand Down Expand Up @@ -108,6 +112,7 @@ class ProcessGroupHeter : public ProcessGroup {
int gloo_rank_;
int gloo_size_;
bool with_switch_;
std::string switch_endpoint_;
};

} // namespace distributed
Expand Down
Loading

0 comments on commit 24739f9

Please sign in to comment.