Skip to content

Commit

Permalink
dont wait for send op under dygraph pp (#46209)
Browse files Browse the repository at this point in the history
  • Loading branch information
FeixLiu committed Sep 20, 2022
1 parent 37ba18b commit 8ff7df8
Showing 1 changed file with 27 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,21 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
for d in tensor_send_prev:
if _in_legacy_dygraph():
paddle.distributed.wait(d, use_calc_stream=True)
tasks.append(
send_partial(d,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False))
else:
if _in_legacy_dygraph():
paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
tasks.append(
send_partial(tensor_send_prev,
send_partial(d,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False))
use_calc_stream=False)
else:
if _in_legacy_dygraph():
paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
send_partial(tensor_send_prev,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False)

if tensor_recv_prev is not None:
if isinstance(tensor_recv_prev, tuple):
Expand All @@ -371,23 +369,21 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
for d in tensor_send_next:
if _in_legacy_dygraph():
paddle.distributed.wait(d, use_calc_stream=True)
tasks.append(
send_partial(d,
dst=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_next_group,
use_calc_stream=False))
else:
if _in_legacy_dygraph():
paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
tasks.append(
send_partial(tensor_send_next,
send_partial(d,
dst=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_next_group,
use_calc_stream=False))
use_calc_stream=False)
else:
if _in_legacy_dygraph():
paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
send_partial(tensor_send_next,
dst=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_next_group,
use_calc_stream=False)

if tensor_recv_next is not None:
if isinstance(tensor_recv_next, tuple):
Expand Down Expand Up @@ -438,10 +434,11 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
group=mp_group,
use_calc_stream=True))

for task in tasks:
# wait partial all gather tasks
if task is not None:
task.wait()
if in_dygraph_mode():
for task in tasks:
# wait partial all gather tasks
if task is not None:
task.wait()

return tensor_recv_prev, tensor_recv_next

Expand Down

0 comments on commit 8ff7df8

Please sign in to comment.