change use_calc_stream to sync_op
LiYuRio committed Sep 19, 2022
1 parent fa97e5b commit ae88cc5
Showing 18 changed files with 118 additions and 128 deletions.
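Every file below makes the same mechanical change: the use_calc_stream keyword argument on the communication APIs is renamed to sync_op. A minimal before/after sketch of the call-site pattern (tensor and group are placeholders, not code from this commit):

    # before this commit: blocking collective on the calculation stream
    paddle.distributed.all_reduce(tensor, group=group, use_calc_stream=True)

    # after this commit: the same blocking behaviour under the new name
    paddle.distributed.all_reduce(tensor, group=group, sync_op=True)

    # non-blocking form, as used in the tests further down: returns a task
    # that the caller waits on explicitly
    task = paddle.distributed.all_reduce(tensor, group=group, sync_op=False)
    task.wait()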
2 changes: 1 addition & 1 deletion python/paddle/distributed/auto_parallel/process_group.py
@@ -151,7 +151,7 @@ def instantiate(self):
tmp = paddle.to_tensor(
[1], dtype="int32") if _non_static_mode() else fill_constant(
[0], dtype="int32", value="1")
paddle.distributed.all_reduce(tmp, use_calc_stream=True, group=self)
paddle.distributed.all_reduce(tmp, sync_op=True, group=self)
paddle.distributed.wait(tmp, group=self)
paddle.enable_static()

109 changes: 56 additions & 53 deletions python/paddle/distributed/collective.py

Large diffs are not rendered by default.
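The bulk of the rename lives in collective.py, whose diff is not rendered here. Judging only from the call sites elsewhere in this commit, the public collectives presumably end up with signatures along these lines (a hypothetical sketch, not the contents of the hidden diff):

    # hypothetical signatures inferred from the call sites in this commit;
    # op=ReduceOp.SUM is an assumed default, not taken from the hidden diff
    def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
        # sync_op=True blocks until the collective finishes;
        # sync_op=False returns a task object exposing wait()
        ...

    def broadcast(tensor, src, group=None, sync_op=True):
        ...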

@@ -146,7 +146,7 @@ def _sharding_sync_parameters(self):
# instead of the relative logic rank id within group
src=self._hcg.get_sharding_parallel_group().ranks[rank],
group=self._hcg.get_sharding_parallel_group(),
use_calc_stream=True)
sync_op=True)

def _update_trainable(self):
"""
@@ -150,7 +150,7 @@ def _sync_params_and_buffers(self):
broadcast(p,
src=self._global_root_rank,
group=self.group,
use_calc_stream=True)
sync_op=True)

# Multi stream operation will be supported later
wait(tensor=p, group=self.group, use_calc_stream=True)
@@ -415,7 +415,7 @@ def _broadcast_params(self):
broadcast(tensor=internal_storage.buffer,
src=self.group.ranks[dst_rank],
group=self.group,
use_calc_stream=True)
sync_op=True)

# Multi stream operation will be supported later
wait(tensor=internal_storage.buffer,
@@ -380,18 +380,18 @@ def _broadcast_final_loss(self):
1) if loss.dtype == paddle.float32 else paddle.to_tensor(0)
paddle.distributed.broadcast(is_fp32,
src=self.global_rank,
use_calc_stream=True,
sync_op=True,
group=self.pp_group)
paddle.distributed.broadcast(loss,
src=self.global_rank,
use_calc_stream=True,
sync_op=True,
group=self.pp_group)
else:
is_fp32 = paddle.to_tensor(1)
paddle.distributed.broadcast(
is_fp32,
src=self._hcg.get_rank_from_stage(self.num_stages - 1),
use_calc_stream=True,
sync_op=True,
group=self.pp_group)
loss = paddle.zeros(shape=[
1
@@ -400,7 +400,7 @@ def _broadcast_final_loss(self):
paddle.distributed.broadcast(
loss,
src=self._hcg.get_rank_from_stage(self.num_stages - 1),
use_calc_stream=True,
sync_op=True,
group=self.pp_group)
return loss

@@ -155,7 +155,7 @@ def _sync_params_and_buffers(self):
broadcast(p,
src=self._global_root_rank,
group=self._group,
use_calc_stream=True)
sync_op=True)

def _generate_master_params(self, trainable_params):
if self.offload:
@@ -413,4 +413,4 @@ def _broadcast_params(self):
broadcast(tensor=internal_storage.buffer,
src=self._group.ranks[dst_rank],
group=self._group,
use_calc_stream=True)
sync_op=True)
@@ -287,7 +287,7 @@ def __sync_buffers(self):
collective.broadcast(buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
sync_op=True)

def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
@@ -181,7 +181,7 @@ def _sync_params_and_buffers(self):
collective.broadcast(p,
src=self._global_root_rank,
group=self._group,
use_calc_stream=True)
sync_op=True)

def _clear_gradients(self):
assert len(self._trainable_params.keys()) > 0
@@ -446,7 +446,7 @@ def _sync_buffers(self):
collective.broadcast(buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
sync_op=True)

def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
@@ -285,7 +285,7 @@ def __sync_buffers(self):
dist.broadcast(buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
sync_op=True)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)

@@ -340,7 +340,7 @@ def cleanup():
tensor=param.grad,
dst=self._group.ranks[dst_rank],
group=self._group,
use_calc_stream=True),
sync_op=True),
callback=cleanup))

# Multi stream operation will be supported later
@@ -396,7 +396,7 @@ def cleanup():
tensor=grad_storage.buffer,
dst=self._group.ranks[grad_storage.destination],
group=self._group,
use_calc_stream=True),
sync_op=True),
callback=cleanup))

# Multi stream operation will be supported later
@@ -170,7 +170,7 @@ def _sync_params_and_buffers(self):
dist.broadcast(p,
src=self._global_root_rank,
group=self._group,
use_calc_stream=True)
sync_op=True)

# Multi stream operation will be supported later
dist.wait(tensor=p, group=self._group, use_calc_stream=True)
@@ -435,7 +435,7 @@ def _sync_buffers(self):
dist.broadcast(buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
sync_op=True)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)

@@ -478,7 +478,7 @@ def _update_params(self):
grad_storage.buffer.scale_(scale=self._world_size_scaling)
dist.all_reduce(tensor=grad_storage.buffer,
group=self._group,
use_calc_stream=True)
sync_op=True)
dist.wait(tensor=grad_storage.buffer,
group=self._group,
use_calc_stream=True)
@@ -541,7 +541,7 @@ def allreduce_(*_):
# Only support sync allreduce current rank's layer now
dist.all_reduce(tensor=full_grad,
group=self._group,
use_calc_stream=True)
sync_op=True)
dist.wait(tensor=full_grad,
group=self._group,
use_calc_stream=True)
6 changes: 3 additions & 3 deletions python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
@@ -95,7 +95,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
paddle.distributed.broadcast(shape_gpu,
src=src_rank,
group=model_parallel_group,
use_calc_stream=True)
sync_op=True)

if mp_rank != 0:
input_data = paddle.zeros(shape_gpu, dtype=dtype)
@@ -105,7 +105,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
paddle.distributed.broadcast(input_data,
src=src_rank,
group=model_parallel_group,
use_calc_stream=True)
sync_op=True)


def broadcast_input_data(hcg, *inputs, **kwargs):
@@ -170,7 +170,7 @@ def sharding_reduce_gradients(parameter_list, hcg):
paddle.distributed.all_reduce(
param.grad,
group=hcg.get_sharding_parallel_group(),
use_calc_stream=True)
sync_op=True)

elif _in_legacy_dygraph():
g_var = param._grad_ivar()
2 changes: 1 addition & 1 deletion python/paddle/fluid/dygraph/parallel.py
@@ -420,7 +420,7 @@ def sync_params_buffers(model,
paddle.distributed.broadcast(coalesced_var,
src=src_rank,
group=comm_group,
use_calc_stream=True)
sync_op=True)

for coalesced_var, origin_vars, var_shapes in coalesced_vars:
var_len = [np.prod(v_shape) for v_shape in var_shapes]
@@ -49,9 +49,7 @@ def get_model(self, main_prog, startup_program, rank):
shape=[10, 1000],
dtype='float32')
gp = paddle.distributed.new_group([0, 1])
paddle.distributed.all_reduce(tindata,
group=gp,
use_calc_stream=True)
paddle.distributed.all_reduce(tindata, group=gp, sync_op=True)
return [tindata]


@@ -69,7 +69,7 @@ def test_collective_alltoall_single(self):
output,
in_split_sizes,
out_split_sizes,
use_calc_stream=False,
sync_op=False,
group=group)
task.wait()

@@ -83,8 +83,9 @@ def test_collective_reduce_scatter_base(self):
# [1, 2, 3, 4] # Rank-1

output = paddle.empty(shape=[2], dtype=input.dtype)
task = paddle.distributed.collective._reduce_scatter_base(
output, input, use_calc_stream=False)
task = paddle.distributed.collective._reduce_scatter_base(output,
input,
sync_op=False)

task.wait()

@@ -53,32 +53,29 @@ def test_all(self):
paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
src=dp_src_rank,
group=dp_gp,
use_calc_stream=True)
sync_op=True)
if dp_rank == 0:
assert np.array_equal(result, self.tensor2)
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test scatter api ok")

paddle.distributed.broadcast(result,
src=1,
group=dp_gp,
use_calc_stream=True)
paddle.distributed.broadcast(result, src=1, group=dp_gp, sync_op=True)
assert np.array_equal(result, self.tensor1)
print("test broadcast api ok")

paddle.distributed.reduce(result,
dst=dp_src_rank,
group=dp_gp,
use_calc_stream=True)
sync_op=True)
if dp_rank == 0:
assert np.array_equal(result, paddle.add(self.tensor1,
self.tensor1))
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test reduce api ok")

paddle.distributed.all_reduce(result, use_calc_stream=True)
paddle.distributed.all_reduce(result, sync_op=True)
assert np.array_equal(
result,
paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -92,7 +89,7 @@ def test_all(self):
paddle.distributed.all_gather(result,
self.tensor1,
group=dp_gp,
use_calc_stream=True)
sync_op=True)
assert np.array_equal(result[0], self.tensor1)
assert np.array_equal(result[1], self.tensor1)
print("test all_gather api ok")
@@ -36,29 +36,26 @@ def test_all(self):
paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
src=0,
group=gp,
use_calc_stream=True)
sync_op=True)
if gp.rank == 0:
assert np.array_equal(result, self.tensor2)
elif gp.rank == 1:
assert np.array_equal(result, self.tensor1)
print("test scatter api ok")

paddle.distributed.broadcast(result,
src=1,
group=gp,
use_calc_stream=True)
paddle.distributed.broadcast(result, src=1, group=gp, sync_op=True)
assert np.array_equal(result, self.tensor1)
print("test broadcast api ok")

paddle.distributed.reduce(result, dst=0, group=gp, use_calc_stream=True)
paddle.distributed.reduce(result, dst=0, group=gp, sync_op=True)
if gp.rank == 0:
assert np.array_equal(result, paddle.add(self.tensor1,
self.tensor1))
elif gp.rank == 1:
assert np.array_equal(result, self.tensor1)
print("test reduce api ok")

paddle.distributed.all_reduce(result, use_calc_stream=True)
paddle.distributed.all_reduce(result, sync_op=True)
assert np.array_equal(
result,
paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -72,7 +69,7 @@ def test_all(self):
paddle.distributed.all_gather(result,
self.tensor1,
group=gp,
use_calc_stream=True)
sync_op=True)
assert np.array_equal(result[0], self.tensor1)
assert np.array_equal(result[1], self.tensor1)
print("test all_gather api ok")