diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index a28d4c845d796..883fc5dcb0ca7 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -77,6 +77,23 @@ paddle::framework::FetchList InterpreterCore::Run(
   return *(fetch_var->GetMutable<framework::FetchList>());
 }
 
+void InterpreterCore::BuildOperatorDependences() {
+  // analyze the dependencies between ops, set the dependecy_count_, and
+  // call Schedule
+  auto op_nums = vec_instruction_.size();
+  dependecy_count_.resize(op_nums);
+  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
+  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
+    auto op_list = op2downstream[op];
+    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
+    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);
+
+    for (auto inst_id : op_list) {
+      dependecy_count_[inst_id]++;
+    }
+  }
+}
+
 void InterpreterCore::Convert(
     std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
   auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
@@ -86,7 +103,6 @@ void InterpreterCore::Convert(
 
   auto op_nums = nodes.size();
   vec_instruction_.reserve(op_nums);
-  dependecy_count_.resize(op_nums);
 
   for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
     auto& op_func_node = nodes[op_idx];
@@ -146,30 +162,7 @@ void InterpreterCore::Convert(
     }
   }
 
-  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
-    std::vector<size_t> vec_temp;
-    for (auto& item : vec_instruction_[i].Outputs()) {
-      for (auto id : item.second) {
-        vec_temp = interpreter::merge_vector(vec_temp, input_var2op_info_[id]);
-      }
-    }
-
-    // In Program, op order is a very important information.
-    // Op can only add op after it as next as next ops.
-    std::vector<size_t> filter_next;
-    filter_next.reserve(vec_temp.size());
-    for (auto item : vec_temp) {
-      if (item > i) {
-        filter_next.push_back(item);
-      }
-    }
-
-    stream_analyzer_.Schedule(filter_next, &vec_instruction_, i);
-
-    for (auto inst_id : filter_next) {
-      dependecy_count_[inst_id]++;
-    }
-  }
+  BuildOperatorDependences();
 
   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
     BuildAndCacheInstructionCtx(&vec_instruction_[i]);
@@ -289,7 +282,7 @@ void InterpreterCore::BuildSkipShareLoDInfo() {
 void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   auto* op = instr_node.OpBase();
   auto place = instr_node.DeviceContext().GetPlace();
-  VLOG(4) << place << " " << op->DebugStringEx(global_scope_);
+  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
 
   auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
   {
@@ -320,7 +313,7 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
     instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
   }
 
-  VLOG(3) << place << " " << op->DebugStringEx(global_scope_);
+  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);
 
   /*For profiling/benchmark only*/
   if (FLAGS_benchmark) {
@@ -494,6 +487,8 @@ void InterpreterCore::CheckGC(const Instruction& instr) {
       continue;
     }
     if (is_ready) {
+      VLOG(6) << "Async delete variable with name: "
+              << var_scope.GetNameById(var_id);
       gc_->Add(var_scope.Var(var_id), gc_event_.at(instr_id),
                &instr.DeviceContext());
     }
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index 0925d715574fc..9902b9712b952 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -78,6 +78,8 @@ class InterpreterCore {
 
   void BuildSkipShareLoDInfo();
 
+  void BuildOperatorDependences();
+
   bool is_build_;
 
   const platform::Place& place_;
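For intuition, here is a minimal standalone sketch of the bookkeeping `BuildOperatorDependences` performs; `CountDependencies` is a hypothetical helper, not part of the patch. The downstream map lists, for each op, the ops that must run after it, and `dependecy_count_` ends up holding each op's in-degree.

```cpp
#include <cstddef>
#include <list>
#include <map>
#include <vector>

// Hypothetical helper (illustrative only): given op -> downstream ops,
// compute each op's in-degree, i.e. how many ops must finish before it.
std::vector<size_t> CountDependencies(
    const std::map<int, std::list<int>>& op2downstream, size_t op_nums) {
  std::vector<size_t> dependency_count(op_nums, 0);
  for (const auto& item : op2downstream) {
    for (auto downstream_op : item.second) {
      ++dependency_count[downstream_op];  // one more predecessor to wait for
    }
  }
  return dependency_count;  // ops with count 0 can be launched immediately
}
```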
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 011b1b6dece8e..b135c214bd33c 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -631,6 +631,97 @@ std::vector<size_t> merge_vector(const std::vector<size_t>& first,
   return out;
 }
 
+void update_var_min_rw_op(const std::map<int, std::set<int>>& op2dependences,
+                          std::map<int, std::list<int>>& var2min_rw_op,
+                          int cur_op, int rw_var) {
+  // rw_var is an input or output of cur_op;
+  // this function updates the var2min_rw_op map.
+  if (var2min_rw_op.find(rw_var) == var2min_rw_op.end())
+    var2min_rw_op[rw_var] = std::list<int>();
+  for (auto dep_op : op2dependences.at(cur_op)) {
+    var2min_rw_op[rw_var].remove(dep_op);
+  }
+  var2min_rw_op[rw_var].push_back(cur_op);
+}
+
+std::map<int, std::list<int>> get_downstream_map(
+    const std::map<int, std::set<int>>& op2dependences) {
+  // op2dependences maps an op to the set of ops it depends on; we want the
+  // inverse, op -> [ops], where [ops] are the downstream ops of op.
+  std::map<int, std::list<int>> result;
+  for (auto& item : op2dependences) {
+    int op = item.first;
+    for (auto dep_op : item.second) {
+      if (result.find(dep_op) == result.end())
+        result[dep_op] = std::list<int>();
+      result[dep_op].push_back(op);
+    }
+  }
+  return result;
+}
+
+std::map<int, std::list<int>> build_op_downstream_map(
+    const std::vector<Instruction>& vec_instruction) {
+  auto var2min_rw_op = std::map<
+      int, std::list<int>>();  // map from variable id to read/write op ids
+  auto var2recent_write_op =
+      std::map<int, int>();  // map from variable to its most recent write op
+  auto op2dependences =
+      std::map<int, std::set<int>>();  // map from op to its dependence list;
+                                       // the op must run after its dependences
+  std::set<int>
+      remove_duplicate;  // tracks vars that are both inputs and outputs
+
+  // reserve
+  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+    op2dependences[op_idx] = std::set<int>();
+  }
+
+  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+    remove_duplicate.clear();
+    // step 1: update the op2dependences structure
+    for (auto& item :
+         vec_instruction[op_idx].Inputs()) {  // for all inputs (read only)
+      for (auto var : item.second) {
+        if (var2recent_write_op.count(var))
+          op2dependences[op_idx].insert(var2recent_write_op[var]);
+      }
+    }
+
+    for (auto& item :
+         vec_instruction[op_idx].Outputs()) {  // for all write vars
+      for (auto var : item.second) {
+        if (var2min_rw_op.count(var)) {
+          for (auto dep_op : var2min_rw_op[var]) {
+            op2dependences[op_idx].insert(dep_op);
+          }
+        }
+      }
+    }
+
+    // step 2: update the two var2xxx data structures
+    for (auto& item :
+         vec_instruction[op_idx].Inputs()) {  // for all inputs (read only)
+      for (auto var : item.second) {
+        update_var_min_rw_op(op2dependences, var2min_rw_op, op_idx, var);
+        remove_duplicate.insert(var);
+      }
+    }
+
+    for (auto& item :
+         vec_instruction[op_idx].Outputs()) {  // for all write vars
+      for (auto var : item.second) {
+        var2recent_write_op[var] = op_idx;
+        if (remove_duplicate.count(var) == 0) {
+          // vars that are both input and output were already updated above
+          update_var_min_rw_op(op2dependences, var2min_rw_op, op_idx, var);
+        }
+      }
+    }
+  }
+  return get_downstream_map(op2dependences);
+}
+
 }  // namespace interpreter
 }  // namespace framework
 }  // namespace paddle
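To see the three bookkeeping structures in action, here is a self-contained trace of the same update rules on a toy three-op program with one shared variable `v`; all ids are made up for illustration, and the lambda inlines `update_var_min_rw_op` for a single variable.

```cpp
#include <iostream>
#include <list>
#include <map>
#include <set>

// Toy re-run of the update rules for: op0 writes v, op1 reads v, op2 writes v.
int main() {
  std::map<int, std::set<int>> op2dependences = {{0, {}}, {1, {}}, {2, {}}};
  std::map<int, std::list<int>> var2min_rw_op;  // read/write frontier per var
  std::map<int, int> var2recent_write_op;       // most recent writer per var
  const int v = 0;                              // the single shared variable

  // Mirrors update_var_min_rw_op: drop ops that are already dependences of
  // cur_op from the frontier, then append cur_op.
  auto update = [&](int cur_op) {
    for (auto dep_op : op2dependences[cur_op]) var2min_rw_op[v].remove(dep_op);
    var2min_rw_op[v].push_back(cur_op);
  };

  // op0 writes v.
  var2recent_write_op[v] = 0;
  update(0);  // frontier: {0}

  // op1 reads v: RAW dependence on the most recent writer.
  op2dependences[1].insert(var2recent_write_op[v]);  // {0}
  update(1);  // op0 leaves the frontier, subsumed by op1: frontier = {1}

  // op2 writes v: WAR/WAW dependences on the whole frontier.
  for (auto dep_op : var2min_rw_op[v]) op2dependences[2].insert(dep_op);
  var2recent_write_op[v] = 2;
  update(2);  // frontier: {2}

  for (const auto& item : op2dependences) {
    std::cout << "op" << item.first << " depends on:";
    for (auto d : item.second) std::cout << " op" << d;
    std::cout << "\n";
  }
  // Prints: op0 depends on: / op1 depends on: op0 / op2 depends on: op1
}
```

Note that op2 ends up depending only on op1; its ordering after op0 (the WAW pair) still holds transitively through op1's read, which is exactly why the frontier may drop subsumed ops.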
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 375fed2356a01..f3b1a8a6b4a53 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -105,6 +105,9 @@ void build_op_func_list(const platform::Place& place,
                         std::vector<OpFuncNode>* vec_func_list,
                         VariableScope* var_scope);
 
+std::map<int, std::list<int>> build_op_downstream_map(
+    const std::vector<Instruction>& vec_instruction);
+
 void add_fetch(const std::vector<std::string>& fetch_names,
                framework::BlockDesc* block);
 
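Applying those rules by hand to the multiply-write program added in the test below (two `assign` ops writing the same variable `out`) gives the following downstream map, under hypothetical op ids. The op3 -> op4 edge is the write-after-write ordering that the old outputs-to-later-readers scan in `Convert` could not see, since op4 consumes `out` as an output rather than an input.

```cpp
#include <list>
#include <map>

// Expected result for the test program (op ids are hypothetical):
//   op0: full -> out, op1: full -> inp1, op2: full -> inp2,
//   op3: assign(inp1, out), op4: assign(inp2, out)
std::map<int, std::list<int>> expected_downstream = {
    {0, {3}},  // out must exist before assign #1 overwrites it (WAW)
    {1, {3}},  // inp1 must be filled before assign #1 reads it (RAW)
    {2, {4}},  // inp2 must be filled before assign #2 reads it (RAW)
    {3, {4}},  // assign #1 must finish before assign #2 rewrites out (WAW)
};
```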
diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py
new file mode 100644
index 0000000000000..5e298fc3dc7a6
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import unittest
+import paddle
+from paddle.fluid import core
+from paddle.fluid.core import StandaloneExecutor
+import paddle.fluid as fluid
+from paddle.fluid.framework import Program, program_guard
+import paddle.fluid.layers as layers
+
+from test_standalone_controlflow import TestCompatibility
+import numpy as np
+
+paddle.enable_static()
+
+
+class TestMultiplyWrite(TestCompatibility):
+    def _get_feed(self):
+        """ return the feeds
+        """
+        return None
+
+    def build_program(self):
+        main_program = paddle.static.default_main_program()
+        startup_program = paddle.static.default_startup_program()
+        with paddle.static.program_guard(main_program, startup_program):
+            out = paddle.full((1, ), 1)
+            inp1 = paddle.full((1, ), 2)
+            inp2 = paddle.full((1, ), 3)
+
+            paddle.fluid.layers.assign(inp1, out)
+            paddle.fluid.layers.assign(inp2, out)
+        return main_program, startup_program, out
+
+    def setUp(self):
+        self.place = paddle.CPUPlace()
+        self.iter_run = 5
+
+
+if __name__ == "__main__":
+    unittest.main()
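For completeness, a sketch of how a dataflow executor typically consumes these structures at run time; `RunInOrder` is illustrative only, not Paddle's actual scheduling loop. Each op starts with its dependency count as in-degree, and an op is released once the count reaches zero.

```cpp
#include <cstddef>
#include <functional>
#include <list>
#include <map>
#include <queue>
#include <vector>

// Illustrative consumer of the two structures above: run ops in dataflow
// order, releasing a downstream op once its dependency count hits zero.
void RunInOrder(const std::map<int, std::list<int>>& op2downstream,
                std::vector<size_t> dependency_count,  // copied, then mutated
                const std::vector<std::function<void()>>& kernels) {
  std::queue<size_t> ready;
  for (size_t op = 0; op < dependency_count.size(); ++op) {
    if (dependency_count[op] == 0) ready.push(op);  // root ops
  }
  while (!ready.empty()) {
    size_t op = ready.front();
    ready.pop();
    kernels[op]();  // execute the instruction
    auto it = op2downstream.find(static_cast<int>(op));
    if (it == op2downstream.end()) continue;
    for (int next : it->second) {
      if (--dependency_count[next] == 0) ready.push(next);  // now runnable
    }
  }
}
```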