Integrate GLOOParallelContext to support multi-CPU-core Dygraph DataParallel (PaddlePaddle#35154)

* can pass the fake test

* add files

* modify cmake to pass windows-ci

* fix for CI pass

* WITH_GLOO=ON

* fix to pass the coverage test

* add cpuonly testcase

* add

* disable nccl when compiling with CUDA

* change python version in cpuonly

* add backend argument

* add required gpu

* add required:gpu
2742195759 authored and AnnaTrainingG committed Sep 29, 2021
1 parent 02c896a commit bcbc418
Showing 11 changed files with 406 additions and 27 deletions.
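Before the file-by-file diff, a minimal usage sketch of what this commit enables: multi-process dygraph DataParallel on CPU cores, rendezvousing through Gloo. The backend='gloo' flag on paddle.distributed.spawn is an assumption taken from the "add backend argument" commit message above; the exact entry point may differ at this commit.

import paddle
import paddle.distributed as dist

def train():
    dist.init_parallel_env()                      # Gloo rendezvous on CPU
    model = paddle.DataParallel(paddle.nn.Linear(4, 4))
    opt = paddle.optimizer.SGD(parameters=model.parameters())
    loss = model(paddle.rand([8, 4])).mean()
    loss.backward()                               # grads all-reduced via Gloo
    opt.step()

if __name__ == '__main__':
    dist.spawn(train, nprocs=2, backend='gloo')   # backend flag is assumed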
6 changes: 6 additions & 0 deletions paddle/fluid/imperative/CMakeLists.txt
@@ -29,6 +29,12 @@ if(NOT WIN32)
  endif()
  cc_library(data_loader SRCS data_loader.cc DEPS enforce)
endif(NOT WIN32)
+if(WITH_GLOO)
+  cc_library(imperative_gloo_context SRCS gloo_context.cc DEPS collective_helper device_context tensor var_type_traits)
+  if(WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL)))
+    cc_library(reducer SRCS reducer.cc DEPS layer)
+  endif()
+endif()

cc_library(gradient_accumulator SRCS gradient_accumulator.cc DEPS blas operator lod_tensor selected_rows selected_rows_functor var_type_traits layer math_function)

109 changes: 109 additions & 0 deletions paddle/fluid/imperative/gloo_context.cc
@@ -0,0 +1,109 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle

namespace paddle {
namespace imperative {

void GLOOParallelContext::Init() {
  VLOG(4) << "Start GLOOParallelContext initialization";
  auto gloo_wrapper = framework::GlooWrapper::GetInstance();
  gloo_wrapper->SetSize(strategy_.nranks_);
  gloo_wrapper->SetRank(strategy_.local_rank_);
  gloo_wrapper->SetPrefix("");
  gloo_wrapper->SetIface("lo");
  // The first trainer endpoint doubles as the HTTP-store rendezvous server.
  auto addr = paddle::string::Split(strategy_.trainer_endpoints_[0], ':');
  VLOG(4) << "Server is " << strategy_.trainer_endpoints_[0];
  std::string host = addr[0];
  int port = std::stoi(addr[1]);
  gloo_wrapper->SetHttpStore(host, port, "worker");
  gloo_wrapper->Init();
  device_ = std::unique_ptr<platform::CPUDeviceContext>(
      new platform::CPUDeviceContext(platform::CPUPlace()));
}

void GLOOParallelContext::InitWithRingID(int ring_id) {
  PADDLE_THROW(platform::errors::Unimplemented(
      "InitWithRingID is not implemented for GLOOParallelContext yet."));
}

#define GLOO_CASE(type, T, gw)                                         \
  case type: {                                                         \
    VLOG(4) << "Use the gloo all reduce to sync. SRC:" << src_tensor;  \
    std::vector<T> send_vector##T;                                     \
    framework::TensorToVector<T>(src_tensor, &send_vector##T);         \
    auto recv_vector##T = gw->AllReduce<T>(send_vector##T);            \
    framework::TensorFromVector<T>(recv_vector##T, dst_tensor);        \
    VLOG(4) << "DST:" << *dst_tensor;                                  \
    break;                                                             \
  }

void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
                                            framework::Variable *dst,
                                            int ring_id,
                                            bool use_calc_stream) {
  auto src_tensor = src.Get<framework::LoDTensor>();
  auto *dst_tensor = dst->GetMutable<framework::LoDTensor>();
  auto gloo_wrapper = framework::GlooWrapper::GetInstance();
  dst_tensor->Resize(src_tensor.dims());
  switch (src_tensor.type()) {
    GLOO_CASE(framework::proto::VarType::FP32, float, gloo_wrapper);
    GLOO_CASE(framework::proto::VarType::FP64, double, gloo_wrapper);
    GLOO_CASE(framework::proto::VarType::INT32, int, gloo_wrapper);
    GLOO_CASE(framework::proto::VarType::INT64, int64_t, gloo_wrapper);
    default: {
      PADDLE_THROW(
          platform::errors::InvalidArgument("Invalid datatype for allreduce"));
    }
  }
  gloo_wrapper->Barrier();
}

paddle::platform::DeviceContext *GLOOParallelContext::GetDeviceContext(
    int ring_id) {
  // Gloo on CPU uses one CPUDeviceContext regardless of ring_id.
  return device_.get();
}

void GLOOParallelContext::WaitCompute(int ring_id) {
  // Do nothing: the CPU path has no separate compute stream to wait on.
}

void GLOOParallelContext::WaitComm(int ring_id) {
  // Do nothing: the CPU path has no separate comm stream to wait on.
}

void GLOOParallelContext::SynchronizeCompute() {
  // Do nothing: CPU computation is already synchronous.
}

} // namespace imperative
} // namespace paddle
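AllReduceByStream above round-trips every tensor through a std::vector (TensorToVector, AllReduce, then TensorFromVector), trading extra copies for simplicity on CPU. A tiny Python model of the semantics, a sketch rather than Paddle code, assuming Gloo's default sum reduction:

def gloo_allreduce(vectors_from_all_ranks):
    # Elementwise sum across ranks, mirroring gw->AllReduce<T>(send_vector).
    return [sum(elems) for elems in zip(*vectors_from_all_ranks)]

rank0 = [1.0, 2.0, 3.0]
rank1 = [10.0, 20.0, 30.0]
assert gloo_allreduce([rank0, rank1]) == [11.0, 22.0, 33.0]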
60 changes: 60 additions & 0 deletions paddle/fluid/imperative/gloo_context.h
@@ -0,0 +1,60 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle

namespace paddle {
namespace imperative {

class GLOOParallelContext : public ParallelContext {
 public:
  explicit GLOOParallelContext(const ParallelStrategy& strategy,
                               const platform::Place& place)
      : ParallelContext(strategy, place) {}

  ~GLOOParallelContext() override = default;

  void Init() override;

  void InitWithRingID(int ring_id) override;

  void AllReduceByStream(const framework::Variable& src,
                         framework::Variable* dst, int ring_id,
                         bool use_calc_stream) override;

  paddle::platform::DeviceContext* GetDeviceContext(int ring_id) override;

  void WaitCompute(int ring_id) override;

  void WaitComm(int ring_id) override;

  void SynchronizeCompute() override;

 private:
  std::unique_ptr<platform::CPUDeviceContext> device_;
};

} // namespace imperative
} // namespace paddle
17 changes: 11 additions & 6 deletions paddle/fluid/imperative/reducer.cc
@@ -28,7 +28,7 @@ namespace paddle {
namespace imperative {

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
-    defined(PADDLE_WITH_XPU_BKCL)
+    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
// div the nranks
void Group::DivNRanks(const platform::DeviceContext &context, int64_t nranks) {
  framework::Tensor *tensor =
@@ -42,9 +42,12 @@ void Group::DivNRanks(const platform::DeviceContext &context, int64_t nranks) {
    DivNRanks(tensor, nranks, context);
#endif
  } else if (platform::is_cpu_place(tensor->place())) {
+    VLOG(4) << "before div 2" << *tensor;
+    VLOG(4) << "NDiv for cpu devices : rank = " << nranks;
    framework::VisitDataTypeSmall(
        dtype_, DivNRanksForAllReduce<platform::CPUDeviceContext>(
                    tensor, nranks, context));
+    VLOG(4) << "after div 2" << *tensor;
  } else if (platform::is_xpu_place(tensor->place())) {
#ifdef PADDLE_WITH_XPU_BKCL
    // TODO(liuyuhui) support xpu about div nranks in the future
@@ -764,8 +767,8 @@ void Reducer::MarkGroupReady(size_t group_index) {

  for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
       ++next_group_) {
-    auto &group = groups_[next_group_];
-    const int run_order = next_group_ % nrings_;
+    UNUSED auto &group = groups_[next_group_];
+    UNUSED const int run_order = next_group_ % nrings_;

    // For CUDA or XPU, compute_stream --> comm_stream.
    // For CPU, do nothing.
@@ -792,11 +795,12 @@ void Reducer::MarkGroupReady(size_t group_index) {
          cv_.notify_all();
        }
      });
-#elif defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
+#elif defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL) || \
+    defined(PADDLE_WITH_GLOO)
    FusedAllReduceSchedule(run_order, group, next_group_);
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
-        "Not compiled with BKCL or NCCL."));
+        "Not compiled with BKCL or NCCL or GLOO."));
#endif
  }
}
@@ -974,7 +978,8 @@ void Reducer::FinalizeBackward() {

  if (find_unused_vars_each_step_) {
// TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(PADDLE_WITH_GLOO)
    ProcessUnusedDenseVars();
#endif
    // Initialize local used vars
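The CPU DivNRanks path above turns the summed gradients into averages. A sketch of the arithmetic, assuming the standard data-parallel convention that the Gloo all-reduce sums and DivNRanks then divides by the rank count:

def div_nranks(summed_grads, nranks):
    # Mean gradient = all-reduced sum / number of ranks.
    return [g / nranks for g in summed_grads]

assert div_nranks([4.0, 6.0], 2) == [2.0, 3.0]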
2 changes: 1 addition & 1 deletion paddle/fluid/imperative/reducer.h
@@ -49,7 +49,7 @@ namespace paddle {
namespace imperative {

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
-    defined(PADDLE_WITH_XPU_BKCL)
+    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)

template <typename T>
struct DivNRanksFunctor {
2 changes: 2 additions & 0 deletions paddle/fluid/pybind/CMakeLists.txt
@@ -69,6 +69,8 @@ endif()
if(WITH_GLOO)
  set(PYBIND_DEPS ${PYBIND_DEPS} gloo_context)
  set(PYBIND_SRCS ${PYBIND_SRCS} gloo_context_py.cc)
+  set(PYBIND_DEPS ${PYBIND_DEPS} imperative_gloo_context)
+  set(PYBIND_DEPS ${PYBIND_DEPS} reducer)
endif(WITH_GLOO)

if (WITH_CRYPTO)
17 changes: 16 additions & 1 deletion paddle/fluid/pybind/imperative.cc
@@ -35,6 +35,7 @@ limitations under the License. */
#include "paddle/fluid/imperative/basic_engine.h"
#include "paddle/fluid/imperative/bkcl_context.h"
#include "paddle/fluid/imperative/data_loader.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/imperative/hooks.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/imperative/nccl_context.h"
@@ -2017,7 +2018,7 @@ void BindImperative(py::module *m_ptr) {
        py::call_guard<py::gil_scoped_release>());

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
-    defined(PADDLE_WITH_XPU_BKCL)
+    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
  py::class_<imperative::ParallelContext,
             std::shared_ptr<imperative::ParallelContext>>(m,
                                                           "ParallelContext");
@@ -2062,6 +2063,20 @@ void BindImperative(py::module *m_ptr) {
&imperative::BKCLParallelContext::InitWithRingID,
py::arg("ring_id"));
#endif

+#if defined(PADDLE_WITH_GLOO)
+  // Bind the Gloo-backed CPU parallel context used by dygraph DataParallel.
+  py::class_<imperative::GLOOParallelContext, imperative::ParallelContext,
+             std::shared_ptr<imperative::GLOOParallelContext>>(
+      m, "GLOOParallelContext")
+      .def(py::init<const imperative::ParallelStrategy &,
+                    const platform::CPUPlace &>())
+      .def("init", [](imperative::GLOOParallelContext &self) { self.Init(); })
+      .def("init_with_ring_id",
+           &imperative::GLOOParallelContext::InitWithRingID,
+           py::arg("ring_id"));
+#endif

m.def("pylayer_apply",
[](const platform::CPUPlace &place, const py::object &cls,
const py::args args, const py::kwargs kwargs) {
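A sketch of driving the new binding from Python. The class and method names come straight from the binding above; the ParallelStrategy fields follow how the existing NCCL path is configured (an assumption, not shown in this diff), and the endpoint values are illustrative only:

from paddle.fluid import core

strategy = core.ParallelStrategy()
strategy.nranks = 2
strategy.local_rank = 0
strategy.trainer_endpoints = ["127.0.0.1:6170", "127.0.0.1:6171"]
strategy.current_endpoint = "127.0.0.1:6170"

ctx = core.GLOOParallelContext(strategy, core.CPUPlace())
ctx.init()  # rendezvous via the HTTP store, as in GLOOParallelContext::Init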
