Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fluid_move_selected_rows_to_pten_1
veyron95 committed Jan 21, 2022
2 parents a1e79b9 + ab1abd4 commit 109e83f
Showing 288 changed files with 8,803 additions and 4,399 deletions.
8 changes: 1 addition & 7 deletions cmake/inference_lib.cmake
@@ -228,13 +228,7 @@ copy(inference_lib_dist
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/api/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/common/*.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/bfloat16.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/complex.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/float16.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/any.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
5 changes: 0 additions & 5 deletions paddle/fluid/CMakeLists.txt
@@ -1,8 +1,3 @@
# Adapt to custom op mechanism: Include the header files related to the data type
# to avoid exposing the path of the underlying file, remove it after moving
# float16.h/complex.h/bfloat16.h into pten
include_directories(${PADDLE_SOURCE_DIR}/paddle/fluid/platform)

add_subdirectory(memory)
add_subdirectory(platform)
add_subdirectory(distributed)
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -12,8 +12,8 @@ endif()

cc_library(task_loop_thread_pool SRCS task_loop_thread_pool.cc task_loop_thread.cc task_loop.cc DEPS enforce glog)

cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc
interceptor.cc compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc
cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc interceptor.cc
compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc dist_model_tensor_wrapper.cc
DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto task_loop_thread_pool collective_helper
op_registry executor_gc_helper gflags glog ${BRPC_DEPS})

61 changes: 59 additions & 2 deletions paddle/fluid/distributed/fleet_executor/dist_model.cc
@@ -15,6 +15,8 @@
#include <glog/logging.h>

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/naive_executor.h"
#include "paddle/fluid/framework/op_proto_maker.h"
@@ -68,9 +70,15 @@ bool DistModel::Init() {
program_.reset(config_.program_desc);
scope_.reset(config_.scope);
}
if (!PrepareFeedAndFetch()) {
return false;
}
if (!CommInit()) {
return false;
}
if (!PrepareFleetExe()) {
return false;
}
return true;
}

@@ -298,8 +306,57 @@ bool DistModel::LoadParameters() {
return true;
}

void DistModel::Run(const std::vector<paddle::framework::Tensor> &input_data,
std::vector<paddle::framework::Tensor> *output_data) {
bool DistModel::PrepareFleetExe() {
task_node_.reset(new TaskNode(program_.get(), config_.local_rank));
if (config_.local_rank - config_.mp_degree >= 0) {
task_node_->AddUpstreamTask(config_.local_rank - config_.mp_degree);
}
if (config_.local_rank + config_.mp_degree < config_.nranks) {
task_node_->AddDownstreamTask(config_.local_rank + config_.mp_degree);
}
task_node_->SetType("Compute");
task_node_->Init();
executor_desc_ = FleetExecutorDesc();
executor_desc_.set_cur_rank(config_.local_rank);
std::unordered_map<int64_t, int64_t> id_to_rank;
for (int i = 0; i < config_.nranks; ++i) {
RankInfo *rank_info = executor_desc_.add_cluster_info();
rank_info->set_rank(i);
rank_info->set_ip_port(config_.trainer_endpoints[i]);
id_to_rank.insert({i, i});
}
fleet_exe.reset(new FleetExecutor(executor_desc_));
fleet_exe->Init("inference", *(program_.get()), scope_.get(), place_, 1,
{task_node_.get()}, id_to_rank);
return true;
}

bool DistModel::PrepareFeedAndFetch() {
for (auto *op : program_->Block(0).AllOps()) {
if (op->Type() == "feed") {
VLOG(3) << "feed op with feed var: " << op->Output("Out")[0];
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
if (feeds_.size() <= static_cast<size_t>(idx)) {
feeds_.resize(idx + 1);
}
feeds_[idx] = op;
feed_names_[op->Output("Out")[0]] = idx;
idx_to_feeds_[idx] = op->Output("Out")[0];
} else if (op->Type() == "fetch") {
VLOG(3) << "fetch op with fetch var: " << op->Input("X")[0];
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
if (fetches_.size() <= static_cast<size_t>(idx)) {
fetches_.resize(idx + 1);
}
fetches_[idx] = op;
id_to_fetches_[idx] = op->Input("X")[0];
}
}
return true;
}

void DistModel::Run(const std::vector<DistModelTensor> &input_data,
std::vector<DistModelTensor> *output_data) {
/* TODO(fleet exe dev): implement this funct */
}

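For readers unfamiliar with the convention used in PrepareFleetExe above: each inference rank links to the rank mp_degree below it as its upstream task and the rank mp_degree above it as its downstream task, when those ranks exist. A minimal standalone sketch of that wiring follows; the function name and the concrete numbers are illustrative and not part of the commit.

// Illustrative only: mirrors the upstream/downstream wiring in
// DistModel::PrepareFleetExe(); the helper name and numbers are made up.
#include <cstdint>
#include <iostream>
#include <utility>

// Returns {upstream, downstream}, with -1 meaning "no such neighbor".
std::pair<int64_t, int64_t> PipelineNeighbors(int64_t local_rank,
                                              int64_t mp_degree,
                                              int64_t nranks) {
  int64_t upstream = local_rank - mp_degree >= 0 ? local_rank - mp_degree : -1;
  int64_t downstream =
      local_rank + mp_degree < nranks ? local_rank + mp_degree : -1;
  return {upstream, downstream};
}

int main() {
  // With 8 ranks and mp_degree 2, rank 3 receives from rank 1 and sends to rank 5.
  auto neighbors = PipelineNeighbors(3, 2, 8);
  std::cout << neighbors.first << " " << neighbors.second << std::endl;  // prints "1 5"
}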
17 changes: 15 additions & 2 deletions paddle/fluid/distributed/fleet_executor/dist_model.h
@@ -17,6 +17,7 @@
#include <string>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/macros.h"
@@ -32,6 +33,9 @@ class BlockDesc;

namespace distributed {

class TaskNode;
class FleetExecutor;

struct DistModelConfig {
std::string model_dir{};
framework::ProgramDesc* program_desc{nullptr};
@@ -53,8 +57,8 @@ class DistModel {
public:
explicit DistModel(const DistModelConfig& config) : config_(config) {}
bool Init();
void Run(const std::vector<paddle::framework::Tensor>& input_data,
std::vector<paddle::framework::Tensor>* output_data);
void Run(const std::vector<DistModelTensor>& input_data,
std::vector<DistModelTensor>* output_data);
~DistModel() = default;

private:
@@ -66,12 +70,21 @@ class DistModel {
bool LoadParameters();
bool PreparePlace();
bool CommInit();
bool PrepareFeedAndFetch();
bool PrepareFleetExe();
void InsertCommOp(std::string tmp_var_name, int nranks, int rank,
const std::vector<std::string>& peer_endpoints,
framework::BlockDesc* block, int ring_id);

std::vector<framework::OpDesc*> feeds_;
std::map<std::string, int64_t> feed_names_;
std::map<int64_t, std::string> idx_to_feeds_;
std::vector<framework::OpDesc*> fetches_;
std::map<int64_t, std::string> id_to_fetches_;
DistModelConfig config_;
FleetExecutorDesc executor_desc_;
std::shared_ptr<FleetExecutor> fleet_exe;
std::shared_ptr<TaskNode> task_node_;
std::shared_ptr<framework::Scope> scope_;
paddle::platform::Place place_;
std::shared_ptr<framework::ProgramDesc> program_;
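To show how the reshaped interface above is meant to be driven, here is a hypothetical caller-side sketch. The config values, endpoint string, and variable names are placeholders; only the fields DistModel actually reads in this commit are set (some of them sit in the part of DistModelConfig elided by the diff), and Run is still a stub at this point.

// Hypothetical usage sketch; not part of the commit.
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"

using paddle::distributed::DistModel;
using paddle::distributed::DistModelConfig;
using paddle::distributed::DistModelTensor;

void RunDistModelOnce() {
  DistModelConfig config;
  config.model_dir = "/path/to/inference_model";  // placeholder path
  config.local_rank = 0;
  config.mp_degree = 1;
  config.nranks = 1;
  config.trainer_endpoints = {"127.0.0.1:6170"};  // placeholder endpoint

  DistModel model(config);
  if (!model.Init()) {
    return;  // Init covers feed/fetch preparation, comm init, and FleetExecutor setup.
  }

  std::vector<DistModelTensor> inputs;   // to be filled with input tensors
  std::vector<DistModelTensor> outputs;
  model.Run(inputs, &outputs);  // still a TODO stub at this commit
}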
100 changes: 100 additions & 0 deletions paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.cc
@@ -0,0 +1,100 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace distributed {

void DistModelDataBuf::Reset(void* data, size_t length) {
Free();
memory_owned_ = false;
data_ = data;
length_ = length;
}

void DistModelDataBuf::Free() {
if (memory_owned_ && data_) {
PADDLE_ENFORCE_GT(length_, 0UL,
platform::errors::PreconditionNotMet(
"Error occurred when deconstruct DistModelDataBuf: "
"it contains no data!"));
// NOTE: if own the memory, it must be char* type
delete[] static_cast<char*>(data_);
data_ = nullptr;
length_ = 0;
}
}

void DistModelDataBuf::Resize(size_t length) {
if (length_ >= length) {
return;
}
if (memory_owned_) {
Free();
data_ = new char[length];
length_ = length;
memory_owned_ = true;
} else {
PADDLE_THROW(platform::errors::PreconditionNotMet(
"The memory is allocated externally, can not Resized"));
}
}

DistModelDataBuf& DistModelDataBuf::operator=(const DistModelDataBuf& other) {
if (!other.memory_owned_) {
data_ = other.data_;
length_ = other.length_;
memory_owned_ = other.memory_owned_;
} else {
Resize(other.length_);
if (other.length() && other.data()) {
std::memcpy(data_, other.data(), other.length());
} else if (other.length()) {
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid argument, null pointer data with length %u is passed",
other.length()));
}
length_ = other.length_;
memory_owned_ = true;
}
return *this;
}

DistModelDataBuf& DistModelDataBuf::operator=(DistModelDataBuf&& other) {
data_ = other.data_;
memory_owned_ = other.memory_owned_;
length_ = other.length_;
other.data_ = nullptr;
other.length_ = 0;
other.memory_owned_ = false;
return *this;
}

DistModelDataBuf::DistModelDataBuf(DistModelDataBuf&& other)
: data_(other.data_),
length_(other.length_),
memory_owned_(other.memory_owned_) {
other.memory_owned_ = false;
other.data_ = nullptr;
other.length_ = 0;
}

DistModelDataBuf::DistModelDataBuf(const DistModelDataBuf& other) {
*this = other;
}

} // namespace distributed
} // namespace paddle
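A brief sketch of the ownership rules the implementation above encodes: buffers created with the size constructor own and release their memory, buffers wrapping an external pointer never free it, copy-assignment deep-copies only from an owning source, and growing a non-owning buffer is rejected. The names and sizes below are illustrative.

// Illustrative only: exercises the DistModelDataBuf behavior defined above.
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"

using paddle::distributed::DistModelDataBuf;

void BufferOwnershipSketch() {
  // Owned: 64 bytes allocated internally, released by the destructor.
  DistModelDataBuf owned(64);

  // Borrowed: wraps caller-managed memory; Free() never deletes it.
  std::vector<float> host(16, 0.f);
  DistModelDataBuf borrowed(host.data(), host.size() * sizeof(float));

  DistModelDataBuf deep(64);
  deep = owned;        // Resize + memcpy: an independent 64-byte copy

  DistModelDataBuf shallow;
  shallow = borrowed;  // aliases host.data(); nothing is copied or freed

  // Growing a buffer that does not own its memory throws PreconditionNotMet:
  // borrowed.Resize(1024);
}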
77 changes: 77 additions & 0 deletions paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h
@@ -0,0 +1,77 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <string>
#include <vector>
#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {

enum DistModelDataType { FLOAT16, FLOAT32, INT64, INT32, INT8 };

template <typename T>
constexpr DistModelDataType DistModelGetDtype();

template <>
constexpr DistModelDataType DistModelGetDtype<int32_t>() {
return DistModelDataType::INT32;
}

template <>
constexpr DistModelDataType DistModelGetDtype<int64_t>() {
return DistModelDataType::INT64;
}

template <>
constexpr DistModelDataType DistModelGetDtype<float>() {
return DistModelDataType::FLOAT32;
}

class DistModelDataBuf {
public:
explicit DistModelDataBuf(size_t length)
: data_(new char[length]), length_(length), memory_owned_(true) {}
DistModelDataBuf(void* data, size_t length)
: data_(data), length_(length), memory_owned_(false) {}
void Reset(void* data, size_t length);
size_t length() const { return length_; }
void* data() const { return data_; }
~DistModelDataBuf() { Free(); }
DistModelDataBuf() = default;
void Resize(size_t length);

DistModelDataBuf& operator=(const DistModelDataBuf& other);
DistModelDataBuf& operator=(DistModelDataBuf&& other);
DistModelDataBuf(DistModelDataBuf&& other);
DistModelDataBuf(const DistModelDataBuf& other);

private:
void Free();
void* data_{nullptr};
size_t length_{0};
bool memory_owned_{false};
};

struct DistModelTensor {
std::string name;
std::vector<int> shape;
DistModelDataBuf data;
DistModelDataType dtype;
std::vector<std::vector<size_t>> lod;
};

} // namespace distributed
} // namespace paddle
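And a hypothetical sketch of packing host data into a DistModelTensor using the declarations above; the tensor name, shape, and values are placeholders. The buffer is built as an owning DistModelDataBuf and then moved into the tensor.

// Hypothetical usage sketch; not part of the commit.
#include <cstring>
#include <utility>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"

namespace distributed = paddle::distributed;

distributed::DistModelTensor MakeInputTensor() {
  // Placeholder 2 x 3 float payload.
  std::vector<float> host = {1.f, 2.f, 3.f, 4.f, 5.f, 6.f};
  const size_t nbytes = host.size() * sizeof(float);

  // Owning buffer: allocates nbytes internally, then receives the host data.
  distributed::DistModelDataBuf buf(nbytes);
  std::memcpy(buf.data(), host.data(), nbytes);

  distributed::DistModelTensor tensor;
  tensor.name = "x";                                       // placeholder feed name
  tensor.shape = {2, 3};                                   // placeholder shape
  tensor.dtype = distributed::DistModelGetDtype<float>();  // FLOAT32
  tensor.data = std::move(buf);  // move keeps ownership inside the tensor
  // tensor.lod stays empty for dense (non-LoD) inputs.
  return tensor;
}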
7 changes: 7 additions & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -35,6 +35,13 @@ FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
InitMessageBus();
}

FleetExecutor::FleetExecutor(const FleetExecutorDesc& exe_desc)
: exe_desc_(exe_desc) {
// Message bus will be created and inited only once
GlobalVal<MessageBus>::Create();
InitMessageBus();
}

FleetExecutor::~FleetExecutor() {
for (const auto& carrier_id : carrier_ids_) {
GlobalMap<std::string, Carrier>::Get(carrier_id)->Release();
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -36,6 +36,7 @@ class FleetExecutor final {
public:
FleetExecutor() = delete;
explicit FleetExecutor(const std::string& exe_desc_str);
explicit FleetExecutor(const FleetExecutorDesc& exe_desc);
~FleetExecutor();
void Init(const std::string& carrier_id,
const framework::ProgramDesc& program_desc, framework::Scope* scope,
10 changes: 10 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -38,6 +38,16 @@ TaskNode::TaskNode(paddle::framework::ProgramDesc* program, int64_t rank,
task_id_ = task_node_cnt++;
}

TaskNode::TaskNode(paddle::framework::ProgramDesc* program, int64_t rank)
: program_(program), rank_(rank), task_id_(rank) {
max_run_times_ = 1;
max_slot_nums_ = 1;
LOG(INFO)
<< "Constructing TaskNode for DistModelInf. The TaskNode's id is: "
<< rank
<< ". And the TaskNode's max_run_time and max_slot_num will be set to 1.";
}

void TaskNode::SetProgram(paddle::framework::ProgramDesc* program) {
program_ = program;
}
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.h
@@ -42,6 +42,7 @@ class TaskNode final {
int64_t max_slot_nums);
TaskNode(paddle::framework::ProgramDesc* program, int64_t rank,
int64_t max_run_times, int64_t max_slot_nums);
TaskNode(paddle::framework::ProgramDesc* program, int64_t rank);
~TaskNode() = default;

void SetProgram(paddle::framework::ProgramDesc* program);