From 05babf201ae265c7684a24d6c0846fc093873241 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 14:20:14 +0800 Subject: [PATCH 01/19] origin on python side --- .../fleet_executor/fleet_executor.cc | 45 +++++----- .../distributed/fleet/fleet_executor_utils.py | 7 ++ python/paddle/fluid/executor.py | 47 +++++----- .../test_fleet_executor_origin_schduler.py | 87 +++++++++++++++++++ 4 files changed, 138 insertions(+), 48 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index 0369c442734a4..aa00113a6cf6e 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -40,32 +40,29 @@ void FleetExecutor::Init( const framework::ProgramDesc& program_desc, framework::Scope* scope, const platform::Place& place, const std::vector& task_nodes, const std::unordered_map& task_id_to_rank) { - if (task_nodes.size() == 0) { - LOG(INFO) << "fleet executor will use c++ side scheduler construction."; - runtime_graph_ = std::make_shared(program_desc, exe_desc_); - } else { - LOG(INFO) << "fleet executor has been set dependency on python side."; - // TODO(fleet_exe devs): the unused_vars should be got from run time graph - std::vector> ops; - for (auto task_node : task_nodes) { - for (auto op : task_node->ops()) { - ops.emplace_back(std::unique_ptr(op)); - } - } - auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {}); - runtime_graph_ = std::make_shared(); - std::unordered_map interceptor_id_to_task; - for (auto task_node : task_nodes) { - task_node->SetUnusedVars(unused_vars); - int64_t interceptor_id = task_node->task_id(); - interceptor_id_to_task.emplace(interceptor_id, task_node); - } - runtime_graph_->SetInterceptorIdToRank(task_id_to_rank); - runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task); - for (auto& unique_op : ops) { - unique_op.release(); + PADDLE_ENFORCE_GT(task_nodes.size(), 0, + platform::errors::InvalidArgument( + "Fleet executor is inited with empty task node")); + // TODO(fleet_exe devs): the unused_vars should be got from run time graph + std::vector> ops; + for (auto task_node : task_nodes) { + for (auto op : task_node->ops()) { + ops.emplace_back(std::unique_ptr(op)); } } + auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {}); + runtime_graph_ = std::make_shared(); + std::unordered_map interceptor_id_to_task; + for (auto task_node : task_nodes) { + task_node->SetUnusedVars(unused_vars); + int64_t interceptor_id = task_node->task_id(); + interceptor_id_to_task.emplace(interceptor_id, task_node); + } + runtime_graph_->SetInterceptorIdToRank(task_id_to_rank); + runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task); + for (auto& unique_op : ops) { + unique_op.release(); + } root_scope_ = scope; place_ = place; PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument( diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 9422774bb6425..6b1b623171259 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -201,3 +201,10 @@ def create_task_node(role, ops, offset, node_type): for j in range(num_of_functionality): task_id_to_rank[int(i * num_of_functionality + j)] = i return task_nodes, task_id_to_rank + + +def origin(program, cur_rank): + task_node = core.TaskNode(program, cur_rank, 1, 1) + task_id = task_node.task_id() + task_id_to_rank = {task_id, cur_rank} + return task_node, task_id_to_rank diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index b00449a747542..955b2fb3939a0 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1972,31 +1972,30 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): rank_info.rank = rank rank_info.ip_port = endpoint fleet_exe_desc.cluster_info.append(rank_info) - if "dist_strategy" in fleet_opt: - fleet_exe_desc.dp_degree = fleet_opt["dist_strategy"]["dp_degree"] - fleet_exe_desc.mp_degree = fleet_opt["dist_strategy"]["mp_degree"] - fleet_exe_desc.pp_degree = fleet_opt["dist_strategy"]["pp_degree"] - if "num_micro_batches" in fleet_opt: - fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"] - num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree - assert nrank == num_of_gpu, "The number of rank is not equal to the number of gpu." - if 'python_side' in fleet_opt: - strategy = fleet_opt['python_side'] - if strategy == '1F1B': - from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b - tasks, task_id_to_rank = one_f_one_b( - program, cur_rank, - fleet_opt.get('num_micro_batches', 1), - fleet_opt.get('dist_strategy', {}), nrank) - # NOTE: have to hold these vars, otherwise will be destructed - fleet_opt['tasks'] = tasks - fleet_opt['task_id_to_rank'] = task_id_to_rank - else: - raise "Fleet_executor only supports 1F1B scheduler if you choose python side split, " \ - "but received " + str(strategy) + "." + assert 'scheduler' in fleet_opt, \ + "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin." + strategy = fleet_opt['scheduler'] + if strategy == '1F1B': + from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b + tasks, task_id_to_rank = one_f_one_b( + program, cur_rank, + fleet_opt.get('num_micro_batches', 1), + fleet_opt.get('dist_strategy', {}), nrank) + elif strategy == 'Origin': + from paddle.distributed.fleet.fleet_executor_utils import origin + if "dist_strategy" in fleet_opt: + assert fleet_opt["dist_strategy"]["pp_degree"] == 1, \ + "For pipeline mode, the scheduler should be 1F1B instead of Origin" + if "num_micro_batches" in fleet_opt: + assert fleet_opt["num_micro_batches"] == 1, \ + "For origin scheduler mode, the num micro batches should be 1" + tasks, task_id_to_rank = origin(program, cur_rank) else: - task_id_to_rank = fleet_opt.get("task_id_to_rank", {}) - tasks = fleet_opt.get("tasks", []) + raise "Fleet_executor only supports 1F1B and Origin scheduler, " \ + "but received " + str(strategy) + "." + # NOTE: have to hold these vars, otherwise will be destructed + fleet_opt['tasks'] = tasks + fleet_opt['task_id_to_rank'] = task_id_to_rank fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString()) place = core.Place() place.set_place(self.place) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py new file mode 100644 index 0000000000000..524379cdf7f44 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py @@ -0,0 +1,87 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +import paddle +import paddle.fluid as fluid + +paddle.enable_static() + + +class TestFleetExecutor(unittest.TestCase): + def fake_fleet_opt(self): + # TODO: Fake for coverage will be removed in the future + import paddle.distributed.fleet as fleet + strategy = fleet.DistributedStrategy() + strategy.sharding_configs = { + "dp_degree": 1, + "mp_degree": 1, + "pp_degree": 1 + } + strategy.pipeline_configs = {"accumulate_steps": 1} + fleet_opt = { + "dist_strategy": strategy.sharding_configs, + "num_micro_batches": strategy.pipeline_configs["accumulate_steps"], + "python_side": "Origin" + } + return fleet_opt + + def run_fleet_executor(self, place, x_data, y_data): + exe = paddle.static.Executor(place) + empty_program = paddle.static.Program() + with fluid.program_guard(empty_program, empty_program): + x = fluid.layers.data( + name='x', shape=x_data.shape, dtype=x_data.dtype) + y = fluid.layers.data( + name='y', shape=y_data.shape, dtype=y_data.dtype) + z = x + y + a = 2 * x + 3 * y + loss = paddle.mean(a) + base_lr = 0.1 + passes = [30, 60, 80, 90] + steps_per_pass = 10 + bd = [steps_per_pass * p for p in passes] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + lr_val = paddle.optimizer.lr.PiecewiseDecay( + boundaries=bd, values=lr) + opt = paddle.optimizer.AdamW( + learning_rate=lr_val, + grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0)) + opt.minimize(loss) + # TODO: section_program will be removed in the future + empty_program._pipeline_opt = { + "fleet_opt": self.fake_fleet_opt(), + "section_program": empty_program + } + res = exe.run(empty_program, + feed={'x': x_data, + 'y': y_data}, + fetch_list=[z.name, a.name]) + return res + + def test_executor_on_single_device(self): + if fluid.is_compiled_with_cuda(): + shape = (10000, 3462) + x_data = np.random.rand(*shape) + y_data = np.random.rand(*shape) + z_data = x_data + y_data + a_data = 2 * x_data + 3 * y_data + res = self.run_fleet_executor(fluid.CUDAPlace(0), x_data, y_data) + self.assertTrue(np.allclose(res[0], z_data)) + self.assertTrue(np.allclose(res[1], a_data)) + + +if __name__ == "__main__": + unittest.main() From 3d349f1c24b1674266a533062fe056413310e9a5 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 14:36:27 +0800 Subject: [PATCH 02/19] remove run time graph and task node useless funct --- .../fleet_executor/runtime_graph.cc | 289 +----------------- .../fleet_executor/runtime_graph.h | 19 -- .../distributed/fleet_executor/task_node.cc | 19 -- .../distributed/fleet_executor/task_node.h | 5 - 4 files changed, 2 insertions(+), 330 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc index 32f9e36e53037..1ad144470af26 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc @@ -14,300 +14,15 @@ #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h" #include "paddle/fluid/distributed/fleet_executor/task_node.h" -#include "paddle/fluid/framework/executor_gc_helper.h" -#include "paddle/fluid/framework/op_registry.h" -#include "paddle/fluid/framework/operator.h" -#include "paddle/fluid/framework/program_desc.h" namespace paddle { namespace distributed { -namespace { - -using OperatorBase = RuntimeGraph::OperatorBase; -using OpRole = paddle::framework::OpRole; -using OpRegistry = paddle::framework::OpRegistry; -using ProgramDesc = paddle::framework::ProgramDesc; - -bool IsForward(int32_t op_role) { - return (op_role == static_cast(OpRole::kForward)) || - (op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss))); -} - -bool IsLRSched(int32_t op_role) { - return op_role == static_cast(OpRole::kLRSched); -} - -bool IsBackward(int32_t op_role) { - return (op_role == static_cast(OpRole::kBackward)) || - (op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))); -} - -bool IsOptimize(int32_t op_role) { - return op_role == static_cast(OpRole::kOptimize); -} - -struct DistCoord { - int32_t dp_idx; - int32_t pp_idx; - int32_t mp_idx; -}; - -class DistCoordSys final { - public: - DistCoordSys(int32_t dp_degree, int32_t pp_degree, int32_t mp_degree) - : dp_degree_(dp_degree), pp_degree_(pp_degree), mp_degree_(mp_degree) {} - DistCoord RankToCoord(int64_t rank) const; - int64_t CoordToRank(const DistCoord& coord) const; - - private: - DISABLE_COPY_AND_ASSIGN(DistCoordSys); - bool InvalidCoord(const DistCoord& coord) const; - int32_t dp_degree_; - int32_t pp_degree_; - int32_t mp_degree_; -}; - -DistCoord DistCoordSys::RankToCoord(int64_t rank) const { - DistCoord coord; - coord.mp_idx = rank % mp_degree_; - rank /= mp_degree_; - coord.pp_idx = rank % pp_degree_; - rank /= pp_degree_; - coord.dp_idx = rank % dp_degree_; - return coord; -} - -int64_t DistCoordSys::CoordToRank(const DistCoord& coord) const { - if (InvalidCoord(coord)) { - return -1; - } - return coord.dp_idx * pp_degree_ * mp_degree_ + coord.pp_idx * mp_degree_ + - coord.mp_idx; -} - -bool DistCoordSys::InvalidCoord(const DistCoord& coord) const { - return coord.mp_idx < 0 || coord.mp_idx >= mp_degree_ || coord.pp_idx < 0 || - coord.pp_idx >= pp_degree_ || coord.dp_idx < 0 || - coord.dp_idx >= dp_degree_; -} - -} // namespace - -std::vector RuntimeGraph::functionality_order = { - OpRole::kLRSched, OpRole::kForward, OpRole::kBackward, OpRole::kOptimize}; - -RuntimeGraph::RuntimeGraph(const ProgramDesc& program, - const FleetExecutorDesc& exe_desc) - : exe_desc_(exe_desc) { - if (exe_desc.pp_degree() == 1) { - OriginProgramCompile(program); - } else { - SplitProgramBasedFunctionality(program); - AssignTaskToIntercepter(); - FakeDependence(); - FakeRuntimeInfo(); - } -} - -void RuntimeGraph::OriginProgramCompile(const ProgramDesc& program) { - int64_t cur_rank = exe_desc_.cur_rank(); - int64_t max_run_times = exe_desc_.num_micro_batches(); - int64_t max_slot_nums = exe_desc_.num_slots(); - - auto task_node = std::make_unique(program, cur_rank, max_run_times, - max_slot_nums); - // TODO(wangxi): add skip vars - auto unused_vars = - framework::GetUnusedVars(program.Block(0), task_node->unique_ops(), {}); - task_node->SetType("Compute"); - task_node->SetUnusedVars(unused_vars); - - task_nodes_.emplace_back(std::move(task_node)); - int64_t task_id = task_nodes_[0]->task_id(); - intercepter_id_to_rank_.insert({task_id, cur_rank}); - intercepter_id_to_node_.insert({task_id, task_nodes_[0].get()}); -} - -void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { - for (const auto& op_desc : program.Block(0).AllOps()) { - ops_.emplace_back(OpRegistry::CreateOp(*op_desc)); - } - // TODO(wangxi): how to gc pipeline backward send - auto unused_vars = framework::GetUnusedVars(program.Block(0), ops_, {}); - - std::unordered_map> role_to_ops; - for (const auto& op : ops_) { - int32_t op_role = op->Attr("op_role"); - OpRole new_op_role; - if (IsLRSched(op_role)) { - new_op_role = OpRole::kLRSched; - } else if (IsForward(op_role)) { - new_op_role = OpRole::kForward; - } else if (IsBackward(op_role)) { - new_op_role = OpRole::kBackward; - } else if (IsOptimize(op_role)) { - new_op_role = OpRole::kOptimize; - } else { - PADDLE_THROW(platform::errors::PreconditionNotMet( - "The op %s is None of LRSched, Forward, Backward or Optimize.", - op->Type())); - } - int32_t new_op_role_id = static_cast(new_op_role); - if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) { - role_to_ops.insert({new_op_role_id, {}}); - } - role_to_ops.at(new_op_role_id).emplace_back(op.get()); - } - - int64_t cur_rank = exe_desc_.cur_rank(); - DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(), - exe_desc_.mp_degree()); - const auto& coord = coord_sys.RankToCoord(cur_rank); - int pipeline_stage = coord.pp_idx; - int64_t num_pipeline_stages = exe_desc_.pp_degree(); - - // TODO(fleet_executor dev): start up steps should be a config `num_slots` - int64_t start_up_steps = num_pipeline_stages - pipeline_stage; - int64_t num_micro_batches = exe_desc_.num_micro_batches(); - int64_t task_id = cur_rank * functionality_order.size(); - for (std::size_t i = 0; i < functionality_order.size(); ++i) { - VLOG(3) << "Runtime graph is creating task node for: " << task_id << "."; - OpRole role = functionality_order[i]; - int32_t role_id = static_cast(role); - int64_t max_run_times = num_micro_batches; - int64_t max_slot_nums = start_up_steps; - // NOTE: use short path, each interceptor should run for max_run_times - std::vector task_ops{}; - if (role_to_ops.find(role_id) != role_to_ops.end()) { - task_ops = role_to_ops.at(role_id); - } - std::unique_ptr task_node = std::make_unique( - role_id, task_ops, cur_rank, task_id, max_run_times, max_slot_nums); - if (IsLRSched(role_id) || IsOptimize(role_id)) { - task_node->SetType("Amplifier"); - if (IsLRSched(role_id)) { - task_node->SetRunPerSteps(max_run_times); - } else { - task_node->SetRunAtOffset(max_run_times - 1); - task_node->SetRunPerSteps(max_run_times); - } - } else { - task_node->SetType("Compute"); - } - task_node->SetUnusedVars(unused_vars); - task_nodes_.emplace_back(std::move(task_node)); - ++task_id; - } -} - -void RuntimeGraph::FakeDependence() { - int64_t cur_rank = exe_desc_.cur_rank(); - DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(), - exe_desc_.mp_degree()); - const auto& coord = coord_sys.RankToCoord(cur_rank); - DistCoord upstream_coord = coord, downstream_coord = coord; - upstream_coord.pp_idx -= 1; - downstream_coord.pp_idx += 1; - int64_t pp_upstream = coord_sys.CoordToRank(upstream_coord); - int64_t pp_downstream = coord_sys.CoordToRank(downstream_coord); - bool is_first_stage = (pp_upstream == -1); - bool is_last_stage = (pp_downstream == -1); - - int32_t num_of_functionality = functionality_order.size(); - // lr(1:m) -> forward -> backward -> (m:1)optimize - // ↑ ↓ - // lr(1:m) -> forward -> backward -> (m:1)optimize - // ↑ ↓ - // lr(1:m) -> forward -> backward -> (m:1)optimize - for (std::size_t i = 0; i < task_nodes_.size(); ++i) { - auto& node = task_nodes_[i]; - bool is_forward = IsForward(node->role()); - bool is_backward = IsBackward(node->role()); - - int64_t cur_id = cur_rank * num_of_functionality + i; - int64_t prev_id = cur_id - 1; - int64_t next_id = cur_id + 1; - - int64_t upstream_id = pp_upstream * num_of_functionality + i; - int64_t downstream_id = pp_downstream * num_of_functionality + i; - - // 1F1B, last stage pp_buff_size should be 1, while first stage - // pp_buff_size should be pp_degree - int64_t pp_buff_size = exe_desc_.pp_degree() - coord.pp_idx; - - std::vector> ups; - std::vector> downs; - - if (i != 0) { // not lr - int64_t buff_size = is_backward ? pp_buff_size : 2; - ups.emplace_back(prev_id, buff_size); - } - if (i != task_nodes_.size() - 1) { // not optimize - int64_t buff_size = is_forward ? pp_buff_size : 2; - downs.emplace_back(next_id, buff_size); - } - - if (is_forward) { - if (!is_first_stage) { - ups.emplace_back(upstream_id, 2); - } - if (!is_last_stage) { - downs.emplace_back(downstream_id, 2); - } - } else if (is_backward) { - if (!is_last_stage) { - ups.emplace_back(downstream_id, 2); - } - if (!is_first_stage) { - downs.emplace_back(upstream_id, 2); - } - } - - for (auto up : ups) { - VLOG(3) << "Task(" << cur_id << ") AddUpstream Task(" << up.first - << ") with buff_size=" << up.second; - node->AddUpstreamTask(up.first, up.second); - } - for (auto down : downs) { - VLOG(3) << "Task(" << cur_id << ") AddDownstream Task(" << down.first - << ") with buff_size=" << down.second; - node->AddDownstreamTask(down.first, down.second); - } - } -} - -void RuntimeGraph::AssignTaskToIntercepter() { - for (const auto& task : task_nodes_) { - int64_t intercepter_id = task->task_id(); - VLOG(3) << "Runtime graph is assigning task to interceptor: " - << intercepter_id << " with type: " << task->type() << "."; - if (intercepter_id_to_node_.find(intercepter_id) != - intercepter_id_to_node_.end()) { - PADDLE_THROW(platform::errors::PreconditionNotMet( - "Repeated intercepter id: %d", intercepter_id)); - } - intercepter_id_to_node_.insert({intercepter_id, task.get()}); - } -} - -void RuntimeGraph::FakeRuntimeInfo() { - int64_t nrank = exe_desc_.cluster_info().size(); - int32_t num_of_functionality = functionality_order.size(); - for (int64_t i = 0; i < nrank; ++i) { - for (int32_t j = 0; j < num_of_functionality; ++j) { - int64_t intercepter_id = i * num_of_functionality + j; - intercepter_id_to_rank_.insert({intercepter_id, i}); - } - } -} std::string RuntimeGraph::DebugString() const { std::ostringstream os; os << "\nRuntime Graph Debug: \n"; - for (const auto& task : task_nodes_) { - os << task->DebugString(); + for (const auto& pair : intercepter_id_to_node_) { + os << pair.second->DebugString(); os << "\n"; } return os.str(); diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.h b/paddle/fluid/distributed/fleet_executor/runtime_graph.h index 9ffc9cc2cc137..3678e2e860a9d 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.h +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.h @@ -22,21 +22,12 @@ #include "paddle/fluid/platform/macros.h" namespace paddle { -namespace framework { -class ProgramDesc; -class OperatorBase; -} - namespace distributed { class TaskNode; class RuntimeGraph final { public: - using ProgramDesc = paddle::framework::ProgramDesc; - using OperatorBase = paddle::framework::OperatorBase; RuntimeGraph() = default; - explicit RuntimeGraph(const ProgramDesc& program, - const FleetExecutorDesc& exe_desc); ~RuntimeGraph() = default; const std::unordered_map& intercepter_id_to_node() const { return intercepter_id_to_node_; @@ -56,18 +47,8 @@ class RuntimeGraph final { private: DISABLE_COPY_AND_ASSIGN(RuntimeGraph); - void SplitProgramBasedFunctionality(const ProgramDesc& program); - void FakeDependence(); - void AssignTaskToIntercepter(); - void FakeRuntimeInfo(); - void OriginProgramCompile(const ProgramDesc& program); - // LRSched, Forward, Backward, Optimize - static std::vector functionality_order; - std::vector> task_nodes_; - std::vector> ops_; std::unordered_map intercepter_id_to_node_; std::unordered_map intercepter_id_to_rank_; - FleetExecutorDesc exe_desc_; }; } // namespace distributed diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc index f03ee0acb4731..94704fd56839c 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.cc +++ b/paddle/fluid/distributed/fleet_executor/task_node.cc @@ -60,25 +60,6 @@ TaskNode::TaskNode(int32_t role, } } -TaskNode::TaskNode(int32_t role, - const std::vector& ops, - int64_t rank, int64_t task_id, int64_t max_run_times, - int64_t max_slot_nums) - : ops_(ops), - role_(role), - rank_(rank), - task_id_(task_id), - max_run_times_(max_run_times), - max_slot_nums_(max_slot_nums) {} - -TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id, - int64_t max_run_times, int64_t max_slot_nums) - : role_(role), - rank_(rank), - task_id_(task_id), - max_run_times_(max_run_times), - max_slot_nums_(max_slot_nums) {} - bool TaskNode::AddUpstreamTask(int64_t task_id, int64_t buff_size) { const auto& ret = upstream_.emplace(task_id, buff_size); return ret.second; diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index d43cd99926d41..b81bdf72bf85b 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -32,14 +32,9 @@ namespace distributed { class TaskNode final { public: using OperatorBase = paddle::framework::OperatorBase; - TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times, - int64_t max_slot_nums); TaskNode(int32_t role, const std::vector& op_descs, int64_t rank, int64_t task_id, int64_t max_run_times, int64_t max_slot_nums); - TaskNode(int32_t role, const std::vector& ops, - int64_t rank, int64_t task_id, int64_t max_run_times, - int64_t max_slot_nums); TaskNode(const paddle::framework::ProgramDesc& program, int64_t rank, int64_t max_run_times, int64_t max_slot_nums); ~TaskNode() = default; From f32b56af66c20d68dcd4bfb6c7c72f7ad48c8589 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 14:40:47 +0800 Subject: [PATCH 03/19] add type --- python/paddle/distributed/fleet/fleet_executor_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 6b1b623171259..195efd4827555 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -205,6 +205,7 @@ def create_task_node(role, ops, offset, node_type): def origin(program, cur_rank): task_node = core.TaskNode(program, cur_rank, 1, 1) + task_node.set_type("Compute") task_id = task_node.task_id() task_id_to_rank = {task_id, cur_rank} return task_node, task_id_to_rank From f749cbf4225592363a5624df9d2fbd381c17af1e Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 14:57:01 +0800 Subject: [PATCH 04/19] add some enforce check --- paddle/fluid/distributed/fleet_executor/carrier.cc | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 009df6438e270..4802bbd12063d 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -229,13 +229,12 @@ void Carrier::CreateInterceptors() { task_node->run_at_offset(), task_node->run_per_steps())); std::unique_ptr interceptor; - if (task_node->type().empty()) { - // TODO(wangxi): delete this in future - interceptor.reset(new Interceptor(interceptor_id, task_node)); - } else { - interceptor = InterceptorFactory::Create(task_node->type(), - interceptor_id, task_node); - } + PADDLE_ENFORCE_NE(task_node->type().empty(), true, + platform::errors::NotFound( + "Cannot found type for task node with id %lld", + task_node->task_id())); + interceptor = InterceptorFactory::Create(task_node->type(), interceptor_id, + task_node); interceptor->SetPlace(place_); interceptor->SetMiniBatchScope(minibatch_scope_); interceptor->SetMicroBatchScope(microbatch_scopes_); From 63f1f654ed64f494fc9bee6b6bd072a28052d63e Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 15:13:59 +0800 Subject: [PATCH 05/19] task node for ut --- .../distributed/fleet_executor/task_node.cc | 19 +++++++++++++++++++ .../distributed/fleet_executor/task_node.h | 5 +++++ 2 files changed, 24 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc index 94704fd56839c..f03ee0acb4731 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.cc +++ b/paddle/fluid/distributed/fleet_executor/task_node.cc @@ -60,6 +60,25 @@ TaskNode::TaskNode(int32_t role, } } +TaskNode::TaskNode(int32_t role, + const std::vector& ops, + int64_t rank, int64_t task_id, int64_t max_run_times, + int64_t max_slot_nums) + : ops_(ops), + role_(role), + rank_(rank), + task_id_(task_id), + max_run_times_(max_run_times), + max_slot_nums_(max_slot_nums) {} + +TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id, + int64_t max_run_times, int64_t max_slot_nums) + : role_(role), + rank_(rank), + task_id_(task_id), + max_run_times_(max_run_times), + max_slot_nums_(max_slot_nums) {} + bool TaskNode::AddUpstreamTask(int64_t task_id, int64_t buff_size) { const auto& ret = upstream_.emplace(task_id, buff_size); return ret.second; diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index b81bdf72bf85b..d43cd99926d41 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -32,9 +32,14 @@ namespace distributed { class TaskNode final { public: using OperatorBase = paddle::framework::OperatorBase; + TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times, + int64_t max_slot_nums); TaskNode(int32_t role, const std::vector& op_descs, int64_t rank, int64_t task_id, int64_t max_run_times, int64_t max_slot_nums); + TaskNode(int32_t role, const std::vector& ops, + int64_t rank, int64_t task_id, int64_t max_run_times, + int64_t max_slot_nums); TaskNode(const paddle::framework::ProgramDesc& program, int64_t rank, int64_t max_run_times, int64_t max_slot_nums); ~TaskNode() = default; From 061e0c42f42272b954481eadf2f587a50f0f552f Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 15:32:13 +0800 Subject: [PATCH 06/19] fix typo --- python/paddle/distributed/fleet/fleet_executor_utils.py | 4 ++-- python/paddle/fluid/tests/unittests/test_fleet_executor.py | 2 +- .../tests/unittests/test_fleet_executor_origin_schduler.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 195efd4827555..7717fa46c7506 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -204,8 +204,8 @@ def create_task_node(role, ops, offset, node_type): def origin(program, cur_rank): - task_node = core.TaskNode(program, cur_rank, 1, 1) + task_node = core.TaskNode(program.desc, cur_rank, 1, 1) task_node.set_type("Compute") task_id = task_node.task_id() task_id_to_rank = {task_id, cur_rank} - return task_node, task_id_to_rank + return [task_node], task_id_to_rank diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_executor.py index 30ba376dd4dc5..8b73a714bbbc5 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_executor.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor.py @@ -34,7 +34,7 @@ def fake_fleet_opt(self): fleet_opt = { "dist_strategy": strategy.sharding_configs, "num_micro_batches": strategy.pipeline_configs["accumulate_steps"], - "python_side": "1F1B" + "scheduler": "1F1B" } return fleet_opt diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py index 524379cdf7f44..4bbb3bff07f97 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py @@ -34,7 +34,7 @@ def fake_fleet_opt(self): fleet_opt = { "dist_strategy": strategy.sharding_configs, "num_micro_batches": strategy.pipeline_configs["accumulate_steps"], - "python_side": "Origin" + "scheduler": "Origin" } return fleet_opt From ec1a3961cce08686ea032e132aac36a10173e391 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 15:53:15 +0800 Subject: [PATCH 07/19] bug fix --- python/paddle/distributed/fleet/fleet_executor_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 7717fa46c7506..3153a8dad54cb 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -207,5 +207,5 @@ def origin(program, cur_rank): task_node = core.TaskNode(program.desc, cur_rank, 1, 1) task_node.set_type("Compute") task_id = task_node.task_id() - task_id_to_rank = {task_id, cur_rank} + task_id_to_rank = {task_id: cur_rank} return [task_node], task_id_to_rank From 1f0ca542cea2ce1af375183d2871111585ed3fee Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:07:55 +0800 Subject: [PATCH 08/19] add assertion --- python/paddle/distributed/fleet/fleet_executor_utils.py | 9 +++++++++ python/paddle/fluid/executor.py | 7 ++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 3153a8dad54cb..614305846e43c 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -204,6 +204,15 @@ def create_task_node(role, ops, offset, node_type): def origin(program, cur_rank): + """ + Origin scheduler for fleet executor, supports non-pp mode + :param program: The origin program. + :param cur_rank: Current rank (can be got from fleet.worker_index()). + :return: + task_nodes (list): four task nodes for current rank + task_id_to_rank (dict): a fake dict, since there is no upstream or downstream ,this dict won't be used + """ + print("fleet executor will use python side origin scheduler.") task_node = core.TaskNode(program.desc, cur_rank, 1, 1) task_node.set_type("Compute") task_id = task_node.task_id() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 955b2fb3939a0..9c3e0de1ff62d 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1977,13 +1977,18 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): strategy = fleet_opt['scheduler'] if strategy == '1F1B': from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b + assert "dist_strategy" in fleet_opt and \ + "pp_degree" in fleet_opt["dist_strategy"] and \ + fleet_opt["dist_strategy"]["pp_degree"] > 1, \ + "For 1F1B scheduler mode, pp_degree should be greater then 1." tasks, task_id_to_rank = one_f_one_b( program, cur_rank, fleet_opt.get('num_micro_batches', 1), fleet_opt.get('dist_strategy', {}), nrank) elif strategy == 'Origin': from paddle.distributed.fleet.fleet_executor_utils import origin - if "dist_strategy" in fleet_opt: + if "dist_strategy" in fleet_opt and "pp_degree" in fleet_opt[ + "dist_strategy"]: assert fleet_opt["dist_strategy"]["pp_degree"] == 1, \ "For pipeline mode, the scheduler should be 1F1B instead of Origin" if "num_micro_batches" in fleet_opt: From 17b8171d2fdbfa075b6b566d8aef018c12f5e1ba Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:08:27 +0800 Subject: [PATCH 09/19] typo fix --- python/paddle/fluid/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 9c3e0de1ff62d..0dacfb3f75947 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1987,8 +1987,8 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): fleet_opt.get('dist_strategy', {}), nrank) elif strategy == 'Origin': from paddle.distributed.fleet.fleet_executor_utils import origin - if "dist_strategy" in fleet_opt and "pp_degree" in fleet_opt[ - "dist_strategy"]: + if "dist_strategy" in fleet_opt and \ + "pp_degree" in fleet_opt["dist_strategy"]: assert fleet_opt["dist_strategy"]["pp_degree"] == 1, \ "For pipeline mode, the scheduler should be 1F1B instead of Origin" if "num_micro_batches" in fleet_opt: From fc9e28693ca968cddb241e0b57cea99c5a23c430 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:11:01 +0800 Subject: [PATCH 10/19] for ut --- python/paddle/fluid/executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 0dacfb3f75947..2affe40a60aae 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1977,10 +1977,10 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): strategy = fleet_opt['scheduler'] if strategy == '1F1B': from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b - assert "dist_strategy" in fleet_opt and \ - "pp_degree" in fleet_opt["dist_strategy"] and \ - fleet_opt["dist_strategy"]["pp_degree"] > 1, \ - "For 1F1B scheduler mode, pp_degree should be greater then 1." + if "dist_strategy" not in fleet_opt or \ + "pp_degree" not in fleet_opt["dist_strategy"] or \ + fleet_opt["dist_strategy"]["pp_degree"] == 1: + warnings.warn("Using 1F1B scheduler with pp_degree == 1.") tasks, task_id_to_rank = one_f_one_b( program, cur_rank, fleet_opt.get('num_micro_batches', 1), From 9df5eaf505bbe71651d21310e6afb8522d82cfde Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:15:34 +0800 Subject: [PATCH 11/19] remove useless proto --- .../distributed/fleet_executor/fleet_executor_desc.proto | 5 ----- 1 file changed, 5 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto index 6890c311ec003..d048660774b39 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto @@ -23,9 +23,4 @@ message RankInfo { message FleetExecutorDesc { optional int64 cur_rank = 1 [ default = 0 ]; // Rank id of current processor repeated RankInfo cluster_info = 2; - optional int32 dp_degree = 3 [ default = 1 ]; - optional int32 mp_degree = 4 [ default = 1 ]; - optional int32 pp_degree = 5 [ default = 1 ]; - optional int64 num_micro_batches = 6 [ default = 1 ]; - optional int64 num_slots = 7 [ default = 1 ]; } From 977f350a270128c8e7cbf75f3c9161f68811d90b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:17:47 +0800 Subject: [PATCH 12/19] take back the num_micro_batches --- .../fluid/distributed/fleet_executor/fleet_executor_desc.proto | 1 + python/paddle/fluid/executor.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto index d048660774b39..aa553557852a7 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto @@ -23,4 +23,5 @@ message RankInfo { message FleetExecutorDesc { optional int64 cur_rank = 1 [ default = 0 ]; // Rank id of current processor repeated RankInfo cluster_info = 2; + optional int64 num_micro_batches = 3 [ default = 1 ]; } diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 2affe40a60aae..b6a6be4cef30a 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1972,6 +1972,8 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): rank_info.rank = rank rank_info.ip_port = endpoint fleet_exe_desc.cluster_info.append(rank_info) + if "num_micro_batches" in fleet_opt: + fleet_exe_desc.num_micro_batches = fleet_opt['num_micro_batches'] assert 'scheduler' in fleet_opt, \ "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin." strategy = fleet_opt['scheduler'] From 96c84ac156815395e79721a262aec1883957114b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:23:42 +0800 Subject: [PATCH 13/19] typo fix --- python/paddle/fluid/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index b6a6be4cef30a..1824655c6844b 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1973,7 +1973,7 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): rank_info.ip_port = endpoint fleet_exe_desc.cluster_info.append(rank_info) if "num_micro_batches" in fleet_opt: - fleet_exe_desc.num_micro_batches = fleet_opt['num_micro_batches'] + fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"] assert 'scheduler' in fleet_opt, \ "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin." strategy = fleet_opt['scheduler'] From e322a96535c46f5cdb71e0ecd6c39e9985d386e5 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:26:40 +0800 Subject: [PATCH 14/19] rename --- python/paddle/fluid/executor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 1824655c6844b..42e520ae41bce 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1976,8 +1976,8 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"] assert 'scheduler' in fleet_opt, \ "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin." - strategy = fleet_opt['scheduler'] - if strategy == '1F1B': + scheduler = fleet_opt['scheduler'] + if scheduler == '1F1B': from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b if "dist_strategy" not in fleet_opt or \ "pp_degree" not in fleet_opt["dist_strategy"] or \ @@ -1987,15 +1987,15 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): program, cur_rank, fleet_opt.get('num_micro_batches', 1), fleet_opt.get('dist_strategy', {}), nrank) - elif strategy == 'Origin': + elif scheduler == 'Origin': from paddle.distributed.fleet.fleet_executor_utils import origin if "dist_strategy" in fleet_opt and \ "pp_degree" in fleet_opt["dist_strategy"]: assert fleet_opt["dist_strategy"]["pp_degree"] == 1, \ - "For pipeline mode, the scheduler should be 1F1B instead of Origin" + "For pipeline mode, the scheduler should be 1F1B instead of Origin." if "num_micro_batches" in fleet_opt: assert fleet_opt["num_micro_batches"] == 1, \ - "For origin scheduler mode, the num micro batches should be 1" + "For origin scheduler mode, the num micro batches should be 1." tasks, task_id_to_rank = origin(program, cur_rank) else: raise "Fleet_executor only supports 1F1B and Origin scheduler, " \ From 6837aeeca8e9cd3db0f4516a7fc46927c0583247 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 16:50:09 +0800 Subject: [PATCH 15/19] typo --- python/paddle/distributed/fleet/fleet_executor_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 614305846e43c..9b9427e7c6bb6 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -210,7 +210,7 @@ def origin(program, cur_rank): :param cur_rank: Current rank (can be got from fleet.worker_index()). :return: task_nodes (list): four task nodes for current rank - task_id_to_rank (dict): a fake dict, since there is no upstream or downstream ,this dict won't be used + task_id_to_rank (dict): a fake dict, since there is no upstream or downstream, this dict won't be used """ print("fleet executor will use python side origin scheduler.") task_node = core.TaskNode(program.desc, cur_rank, 1, 1) From d0e5c8c79a697cadb2cde03bebb5598c67d74d15 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 17:05:25 +0800 Subject: [PATCH 16/19] index --- python/paddle/distributed/fleet/fleet_executor_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 9b9427e7c6bb6..77db11999469f 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -208,7 +208,7 @@ def origin(program, cur_rank): Origin scheduler for fleet executor, supports non-pp mode :param program: The origin program. :param cur_rank: Current rank (can be got from fleet.worker_index()). - :return: + :return: task_nodes (list): four task nodes for current rank task_id_to_rank (dict): a fake dict, since there is no upstream or downstream, this dict won't be used """ From 2896424b231fe1f112410794b5bff7710879a4ea Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 17:08:15 +0800 Subject: [PATCH 17/19] rename one_f_one_b --- python/paddle/distributed/fleet/fleet_executor_utils.py | 2 +- python/paddle/fluid/executor.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/distributed/fleet/fleet_executor_utils.py b/python/paddle/distributed/fleet/fleet_executor_utils.py index 77db11999469f..dba3388d18ffa 100644 --- a/python/paddle/distributed/fleet/fleet_executor_utils.py +++ b/python/paddle/distributed/fleet/fleet_executor_utils.py @@ -89,7 +89,7 @@ def is_backward_op(op_role): (op_role == (int(OpRole.Backward) ^ int(OpRole.Loss))) -def one_f_one_b(program, cur_rank, max_run_times, dist_opt, nrank): +def run1f1b(program, cur_rank, max_run_times, dist_opt, nrank): """ Split the program to support 1f1b pipeline scheduler. This funct will split the program based on the op_role. diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 42e520ae41bce..fbc1b41eb873a 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1978,12 +1978,12 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin." scheduler = fleet_opt['scheduler'] if scheduler == '1F1B': - from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b + from paddle.distributed.fleet.fleet_executor_utils import run1f1b if "dist_strategy" not in fleet_opt or \ "pp_degree" not in fleet_opt["dist_strategy"] or \ fleet_opt["dist_strategy"]["pp_degree"] == 1: warnings.warn("Using 1F1B scheduler with pp_degree == 1.") - tasks, task_id_to_rank = one_f_one_b( + tasks, task_id_to_rank = run1f1b( program, cur_rank, fleet_opt.get('num_micro_batches', 1), fleet_opt.get('dist_strategy', {}), nrank) From 109c4093158747e9383a9f745999133893bb34cc Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 17 Dec 2021 17:27:43 +0800 Subject: [PATCH 18/19] fix ci --- python/paddle/fluid/tests/unittests/CMakeLists.txt | 1 + ...rigin_schduler.py => test_fleet_executor_origin_scheduler.py} | 0 2 files changed, 1 insertion(+) rename python/paddle/fluid/tests/unittests/{test_fleet_executor_origin_schduler.py => test_fleet_executor_origin_scheduler.py} (100%) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 5fdcd6d0a9d38..598ca24f59550 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -146,6 +146,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_multi_devices) + LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_origin_scheduler) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_mapper) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_task_node) endif() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_origin_scheduler.py similarity index 100% rename from python/paddle/fluid/tests/unittests/test_fleet_executor_origin_schduler.py rename to python/paddle/fluid/tests/unittests/test_fleet_executor_origin_scheduler.py From 57cf07262f99cc8bb6cc602b9003d4980d6e77e7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sun, 19 Dec 2021 16:32:53 +0800 Subject: [PATCH 19/19] bug fix --- python/paddle/fluid/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index fbc1b41eb873a..a65370d99a8a5 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1999,7 +1999,7 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None): tasks, task_id_to_rank = origin(program, cur_rank) else: raise "Fleet_executor only supports 1F1B and Origin scheduler, " \ - "but received " + str(strategy) + "." + "but received " + str(scheduler) + "." # NOTE: have to hold these vars, otherwise will be destructed fleet_opt['tasks'] = tasks fleet_opt['task_id_to_rank'] = task_id_to_rank