[hybrid] Fix row parallel linear bias (#35186) (#35297)
Co-authored-by: WangXi <wangxi16@baidu.com>
sljlp and wangxicoding committed Aug 31, 2021
1 parent 167685e commit b36fb03
Showing 3 changed files with 69 additions and 40 deletions.
60 changes: 35 additions & 25 deletions python/paddle/distributed/collective.py
@@ -1078,6 +1078,19 @@ def _linear(x, weight, bias=None, name=None):
     return res
 
 
+def _set_var_distributed(var):
+    if var is None:
+        return
+
+    var.is_distributed = True
+
+    # NOTE: use current_block and find_var_recursive to support while_loop
+    startup_block = paddle.static.default_startup_program().current_block()
+    main_block = paddle.static.default_main_program().current_block()
+    startup_block._find_var_recursive(var.name).is_distributed = True
+    main_block._find_var_recursive(var.name).is_distributed = True
+
+
 def _parallel_linear(x,
                      num_rows,
                      num_cols,
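
As an aside, a minimal usage sketch of the new helper (not part of the commit; the parameter and sizes below are made up): it folds the repeated startup-/main-program lookups into one call and silently ignores a missing bias.

import paddle
import paddle.fluid as fluid
from paddle.distributed.collective import _set_var_distributed

paddle.enable_static()
weight = fluid.layers.create_parameter(shape=[8, 8], dtype='float32')

# previously callers wrote the two lookups out by hand:
#   startup_block._find_var_recursive(weight.name).is_distributed = True
#   main_block._find_var_recursive(weight.name).is_distributed = True
_set_var_distributed(weight)   # marks the var in both startup and main programs
_set_var_distributed(None)     # a layer without a bias is simply skipped
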
@@ -1095,7 +1108,7 @@ def _parallel_linear(x,
     axis the dimension of the parameter of linear layer.
         axis = 0: the row dimension
-        axid = 1: the col dimension
+        axis = 1: the col dimension
     """
     if group is not None and not group.is_member():
@@ -1108,40 +1121,35 @@ def _parallel_linear(x,
     else:
         x = _c_identity(x, group=group)
 
-    if core.is_compiled_with_npu():
-        linear = _Linear(
-            num_rows,
-            num_cols,
-            weight_attr=param_attr,
-            bias_attr=bias_attr,
-            name=name)
-    else:
-        linear = paddle.nn.Linear(
-            num_rows,
-            num_cols,
-            weight_attr=param_attr,
-            bias_attr=bias_attr,
-            name=name)
-
-    linear_out = linear(x)
-    startup_block = paddle.static.default_startup_program().current_block()
-    main_block = paddle.static.default_main_program().current_block()
-    startup_block._find_var_recursive(linear.weight.name).is_distributed = True
-    main_block._find_var_recursive(linear.weight.name).is_distributed = True
+    linear = paddle.nn.Linear(
+        num_rows,
+        num_cols,
+        weight_attr=param_attr,
+        bias_attr=bias_attr,
+        name=name)
+
+    # NOTE: npu linear function use matmul_v2 but linear use matmul
+    linear_function = _linear if core.is_compiled_with_npu()\
+        else paddle.nn.functional.linear
+    linear_out = linear_function(
+        x,
+        linear.weight,
+        # NOTE(wangxi): row split, bias need add after allreduce
+        None if axis == 0 else linear.bias,
+        linear.name)
+
+    _set_var_distributed(linear.weight)
     # set is_distributed for splited bias
     # if a linear layer is splited by row, each rank would hold a complete bias and they should be the same in each rank.
     # if a linear layer is splited by col, the bias would also be split into each rank as its weight
     if axis == 1 and linear._bias_attr != False:
-        startup_block._find_var_recursive(
-            linear.bias.name).is_distributed = True
-        main_block._find_var_recursive(linear.bias.name).is_distributed = True
+        _set_var_distributed(linear.bias)
 
     if not gather_out: return linear_out
 
-    op_type = 'c_allreduce_sum' if axis == 0 else 'c_concat'
     out_shape = list(linear_out.shape)
     out_shape[0] *= 1 if axis == 0 else nranks
     main_block = paddle.static.default_main_program().current_block()
     out = main_block.create_var(
         shape=out_shape,
         dtype=linear_out.dtype,
@@ -1160,6 +1168,8 @@ def _parallel_linear(x,
                 'use_calc_stream': True,
                 'use_model_parallel': True
             })
+        if linear.bias is not None:
+            out = out + linear.bias
     else:
         main_block.append_op(
             type='c_concat',
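
Taken together, the collective.py change makes the row-parallel path (axis == 0) run the functional linear with the bias left out, allreduce the partial products, and add the full bias exactly once afterwards. A rough numpy illustration of the off-by-bias error this avoids — not part of the commit, with made-up sizes and 2 ranks:

import numpy as np

np.random.seed(0)
nranks = 2
x = np.random.rand(4, 6).astype('float32')            # full input
w = np.random.rand(6, 3).astype('float32')            # full weight
b = np.random.rand(3).astype('float32')               # full bias, replicated on every rank

expected = x @ w + b                                   # single-card result

# row-parallel split: each rank holds 3 input columns and 3 weight rows
parts = [x[:, r * 3:(r + 1) * 3] @ w[r * 3:(r + 1) * 3, :] for r in range(nranks)]

buggy = sum(p + b for p in parts)                      # bias added before the allreduce-sum
fixed = sum(parts) + b                                 # bias added once, after it

assert np.allclose(fixed, expected, atol=1e-5)
assert not np.allclose(buggy, expected, atol=1e-5)     # buggy result is off by (nranks - 1) * b
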
29 changes: 19 additions & 10 deletions python/paddle/fluid/tests/unittests/static_model_parallel_by_col.py
@@ -43,29 +43,38 @@
 #fluid.default_main_program().random_seed = 1
 
 
+def get_param_attr(weight, bias):
+    weight_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(weight))
+    bias_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(bias))
+    return weight_attr, bias_attr
+
+
 def create_model(data, rank):
     np.random.seed(2021)
     np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
+    np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
     if rank is not None:
         start_col = 0 if rank == 0 else OUT_SIZE // 2
         np_weight_part = np_weight[:, start_col:start_col + OUT_SIZE // 2]
+        np_bias_part = np_bias[start_col:start_col + OUT_SIZE // 2]
+
+        weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias_part)
         result = paddle.distributed.split(
             data,
             size=(IN_SIZE, OUT_SIZE),
             operation='linear',
             axis=1,
             num_partitions=MODEL_PARALLEL_SIZE,
-            weight_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(
-                    np_weight_part)),
-            bias_attr=False, )
+            weight_attr=weight_attr,
+            bias_attr=bias_attr)
     else:
-        result = fluid.layers.fc(
-            data,
-            size=OUT_SIZE,
-            param_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
-            bias_attr=False, )
+        weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
+        result = fluid.layers.fc(data,
+                                 size=OUT_SIZE,
+                                 param_attr=weight_attr,
+                                 bias_attr=bias_attr)
 
     predict = paddle.sum(result)
     return predict
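
The by_col test now feeds real (sliced) bias values instead of bias_attr=False. The bookkeeping it relies on, sketched in plain numpy with made-up sizes (not part of the test): for a column split, rank r keeps OUT_SIZE // 2 weight columns and the matching bias slice, and concatenating the per-rank outputs reproduces the single-card fc.

import numpy as np

np.random.seed(2021)
IN_SIZE, OUT_SIZE, nranks = 4, 8, 2
w = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype('float32')
b = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype('float32')
x = np.random.uniform(-1, 1, size=(2, IN_SIZE)).astype('float32')

outs = []
for rank in range(nranks):
    start_col = 0 if rank == 0 else OUT_SIZE // 2
    w_part = w[:, start_col:start_col + OUT_SIZE // 2]   # this rank's weight columns
    b_part = b[start_col:start_col + OUT_SIZE // 2]      # matching bias slice
    outs.append(x @ w_part + b_part)

# c_concat of the per-rank outputs equals the single-card linear
assert np.allclose(np.concatenate(outs, axis=1), x @ w + b, atol=1e-5)
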
20 changes: 15 additions & 5 deletions python/paddle/fluid/tests/unittests/static_model_parallel_by_row.py
@@ -43,29 +43,39 @@
 #fluid.default_main_program().random_seed = 1
 
 
+def get_param_attr(weight, bias):
+    weight_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(weight))
+    bias_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(bias))
+    return weight_attr, bias_attr
+
+
 def create_model(data, rank):
     np.random.seed(2021)
     np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
+    np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
     if rank is not None:
         start_row = 0 if rank == 0 else IN_SIZE // 2
         np_weight_part = np_weight[start_row:start_row + IN_SIZE // 2, :]
+
+        weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias)
         result = paddle.distributed.split(
             data,
             size=(IN_SIZE, OUT_SIZE),
             operation='linear',
             axis=0,
             num_partitions=MODEL_PARALLEL_SIZE,
-            weight_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(
-                    np_weight_part)),
-            bias_attr=False, )
+            weight_attr=weight_attr,
+            bias_attr=bias_attr)
     else:
+        weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
         result = fluid.layers.fc(
             data,
             size=OUT_SIZE,
             param_attr=paddle.ParamAttr(
                 initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
-            bias_attr=False, )
+            bias_attr=bias_attr)
 
     predict = paddle.sum(result)
     return predict
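
Here the row split keeps the bias whole on every partition (only the weight rows and the matching input columns are sliced), which is exactly the situation the collective.py fix targets. A compact numpy check of that slicing, again with made-up sizes and not part of the test:

import numpy as np

np.random.seed(2021)
IN_SIZE, OUT_SIZE, nranks = 8, 4, 2
w = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype('float32')
b = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype('float32')
x = np.random.uniform(-1, 1, size=(2, IN_SIZE)).astype('float32')

partials = []
for rank in range(nranks):
    start_row = 0 if rank == 0 else IN_SIZE // 2
    w_part = w[start_row:start_row + IN_SIZE // 2, :]   # this rank's weight rows, bias left whole
    x_part = x[:, start_row:start_row + IN_SIZE // 2]   # matching input slice
    partials.append(x_part @ w_part)                     # no bias here, as in _parallel_linear

# allreduce-sum of the partials plus the bias added once matches the baseline fc
assert np.allclose(sum(partials) + b, x @ w + b, atol=1e-5)
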