Backward on parallel do using nccl #8361
Changes from 33 commits
```diff
@@ -28,8 +28,8 @@ using Tensor = framework::Tensor;

 // Base convolution operator definitions for other conv
 // like operators to reuse the implementation.
-inline int OutputSize(int input_size, int filter_size, int dilation,
-                      int padding, int stride) {
+inline int ConvOutputSize(int input_size, int filter_size, int dilation,
+                          int padding, int stride) {
   const int dkernel = dilation * (filter_size - 1) + 1;
   const int output_size = (input_size + 2 * padding - dkernel) / stride + 1;
   return output_size;
```

> Review (on the renamed `ConvOutputSize`): Name …
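The renamed helper computes the usual convolution output size, where dilation inflates the effective kernel extent. A minimal sketch of the same arithmetic in plain Python, with a worked example (the function name `conv_output_size` is ours, not part of the PR):

```python
# Plain-Python mirror of ConvOutputSize (name conv_output_size is ours).
def conv_output_size(input_size, filter_size, dilation, padding, stride):
    dkernel = dilation * (filter_size - 1) + 1      # dilated kernel extent
    return (input_size + 2 * padding - dkernel) // stride + 1

# 32-wide input, 3-wide filter, dilation 2, padding 1, stride 1:
# dkernel = 2*(3-1)+1 = 5; output = (32 + 2 - 5)//1 + 1 = 30
print(conv_output_size(32, 3, 2, 1, 1))  # -> 30
```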
```diff
@@ -14,10 +14,13 @@ limitations under the License. */

 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/operators/nccl/nccl_gpu_common.h"
+#include "paddle/fluid/operators/nccl/nccl_gpu_common.h"

 namespace paddle {
 namespace operators {

+static constexpr char kParallelScopes[] = "parallel_scopes";
+
 // NCCLinitOp
 class NCCLInitOp : public framework::OperatorBase {
  public:
```

> Review: … [on the duplicated `nccl_gpu_common.h` include]
> Reply: typo. thanks for pointing it out.
```diff
@@ -29,11 +32,22 @@ class NCCLInitOp : public framework::OperatorBase {
  private:
   void RunImpl(const framework::Scope &scope,
                const platform::Place &place) const override {
+    PADDLE_ENFORCE_NOT_NULL(scope.FindVar(Input(kParallelScopes)),
+                            "Can not find variable '%s' in the scope.",
+                            kParallelScopes);
     const auto &name = Output("Communicator");
     PADDLE_ENFORCE_NOT_NULL(scope.FindVar(name),
                             "Can not find variable '%s' in the scope.", name);
-    std::vector<int> gpus = Attr<std::vector<int>>("gpus");
-    PADDLE_ENFORCE(!gpus.empty(), "Attr(gpus) should not be empty.");
+    // A parallel do may not use all the GPUs. For example, the batch size is 7
+    // in the last batch while we have 8 GPUs. In this case, parallel_do will
+    // create 7 parallel scopes, so ncclInitOp should create 7 GPU peers.
+    auto &parallel_scopes = scope.FindVar(Input(kParallelScopes))
+                                ->Get<std::vector<framework::Scope *>>();
+    std::vector<int> gpus(parallel_scopes.size());
+    for (int i = 0; i < static_cast<int>(parallel_scopes.size()); ++i) {
+      gpus[i] = i;
+    }
+    PADDLE_ENFORCE(!gpus.empty(), "NCCL init with 0 gpus.");

     if (scope.FindVar(name) == nullptr) {
       PADDLE_THROW("Output(Communicator) is needed for ncclInit operator.");
```

> Review (on the "last batch" comment): You mentioned "last batch", is it implying …
> Reply: Yes.

> Review (on `parallel_scopes`): Does …
> Review: When do we need to serialize scope?
> Review: Sorry, never mind, I got confused.

> Review (on the GPU id loop): Why only …
> Reply: we don't know …
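The substance of this change: `ncclInit` no longer reads a `gpus` attribute; it sizes the communicator from however many parallel scopes `parallel_do` actually created. A plain-Python sketch of that sizing logic (illustrative only, not PaddlePaddle API):

```python
# Illustrative sketch (not PaddlePaddle API): derive NCCL peer ids from
# the parallel scopes. With batch size 7 on an 8-GPU machine, parallel_do
# creates only 7 sub-scopes, so only 7 peers join the communicator.
def peers_from_scopes(parallel_scopes):
    gpus = list(range(len(parallel_scopes)))
    assert gpus, "NCCL init with 0 gpus."
    return gpus

scopes = [object() for _ in range(7)]   # stand-ins for 7 sub-scopes
print(peers_from_scopes(scopes))        # -> [0, 1, 2, 3, 4, 5, 6]
```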
```diff
@@ -45,17 +59,29 @@ class NCCLInitOp : public framework::OperatorBase {
   }
 };

+class NCCLInitOpVarTypeInference : public framework::VarTypeInference {
+ public:
+  void operator()(const framework::OpDesc &op_desc,
+                  framework::BlockDesc *block) const override {
+    auto out_var_name = op_desc.Output("Communicator").front();
+    auto &out_var = block->FindRecursiveOrCreateVar(out_var_name);
+    auto var_type = framework::proto::VarType::NCCL_COM;
+    out_var.SetType(var_type);
+  }
+};
+
+class NCCLInitOpShapeInference : public framework::InferShapeBase {
+ public:
+  void operator()(framework::InferShapeContext *ctx) const override {}
+};
+
 class NCCLInitOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   NCCLInitOpMaker(OpProto *proto, OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
+    AddInput(kParallelScopes, "The working place of parallel do.");
     AddOutput("Communicator",
               "Create Communicator for communicating between gpus");
-    AddAttr<std::vector<int>>("gpus", "(vector<int>) GPU id lists");
     AddAttr<int>("dtype",
                  "(int, default 5 (FP32)) "
                  "Output data type")
         .SetDefault(framework::proto::DataType::FP32);
     AddComment(R"DOC(
 NCCLInit Operator.
```
```diff
@@ -78,7 +104,7 @@ class NCCLAllReduceOp : public framework::OperatorWithKernel {
         ctx->HasInput("Communicator"),
         " Input(Communicator) of AllReduce op input should not be NULL");
     PADDLE_ENFORCE(ctx->HasOutput("Out"),
-                   " Input(X) of AllReduce op input should not be NULL");
+                   " Output(Out) of AllReduce op output should not be NULL");

     auto x_dims = ctx->GetInputsDim("X");
```
```diff
@@ -215,7 +241,9 @@ Bcast the tensors.

 namespace ops = paddle::operators;
 REGISTER_OPERATOR(ncclInit, ops::NCCLInitOp,
-                  paddle::framework::EmptyGradOpMaker, ops::NCCLInitOpMaker);
+                  paddle::framework::EmptyGradOpMaker, ops::NCCLInitOpMaker,
+                  ops::NCCLInitOpVarTypeInference,
+                  ops::NCCLInitOpShapeInference);

 REGISTER_OP_WITHOUT_GRADIENT(ncclAllReduce, ops::NCCLAllReduceOp,
                              ops::NCCLAllReduceOpMaker);
```
```diff
@@ -30,6 +30,7 @@ static constexpr char kOutputs[] = "outputs";
 static constexpr char kParallelScopes[] = "parallel_scopes";

 static constexpr char kParallelBlock[] = "sub_block";
+static constexpr char kUseNCCL[] = "use_nccl";

 using LoDTensor = framework::LoDTensor;
 using SelectedRows = framework::SelectedRows;
```
```diff
@@ -194,6 +195,8 @@ class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
     AddOutput(kOutputs, "").AsDuplicable();
     AddOutput(kParallelScopes, "");
     AddAttr<framework::BlockDesc *>(kParallelBlock, "");
+    AddAttr<bool>(kUseNCCL, "true if we use nccl on backward")
+        .SetDefault(false);
     AddComment(R"DOC(
 ParallelDo Operator.
 )DOC");
```
```diff
@@ -216,7 +219,6 @@ class ParallelDoGradOp : public framework::OperatorBase {

     auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                            ->Get<std::vector<framework::Scope *>>();
-
     auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();

     // feed output@grad
```
```diff
@@ -243,14 +245,34 @@ class ParallelDoGradOp : public framework::OperatorBase {
     }
     WaitOnPlaces(places);

-    AccumulateGrad(scope, place, sub_scopes, places);
+    // NCCL allreduce op will be added by backward,
+    // so no need to explicitly accumulate grad
+    if (!(Attr<bool>(kUseNCCL))) {
+      AccumulateGrad(scope, place, sub_scopes, places);
+    } else {
+      for (auto &place : places) {
+        PADDLE_ENFORCE(platform::is_gpu_place(place),
+                       "NCCL only supports cuda place");
+      }
+    }
+    for (auto &s : Outputs(framework::GradVarName(kParameters))) {
+      if (s == "@EMPTY@") {
+        continue;
+      }
+      VLOG(3) << "Moving " << s;
+      CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
+    }
+    WaitOnPlaces(places);
   }

   void AccumulateGrad(const framework::Scope &scope,
                       const platform::Place &place,
                       const std::vector<framework::Scope *> &sub_scopes,
                       const platform::PlaceList &places) const {
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
       if (s == "@EMPTY@") {
         continue;
       }
       VLOG(3) << "Accumulating " << s;
       if (s == framework::kEmptyVarName) continue;
       std::string tmp_name;
```

> Review (on the `@EMPTY@` check): Backward will change some of the gradients to `@EMPTY@` …
> Review: Can …
> Reply: sure.

> Review (on `WaitOnPlaces`): Not related to this PR, but I am curious why parallel_do has to wait for all streams to complete. I thought even the executor does not wait.
> Reply: I can't think of a case where it is wrong without waiting. But just as we always wait for threads to be joined after we launch them, I feel it's natural for parallel_do to wait for all streams.
> Review: That could affect performance. We introduced a synchronization point which we are not sure we need.
> Reply: There are several places in parallel_do where we have to wait. This line won't have a large effect.
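For contrast between the two branches above: without NCCL, `AccumulateGrad` sums each parameter's per-device gradients on one place; with NCCL, the `ncclAllReduce` ops that backward inserts leave the reduced gradient on every device, so no accumulation happens here. A toy plain-Python model of the non-NCCL path (all names ours):

```python
# Toy model (names ours) of the non-NCCL path: sum per-device gradients.
def accumulate_grad(sub_grads):
    total = list(sub_grads[0])
    for g in sub_grads[1:]:
        total = [a + b for a, b in zip(total, g)]
    return total

per_device = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]  # grads from 3 sub-scopes
print(accumulate_grad(per_device))  # -> [9.0, 12.0]
# With use_nccl=True, every device already holds [9.0, 12.0] after the
# ncclAllReduce ops that backward inserted, so this step is skipped.
```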
```diff
@@ -199,12 +199,76 @@ def _op_can_be_removed_(op_desc, no_grad_set):
     return op_descs


+import proto.framework_pb2 as framework_pb2
+
+
+def serialize_op_decs(op_desc):
+    protostr = op_desc.serialize_to_string()
+    proto = framework_pb2.OpDesc.FromString(str(protostr))
+    return proto.__str__()
```
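`serialize_op_decs` is a small debugging aid: it round-trips an OpDesc through its protobuf to produce a readable text dump. A hedged usage sketch; `prog` is an assumed, already-built fluid `Program`, not something defined in this PR:

```python
# Hypothetical: dump the first op of an existing program in proto text form.
first_op_desc = prog.global_block().ops[0].desc   # prog: assumed Program
print(serialize_op_decs(first_op_desc))           # e.g. type: "mul" ...
```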
```diff
+def _callback_lookup_(op):
+    """
+    Only used in _append_backward_ops_.
+    Build and return a callback function for a certain op. For example,
+
+    parallel_do:    AllReduce
+
+    :param op:
+    :return: callback function
+    """
+    if op.type == 'parallel_do' and op.attr('use_nccl'):
+        param_names = set(op.input('parameters'))
+        param_grad_names = [n + "@GRAD" for n in param_names]
+
+        class ParallelDoCallBack(object):
+            def __init__(self, param_grad_names, parallel_scopes_name):
+                self.has_inserted_nccl_init = False
+                self.param_grad_names = param_grad_names
+                self.parallel_scopes_name = parallel_scopes_name
+
+            def __call__(self, block, context):
+                if not self.has_inserted_nccl_init:
+                    op_desc = _create_op_desc_(
+                        "ncclInit",
+                        {"parallel_scopes": self.parallel_scopes_name},
+                        {"Communicator": ['nccl_com__do_not_change_']}, {})
+                    block.program.global_block().desc.append_op().copy_from(
+                        op_desc)
+                    self.has_inserted_nccl_init = True
+
+                current_op_desc = context["__current_op_desc__"]
+                for o_param in current_op_desc.output_names():
+                    for o_argu in current_op_desc.output(o_param):
+                        if o_argu in self.param_grad_names:
+                            allreduce_out_name = o_argu + "__nccl_all_reduce__"
+                            op_desc = _create_op_desc_(
+                                "ncclAllReduce", {
+                                    "X": [o_argu],
+                                    "Communicator":
+                                    ['nccl_com__do_not_change_']
+                                }, {"Out": [allreduce_out_name]},
+                                {"reduction": "ncclSum"})
+                            block.desc.append_op().copy_from(op_desc)
+
+                            op_desc = _create_op_desc_(
+                                "assign", {"X": [allreduce_out_name]},
+                                {"Out": [o_argu]}, {})
+                            block.desc.append_op().copy_from(op_desc)
+
+        return ParallelDoCallBack(param_grad_names,
+                                  op.output("parallel_scopes"))
+    else:
+        return None
```

> Review (on the docstring): Can you add more comments about what a callback function is? (e.g., is it something that gets called after an op is completed?)
> Review (on `'nccl_com__do_not_change_'`): Can you put …
> Review (on the allreduce output): …
> Reply: the ncclAllreduce requires a buffer memory to hold the result, i.e. it doesn't support in-place.
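Answering the review question in code: a callback here is a hook that `_append_backward_ops_` (below) invokes once per grad-op desc it appends, receiving the target block and a context dict whose `__current_op_desc__` entry is the op just emitted. A stripped-down, runnable model of that protocol (all names ours, no Paddle dependency):

```python
# Toy model of the callback protocol used by _append_backward_ops_.
def append_grad_ops(grad_op_descs, callbacks):
    context = {}                      # plays the role of grad_to_var
    for op_desc in grad_op_descs:
        # ... the real code appends op_desc to target_block here ...
        context["__current_op_desc__"] = op_desc
        for cb in callbacks or []:
            cb(block=None, context=context)   # block elided in this toy

def log_callback(block, context):
    print("emitted:", context["__current_op_desc__"])

append_grad_ops(["fc_grad", "relu_grad"], [log_callback])
# emitted: fc_grad
# emitted: relu_grad
```

Note also the design choice flagged in review: `ncclAllReduce` writes into a fresh `<grad>__nccl_all_reduce__` variable and an `assign` copies it back, because the allreduce kernel needs a separate output buffer rather than reducing in place.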
```diff
 def _append_backward_ops_(block,
                           ops,
                           target_block,
                           no_grad_dict,
                           grad_to_var,
-                          callback=None):
+                          callbacks=None):
     """
     Create all grad ops, and insert them into given block
```

```diff
@@ -220,14 +284,11 @@ def _append_backward_ops_(block,
         val(str): corresponding forward variable name
         callback(callable object): a callable object used to decorate new generated grad ops
     """
-    if callback is None:
-
-        def empty_callback(block, context):
-            pass
-
-        callback = empty_callback
-    elif not hasattr(callback, '__call__'):
-        raise ValueError("'callback' must be a callable object.")
+    if callbacks is not None:
+        assert (isinstance(callbacks, list))
+        for cb in callbacks:
+            if not hasattr(cb, '__call__'):
+                raise ValueError("'callback' must be a callable object.")

     # grad_op_descs holds created grad_op, and will be appended to target_block
     grad_op_descs = []
```
```diff
@@ -238,8 +299,17 @@ def empty_callback(block, context):
         if op.has_attr("sub_block"):
             sub_block = program.block(op.block_attr("sub_block"))
             grad_sub_block = program.create_block(parent_idx=sub_block.idx)
-            _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
-                                  no_grad_dict, grad_to_var)
+            cb = _callback_lookup_(op)
+            if cb is not None:
+                if callbacks is None:
+                    new_callbacks = [cb]
+                else:
+                    new_callbacks = callbacks + [_callback_lookup_(op)]
+                _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
+                                      no_grad_dict, grad_to_var, new_callbacks)
+            else:
+                _append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
+                                      no_grad_dict, grad_to_var, callbacks)
             grad_sub_block_list.append(grad_sub_block.desc)

         # Getting op's corresponding grad_op
```

> Review: Do we need to do …
> Reply: …
```diff
@@ -258,7 +328,11 @@ def empty_callback(block, context):
     for op_desc in grad_op_descs:
         new_op_desc = target_block.desc.append_op()
         new_op_desc.copy_from(op_desc)
-        callback(block=target_block, context=grad_to_var)
+        grad_to_var["__current_op_desc__"] = new_op_desc
+        if callbacks is not None:
+            assert (isinstance(callbacks, list))
+            for cb in callbacks:
+                cb(block=target_block, context=grad_to_var)


 def _append_backward_vars_(block, start_op_idx, grad_to_var, grad_info_map):
```
```diff
@@ -296,6 +370,9 @@ def _append_backward_vars_(block, start_op_idx, grad_to_var, grad_info_map):
         # infer_shape and infer_type
         op_desc.infer_var_type(block.desc)
         op_desc.infer_shape(block.desc)
+        # ncclInit doesn't need to set data_type
+        if op_desc.type() == 'ncclInit':
+            continue
         for arg in op_desc.output_arg_names():
             if arg in new_vars:
                 _infer_var_data_type_(arg, block)
```
```diff
@@ -335,7 +412,8 @@ def _get_stop_gradients_(program):
     return no_grad_dict


-def append_backward(loss, parameter_list=None, no_grad_set=None, callback=None):
+def append_backward(loss, parameter_list=None, no_grad_set=None,
+                    callbacks=None):
     """
     Append backward part to main_program
```

```diff
@@ -351,6 +429,8 @@ def append_backward(loss, parameter_list=None, no_grad_set=None, callback=None):
         (list[(Variable,Variable)]): list of (parameter, gradient) pair.
     """
     assert isinstance(loss, framework.Variable)
+    if callbacks is not None:
+        assert isinstance(callbacks, list)

     program = loss.block.program
     if no_grad_set is None:
```

```diff
@@ -378,7 +458,7 @@ def append_backward(loss, parameter_list=None, no_grad_set=None, callback=None):
     no_grad_dict[0].update(map(_append_grad_suffix_, block_no_grad_set))

     _append_backward_ops_(root_block, op_path, root_block, no_grad_dict,
-                          grad_to_var, callback)
+                          grad_to_var, callbacks)

     # Because calc_gradient may be called multiple times,
     # we need rename the internal gradient variables so that they have
```
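With the plural `callbacks` argument, callers pass a list of callables. A hedged usage sketch, assuming the fluid API of this era; `avg_cost` is a scalar loss Variable built elsewhere and is our assumption:

```python
import paddle.v2.fluid as fluid

# Hypothetical callback: log the type of every grad op as it is appended.
def log_grad_op(block, context):
    print("grad op:", context["__current_op_desc__"].type())

# avg_cost: assumed scalar loss Variable from a previously built network.
params_grads = fluid.backward.append_backward(
    loss=avg_cost, callbacks=[log_grad_op])
```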
> Review (on the `OutputSize` → `ConvOutputSize` rename): When I was debugging conv_op.cc, it looks like OutputSize has been linked to another function...