Commit
fix test_communicator_half_async random core;test=develop (PaddlePadd…
danleifeng committed Feb 27, 2024
1 parent b89066a commit dba9992
Showing 2 changed files with 96 additions and 60 deletions.
38 changes: 38 additions & 0 deletions test/collective/fleet/run_server_for_communicator_half_async.py
@@ -0,0 +1,38 @@
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End

import paddle

paddle.enable_static()

pipe_name = os.getenv("PIPE_FILE")


class RunServer(TestCommunicatorHalfAsyncEnd2End):
def runTest(self):
pass


os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["http_proxy"] = ""
os.environ["https_proxy"] = ""
half_run_server = RunServer()
with open(pipe_name, 'w') as pipe:
pipe.write('done')

half_run_server.run_ut()
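
The script above signals readiness through a named pipe: the parent test blocks on a read of PIPE_FILE and only proceeds once this server process has written 'done'. Below is a minimal stand-alone sketch of that handshake using only the standard library; the 'done' token and PIPE_FILE variable mirror the test, while names like child_src are illustrative, and os.mkfifo makes this POSIX-only, as in the test itself.

import os
import subprocess
import sys
import tempfile

# Create a named pipe (FIFO) in a scratch directory.
temp_dir = tempfile.TemporaryDirectory()
pipe_name = os.path.join(temp_dir.name, "mypipe")
os.mkfifo(pipe_name)

# The child stands in for the server script: it writes a readiness
# token to the pipe, then would go on to do its real work.
child_src = (
    "import os\n"
    "with open(os.environ['PIPE_FILE'], 'w') as pipe:\n"
    "    pipe.write('done')\n"
)
child = subprocess.Popen(
    [sys.executable, "-c", child_src],
    env=dict(os.environ, PIPE_FILE=pipe_name),
)

# The parent blocks here until the child has opened the pipe and
# written its token, so nothing downstream races a server that is
# still starting up.
with open(pipe_name, "r") as pipe:
    assert pipe.read() == "done"

child.wait()
temp_dir.cleanup()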
118 changes: 58 additions & 60 deletions test/collective/fleet/test_communicator_half_async.py
@@ -15,6 +15,7 @@
import os
import subprocess
import sys
+import tempfile
import unittest

import numpy
@@ -23,32 +24,52 @@
from paddle import base
from paddle.distributed import fleet
from paddle.distributed.fleet.base import role_maker
+from paddle.distributed.utils.launch_utils import find_free_ports

paddle.enable_static()


class TestCommunicatorHalfAsyncEnd2End(unittest.TestCase):
def net(self):
x = paddle.static.data(name='x', shape=[-1, 13], dtype='float32')
-        y_predict = paddle.static.nn.fc(x, size=1, activation=None)
-        y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
+        x1 = paddle.static.data(
+            name='x1', shape=[-1, 1], dtype='int64', lod_level=1
+        )
+
+        emb = paddle.static.nn.embedding(
+            input=x1,
+            size=[10000, 10],
+            param_attr=base.ParamAttr(
+                name="embedding",
+                initializer=paddle.nn.initializer.Constant(value=0.01),
+            ),
+            is_sparse=True,
+        )
+
+        pool = paddle.static.nn.sequence_lod.sequence_pool(
+            input=emb.squeeze(-2), pool_type="sum"
+        )
+        z = paddle.concat([x, pool], axis=1)
+
+        y_predict = paddle.static.nn.fc(x=z, size=1)
+        y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
avg_cost = paddle.mean(cost)
-        return avg_cost, x, y
+        return avg_cost, x, x1, y

def fake_reader(self):
def reader():
for i in range(10000):
x = numpy.random.random((1, 13)).astype('float32')
+                z = numpy.random.randint(0, 9999, (1, 1)).astype('int64')
y = numpy.random.randint(0, 2, (1, 1)).astype('int64')
-                yield x, y
+                yield x, z, y

return reader

def run_pserver(self, role, strategy):
fleet.init(role)
-        avg_cost, x, y = self.net()
+        avg_cost, x, z, y = self.net()
optimizer = paddle.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
@@ -61,102 +82,79 @@ def run_trainer(self, role, strategy):
exe = base.Executor(place)

fleet.init(role)
-        avg_cost, x, y = self.net()
+        avg_cost, x, z, y = self.net()
optimizer = paddle.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)

-        exe.run(paddle.static.default_startup_program())
+        exe.run(base.default_startup_program())
fleet.init_worker()

train_reader = paddle.batch(self.fake_reader(), batch_size=24)
-        feeder = base.DataFeeder(place=place, feed_list=[x, y])
+        feeder = base.DataFeeder(place=place, feed_list=[x, z, y])

for batch_id, data in enumerate(train_reader()):
exe.run(
-                paddle.static.default_main_program(),
+                base.default_main_program(),
feed=feeder.feed(data),
fetch_list=[],
)

fleet.stop_worker()

def run_ut(self):
-        strategy = paddle.distributed.fleet.DistributedStrategy()
-        strategy.a_sync = True

training_role = os.getenv("TRAINING_ROLE", "TRAINER")

-        role = role_maker.UserDefinedRoleMaker(
-            current_id=0,
-            role=role_maker.Role.WORKER
-            if training_role == "TRAINER"
-            else role_maker.Role.SERVER,
-            worker_num=1,
-            server_endpoints=["127.0.0.1:6002"],
-        )
+        os.environ["PADDLE_PSERVER_NUMS"] = "1"
+        os.environ["PADDLE_TRAINERS_NUM"] = "1"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        os.environ["POD_IP"] = "127.0.0.1"
+
+        role = role_maker.PaddleCloudRoleMaker()
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True

if training_role == "TRAINER":
self.run_trainer(role, strategy)
else:
self.run_pserver(role, strategy)

def test_communicator(self):
run_server_cmd = """
temp_dir = tempfile.TemporaryDirectory()
pipe_name = os.path.join(temp_dir.name, 'mypipe')
try:
os.mkfifo(pipe_name)
except OSError as oe:
print(f"Failed to create pipe: {oe}")

import sys
import os
port = find_free_ports(1).pop()

-import time
-import threading
-import subprocess
-import unittest
-import numpy
-from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End
-import paddle
-import paddle.base as base
-import paddle.distributed.fleet as fleet
-import paddle.distributed.fleet.base.role_maker as role_maker
-paddle.enable_static()
-class RunServer(TestCommunicatorHalfAsyncEnd2End):
-    def runTest(self):
-        pass
-os.environ["http_proxy"] = ""
-os.environ["https_proxy"] = ""
-os.environ["TRAINING_ROLE"] = "PSERVER"
-half_run_server = RunServer()
-half_run_server.run_ut()
-"""

-        server_file = "run_server_for_communicator_haflaysnc.py"
-        with open(server_file, "w") as wb:
-            wb.write(run_server_cmd)
-        os.environ["TRAINING_ROLE"] = "PSERVER"
-        _python = sys.executable
os.environ["PADDLE_PORT"] = str(port)
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = f"127.0.0.1:{port}"
os.environ["PIPE_FILE"] = pipe_name

_python = sys.executable
server_file = "run_server_for_communicator_half_async.py"
ps_cmd = f"{_python} {server_file}"

ps_proc = subprocess.Popen(
ps_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

os.environ["http_proxy"] = ""
os.environ["https_proxy"] = ""
with open(pipe_name, 'r') as pipe:
start_command = pipe.read()

os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["FLAGS_communicator_send_queue_size"] = "1"
os.environ["FLAGS_communicator_max_merge_var_num"] = "1"

self.run_ut()
ps_proc.kill()

-        if os.path.exists(server_file):
-            os.remove(server_file)
+        ps_proc.wait()
+        outs, errs = ps_proc.communicate()


if __name__ == '__main__':
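
The other half of the fix replaces the hard-coded endpoint 127.0.0.1:6002 with a port taken from find_free_ports, the Paddle launch utility imported above, which returns a set of currently unused port numbers. The underlying technique is the usual bind-to-port-0 trick; the sketch below is a plain-stdlib illustration of that idea, not Paddle's implementation.

import socket
from contextlib import closing

def find_free_port() -> int:
    # Bind to port 0 so the kernel picks an unused ephemeral port.
    # There is an inherent race: the port is released before the real
    # server binds it, so another process could grab it in between --
    # acceptable for single-host tests like the one in this commit.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]

port = find_free_port()
print(f"127.0.0.1:{port}")  # e.g. the value fed to PADDLE_PSERVERS_IP_PORT_LIST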
