Commit
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into zyf_slice
zyfncg committed Jul 29, 2021
2 parents deff69c + 9d985ca commit 1907345
Showing 63 changed files with 2,144 additions and 554 deletions.
6 changes: 1 addition & 5 deletions cmake/generic.cmake
@@ -932,12 +932,8 @@ function(generate_dummy_static_lib)
if(NOT dummy_GENERATOR)
message(FATAL_ERROR "You must provide a generator file name.")
endif()
# if ${dummy_GENERATOR} contains "/", it may be a file path
if(NOT ${dummy_GENERATOR} MATCHES ".*/.*")
set(dummy_GENERATOR "${CMAKE_CURRENT_LIST_DIR}/${dummy_GENERATOR}")
endif()
if(NOT dummy_CONTENT)
set(dummy_CONTENT "${dummy_FILE_PATH} for lib ${dummy_LIB_NAME}")
set(dummy_CONTENT "${dummy_LIB_NAME}_dummy.c for lib ${dummy_LIB_NAME}")
endif()

configure_file(${PROJECT_SOURCE_DIR}/cmake/dummy.c.in ${dummy_FILE_PATH} @ONLY)
7 changes: 5 additions & 2 deletions cmake/unity_build.cmake
@@ -77,11 +77,14 @@ function(compose_unity_target_sources TARGET TYPE)
get_property(unity_group_index_max GLOBAL PROPERTY ${TARGET}_${TYPE}_group_index)
foreach(src ${ARGN})
set(unity_file "")
# UB use absolute path of source.
# Note(zhouwei25): UB uses the path relative to CMAKE_SOURCE_DIR.
# If absolute paths are used, the sccache/ccache hit rate will be reduced.
if(IS_ABSOLUTE ${src})
set(src_absolute_path ${src})
file(RELATIVE_PATH src_relative_path ${CMAKE_SOURCE_DIR} ${src})
else()
set(src_absolute_path ${CMAKE_CURRENT_SOURCE_DIR}/${src})
file(RELATIVE_PATH src_relative_path ${CMAKE_SOURCE_DIR} ${src_absolute_path})
endif()
# If `unity_group_index_max` is empty, there is no combination
# relationship.
@@ -106,7 +109,7 @@ function(compose_unity_target_sources TARGET TYPE)
set_property(GLOBAL APPEND PROPERTY ${unity_file_sources} ${UNITY_CU_BEFORE_CODE})
endif()
endif()
set_property(GLOBAL APPEND PROPERTY ${unity_file_sources} "#include \"${src_absolute_path}\"")
set_property(GLOBAL APPEND PROPERTY ${unity_file_sources} "#include \"${src_relative_path}\"")
set(unity_target_sources ${unity_target_sources} ${unity_file})
break()
endif()
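The effect of this change is easiest to see in the generated unity source itself. The sketch below is illustrative only — the operator file name is invented — and shows why paths relative to CMAKE_SOURCE_DIR keep the generated file byte-identical across machines, which is what lets sccache/ccache reuse cached objects.

```cpp
// Hypothetical contents of an auto-generated unity file (file names invented).
//
// Before this change (absolute path): the generated file embeds the checkout
// location, so it differs per machine/workspace and rarely hits the cache.
//   #include "/home/ci/workspace/Paddle/paddle/fluid/operators/foo_op.cc"
//
// After this change (path relative to CMAKE_SOURCE_DIR): the generated file
// is identical everywhere, so the compiler cache can reuse prior objects.
//   #include "paddle/fluid/operators/foo_op.cc"
```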
64 changes: 49 additions & 15 deletions paddle/fluid/distributed/service/communicator.h
@@ -68,31 +68,62 @@ class BlockingQueue {
}

bool Push(const T &elem) {
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return queue_.size() < capacity_; });
queue_.push_back(elem);
std::unique_lock<std::mutex> lock(mutex_);
WaitForWrite(lock);

queue_.push_back(elem);

Notify();
return true;
}
bool WaitForWrite(std::unique_lock<std::mutex> &lock) { // NOLINT
while (FullUnlocked()) {
if (empty_waiters_ != 0) {
empty_cond_.notify_one();
}
full_waiters_++;
full_cond_.wait(lock);
full_waiters_--;
}
cv_.notify_one();
return true;
}

bool Push(T &&elem) {
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return queue_.size() < capacity_; });
queue_.emplace_back(std::move(elem));
bool WaitForRead(std::unique_lock<std::mutex> &lock) { // NOLINT
while (EmptyUnlocked()) {
if (full_waiters_ != 0) {
full_cond_.notify_one();
}
empty_waiters_++;
empty_cond_.wait(lock);
empty_waiters_--;
}
cv_.notify_one();
return true;
}
bool EmptyUnlocked() { return queue_.empty(); }

bool FullUnlocked() { return queue_.size() >= capacity_; }
void Notify() {
if (empty_waiters_ != 0 && (!EmptyUnlocked())) {
empty_cond_.notify_one();
}
if (full_waiters_ != 0 && (!FullUnlocked())) {
full_cond_.notify_one();
}
}

bool Push(T &&elem) {
std::unique_lock<std::mutex> lock(mutex_);
WaitForWrite(lock);
queue_.emplace_back(std::move(elem));

Notify();
return true;
}
T Pop() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=] { return !queue_.empty(); });
WaitForRead(lock);
T rc(std::move(queue_.front()));
queue_.pop_front();
cv_.notify_one();
Notify();
return rc;
}

@@ -107,11 +138,14 @@ class BlockingQueue {
}

private:
int empty_waiters_ = 0;
int full_waiters_ = 0;
std::condition_variable empty_cond_;
std::condition_variable full_cond_;
const size_t capacity_;
std::deque<T> queue_;

mutable std::mutex mutex_;
std::condition_variable cv_;
};

template <typename T, int MajorType = Eigen::RowMajor,
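The rework above replaces the single condition variable cv_ with separate not-empty / not-full condition variables plus waiter counters, so a Push only wakes a blocked Pop (and vice versa) when someone is actually waiting. A minimal, self-contained sketch of that pattern — an illustrative bounded queue, not Paddle's BlockingQueue itself — looks roughly like this:

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Bounded blocking queue using one mutex, two condition variables and
// waiter counters, mirroring the scheme introduced in the diff above.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

  void Push(T elem) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.size() >= capacity_) {  // queue full: wait for space
      ++full_waiters_;
      full_cond_.wait(lock);
      --full_waiters_;
    }
    queue_.push_back(std::move(elem));
    if (empty_waiters_ != 0) empty_cond_.notify_one();  // wake one consumer
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty()) {  // queue empty: wait for data
      ++empty_waiters_;
      empty_cond_.wait(lock);
      --empty_waiters_;
    }
    T elem(std::move(queue_.front()));
    queue_.pop_front();
    if (full_waiters_ != 0) full_cond_.notify_one();  // wake one producer
    return elem;
  }

 private:
  const std::size_t capacity_;
  std::deque<T> queue_;
  std::mutex mutex_;
  std::condition_variable empty_cond_;  // signaled when data becomes available
  std::condition_variable full_cond_;   // signaled when space becomes available
  int empty_waiters_ = 0;
  int full_waiters_ = 0;
};
```

Compared with a single condition variable shared by producers and consumers, this only notifies the side that can actually make progress, which is what the full_waiters_/empty_waiters_ counters in the diff are tracking.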
2 changes: 1 addition & 1 deletion paddle/fluid/framework/distributed_strategy.proto
@@ -188,7 +188,7 @@ message DistributedStrategy {
optional bool find_unused_parameters = 28 [ default = false ];
optional bool tensor_parallel = 29 [ default = false ];
optional bool without_graph_optimization = 30 [ default = false ];
optional int32 fuse_grad_size_in_num = 31 [ default = 1 ];
optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
optional bool calc_comm_same_stream = 32 [ default = false ];
optional bool asp = 33 [ default = false ];

42 changes: 34 additions & 8 deletions paddle/fluid/framework/ir/graph.cc
@@ -17,7 +17,7 @@ limitations under the License. */
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/operator.h"

DEFINE_bool(convert_all_blocks, false,
DEFINE_bool(convert_all_blocks, true,
"Convert all blocks in program into SSAgraphs");

namespace paddle {
@@ -56,10 +56,12 @@ Graph::Graph(const ProgramDesc &program, const int64_t start_op_index,
// sub_graph.
std::unique_ptr<Graph> first_sub_graph = std::make_unique<Graph>(
program_.Block(0), this, start_op_index, end_op_index);
first_sub_graph->block_id_ = 0;
sub_graphs_.push_back(std::move(first_sub_graph));
for (size_t idx = 1; idx < program_.Size(); ++idx) {
std::unique_ptr<Graph> sub_graph =
std::make_unique<Graph>(program_.Block(idx), this);
sub_graph->block_id_ = idx;
sub_graphs_.push_back(std::move(sub_graph));
}
} else {
@@ -90,14 +92,32 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromProgram(
std::map<std::string, std::vector<ir::Node *>> Graph::InitFromBlock(
const BlockDesc &block, const int64_t start_op_index,
const int64_t end_op_index) {
std::unordered_map<std::string, VarDesc *> all_vars;
std::unordered_map<std::string, std::pair<VarDesc *, int>>
name_to_desc_block_id;

const BlockDesc *block_var_visible = &block;
while (block_var_visible != nullptr) {
for (auto *var : block_var_visible->AllVars()) {
name_to_desc_block_id.emplace(
var->Name(), std::make_pair(var, block_var_visible->ID()));
}
const BlockDesc *forward_block = block_var_visible->ForwardBlock();
if (forward_block != nullptr) {
for (auto *var : forward_block->AllVars()) {
name_to_desc_block_id.emplace(var->Name(),
std::make_pair(var, forward_block->ID()));
}
}
block_var_visible = block_var_visible->ParentBlock();
}
// var nodes for each var name, will have multiple versions in SSA
std::map<std::string, std::vector<ir::Node *>> var_nodes;
std::unordered_map<std::string, VarDesc *> not_visited_vars;
for (auto *var : block.AllVars()) {
all_vars.emplace(var->Name(), var);
not_visited_vars.emplace(var->Name(), var);
}

auto not_visited_vars = all_vars;
int desc_order = 0;
auto all_ops = block.AllOps();
PADDLE_ENFORCE_LE(
end_op_index, all_ops.size(),
@@ -109,15 +129,18 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromBlock(
auto *op = all_ops[i];
VLOG(3) << "create OpNode by " << op->Type();
ir::Node *node = CreateOpNode(op);
node->SetDescOrder(desc_order);
++desc_order;
// For input args, reuse the same var name if it was created before.
// Otherwise, create a new one.
for (auto &each_var_name : op->InputArgumentNames()) {
not_visited_vars.erase(each_var_name);
ir::Node *var = nullptr;
if (var_nodes.find(each_var_name) != var_nodes.end()) {
var = var_nodes.at(each_var_name).back();
} else if (all_vars.count(each_var_name) != 0) {
var = CreateVarNode(all_vars.at(each_var_name));
} else if (name_to_desc_block_id.count(each_var_name) != 0) {
auto desc_and_block_id = name_to_desc_block_id.at(each_var_name);
var = CreateVarNode(desc_and_block_id.first, desc_and_block_id.second);
var_nodes[each_var_name].push_back(var);
} else {
// Operation input var can be optional (dispensable). Which means
@@ -143,8 +166,9 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromBlock(
}

ir::Node *var = nullptr;
if (all_vars.count(each_var_name) != 0) {
var = CreateVarNode(all_vars.at(each_var_name));
if (name_to_desc_block_id.count(each_var_name) != 0) {
auto desc_and_block_id = name_to_desc_block_id.at(each_var_name);
var = CreateVarNode(desc_and_block_id.first, desc_and_block_id.second);
} else {
// Operation output vars can be @EMPTY@. For example, while_grad
// can have multi @EMPTY@ outputs with no VarDesc.
@@ -270,6 +294,7 @@ std::shared_ptr<Graph> Graph::Clone() {
auto cloned_graph = std::make_shared<Graph>(this->program_);
cloned_graph->ReleaseNodes();
cloned_graph->num_node_created_ = 0;
cloned_graph->block_id_ = this->block_id_;
std::unordered_map<ir::Node *, ir::Node *> origin_to_cloned;
for (auto *n : this->node_set_) {
PADDLE_ENFORCE_NOT_NULL(n, platform::errors::InvalidArgument(
@@ -313,6 +338,7 @@ std::unique_ptr<Graph> Graph::CloneSubGraph(const size_t idx) {
std::make_unique<Graph>(this->program_.Block(idx), this);
cloned_sub_graph->ReleaseNodes();
cloned_sub_graph->num_node_created_ = 0;
cloned_sub_graph->block_id_ = idx;
std::unordered_map<ir::Node *, ir::Node *> origin_to_cloned;
for (auto *n : this->sub_graphs_.at(idx)->Nodes()) {
PADDLE_ENFORCE_NOT_NULL(n, platform::errors::InvalidArgument(
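The InitFromBlock change above resolves each variable name against the current block, its ForwardBlock, and all ParentBlocks, recording which block the VarDesc came from. A simplified, self-contained sketch of that scope-walking pattern — Block and its fields here are hypothetical stand-ins, not Paddle's BlockDesc API — is:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a program block with nested scopes.
struct Block {
  int id;
  std::vector<std::string> vars;
  const Block *parent = nullptr;  // enclosing block, nullptr at the root
};

// Map each visible variable name to the id of the innermost block declaring
// it. emplace() keeps the first (innermost) hit, mirroring the diff's
// name_to_desc_block_id.emplace(...) calls.
std::unordered_map<std::string, int> CollectVisibleVars(const Block &block) {
  std::unordered_map<std::string, int> name_to_block_id;
  for (const Block *b = &block; b != nullptr; b = b->parent) {
    for (const std::string &name : b->vars) {
      name_to_block_id.emplace(name, b->id);
    }
  }
  return name_to_block_id;
}

int main() {
  Block root{0, {"x", "w"}};
  Block sub{1, {"y"}, &root};
  for (const auto &kv : CollectVisibleVars(sub)) {
    std::cout << kv.first << " -> block " << kv.second << "\n";
  }
  // Prints (in some order): y -> block 1, x -> block 0, w -> block 0
}
```

Recording the block id alongside the VarDesc is what lets CreateVarNode (see the graph.h diff below) tag each var node with the block it belongs to.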
32 changes: 27 additions & 5 deletions paddle/fluid/framework/ir/graph.h
@@ -104,7 +104,14 @@ class Graph {
attr_dels_.clear();
}

bool IsConstructedByPartialProgram() const { return is_partial_; }
bool IsConstructedByPartialProgram() const {
if (FLAGS_convert_all_blocks) {
if (IsMainGraph()) {
return GetSubGraph(0)->IsConstructedByPartialProgram();
}
}
return is_partial_;
}

bool Has(const std::string &attr_name) const {
if (FLAGS_convert_all_blocks) {
@@ -210,7 +217,7 @@ class Graph {
}

// Create a normal variable with non-null VarDesc.
ir::Node *CreateVarNode(VarDesc *var_desc) {
ir::Node *CreateVarNode(VarDesc *var_desc, int block_id = -1) {
if (FLAGS_convert_all_blocks) {
if (IsMainGraph()) {
return GetSubGraph(0)->CreateVarNode(var_desc);
@@ -219,7 +226,8 @@
PADDLE_ENFORCE_NOT_NULL(
var_desc, platform::errors::InvalidArgument(
"The VarDesc used to create variable node is null."));
auto *x = AddNode(new ir::Node(var_desc));
auto *x =
AddNode(new ir::Node(var_desc, block_id == -1 ? block_id_ : block_id));
x->SetId(num_node_created_++);
return x;
}
@@ -252,7 +260,7 @@ class Graph {
const std::string name = string::Sprintf(
"%s@%llu", static_cast<const char *>(ir::Node::kControlDepVarName),
num_node_created_);
auto *x = AddNode(new ir::Node(name, ir::Node::Type::kVariable));
auto *x = AddNode(new ir::Node(name, ir::Node::Type::kVariable, block_id_));
x->SetId(num_node_created_++);
return x;
}
@@ -265,7 +273,7 @@ class Graph {
return GetSubGraph(0)->CreateEmptyNode(name, type);
}
}
auto *x = AddNode(new ir::Node(name, type));
auto *x = AddNode(new ir::Node(name, type, block_id_));
x->SetId(num_node_created_++);
return x;
}
@@ -365,6 +373,15 @@ class Graph {
return sub_graphs_.at(idx).get();
}

int GetBlockId() const {
if (FLAGS_convert_all_blocks) {
if (IsMainGraph()) {
return GetSubGraph(0)->block_id_;
}
}
return block_id_;
}

size_t SubGraphsSize() const {
PADDLE_ENFORCE_EQ(
this->IsMainGraph(), true,
@@ -394,6 +411,9 @@ class Graph {
PADDLE_ENFORCE_EQ(
this->IsMainGraph(), true,
platform::errors::InvalidArgument("This graph is not main_graph"));
PADDLE_ENFORCE_EQ(sub_graphs_.size(), sub_graph->block_id_,
platform::errors::InvalidArgument(
"sub_graph idx is not equal to block_id_"));
sub_graphs_.push_back(std::move(sub_graph));
}

@@ -416,6 +436,8 @@ class Graph {
// parts: forward graph and backward graph, which can be executed
// independently.
bool is_partial_{false};
// The block this SubGraph belongs to.
int block_id_{0};
};

bool IsControlDepVar(const ir::Node &var);