diff --git a/test/collective/fleet/run_server_for_communicator_half_async.py b/test/collective/fleet/run_server_for_communicator_half_async.py
new file mode 100644
index 0000000000000..14d8fd80331b3
--- /dev/null
+++ b/test/collective/fleet/run_server_for_communicator_half_async.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End
+
+import paddle
+
+paddle.enable_static()
+
+pipe_name = os.getenv("PIPE_FILE")
+
+
+class RunServer(TestCommunicatorHalfAsyncEnd2End):
+    def runTest(self):
+        pass
+
+
+os.environ["TRAINING_ROLE"] = "PSERVER"
+os.environ["http_proxy"] = ""
+os.environ["https_proxy"] = ""
+half_run_server = RunServer()
+with open(pipe_name, 'w') as pipe:
+    pipe.write('done')
+
+half_run_server.run_ut()
diff --git a/test/collective/fleet/test_communicator_half_async.py b/test/collective/fleet/test_communicator_half_async.py
index 25e5302fb444f..687337f25ab2a 100644
--- a/test/collective/fleet/test_communicator_half_async.py
+++ b/test/collective/fleet/test_communicator_half_async.py
@@ -15,6 +15,7 @@
 import os
 import subprocess
 import sys
+import tempfile
 import unittest
 
 import numpy
@@ -23,6 +24,7 @@
 from paddle import base
 from paddle.distributed import fleet
 from paddle.distributed.fleet.base import role_maker
+from paddle.distributed.utils.launch_utils import find_free_ports
 
 paddle.enable_static()
 
@@ -30,25 +32,44 @@ class TestCommunicatorHalfAsyncEnd2End(unittest.TestCase):
     def net(self):
         x = paddle.static.data(name='x', shape=[-1, 13], dtype='float32')
-        y_predict = paddle.static.nn.fc(x, size=1, activation=None)
-        y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
+        x1 = paddle.static.data(
+            name='x1', shape=[-1, 1], dtype='int64', lod_level=1
+        )
+        emb = paddle.static.nn.embedding(
+            input=x1,
+            size=[10000, 10],
+            param_attr=base.ParamAttr(
+                name="embedding",
+                initializer=paddle.nn.initializer.Constant(value=0.01),
+            ),
+            is_sparse=True,
+        )
+
+        pool = paddle.static.nn.sequence_lod.sequence_pool(
+            input=emb.squeeze(-2), pool_type="sum"
+        )
+        z = paddle.concat([x, pool], axis=1)
+
+        y_predict = paddle.static.nn.fc(x=z, size=1)
+        y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
 
         cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
         avg_cost = paddle.mean(cost)
-        return avg_cost, x, y
+        return avg_cost, x, x1, y
 
     def fake_reader(self):
         def reader():
             for i in range(10000):
                 x = numpy.random.random((1, 13)).astype('float32')
+                z = numpy.random.randint(0, 9999, (1, 1)).astype('int64')
                 y = numpy.random.randint(0, 2, (1, 1)).astype('int64')
-                yield x, y
+                yield x, z, y
 
         return reader
 
     def run_pserver(self, role, strategy):
         fleet.init(role)
-        avg_cost, x, y = self.net()
+        avg_cost, x, z, y = self.net()
         optimizer = paddle.optimizer.SGD(0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
@@ -61,20 +82,20 @@ def run_trainer(self, role, strategy):
         exe = base.Executor(place)
 
         fleet.init(role)
-        avg_cost, x, y = self.net()
+        avg_cost, x, z, y = self.net()
         optimizer = paddle.optimizer.SGD(0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
 
-        exe.run(paddle.static.default_startup_program())
+        exe.run(base.default_startup_program())
         fleet.init_worker()
 
         train_reader = paddle.batch(self.fake_reader(), batch_size=24)
-        feeder = base.DataFeeder(place=place, feed_list=[x, y])
+        feeder = base.DataFeeder(place=place, feed_list=[x, z, y])
 
         for batch_id, data in enumerate(train_reader()):
             exe.run(
-                paddle.static.default_main_program(),
+                base.default_main_program(),
                 feed=feeder.feed(data),
                 fetch_list=[],
             )
@@ -82,19 +103,18 @@ def run_trainer(self, role, strategy):
         fleet.stop_worker()
 
     def run_ut(self):
-        strategy = paddle.distributed.fleet.DistributedStrategy()
-        strategy.a_sync = True
-
-        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
-
-        role = role_maker.UserDefinedRoleMaker(
-            current_id=0,
-            role=role_maker.Role.WORKER
-            if training_role == "TRAINER"
-            else role_maker.Role.SERVER,
-            worker_num=1,
-            server_endpoints=["127.0.0.1:6002"],
-        )
+        os.environ["PADDLE_PSERVER_NUMS"] = "1"
+        os.environ["PADDLE_TRAINERS_NUM"] = "1"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        os.environ["PADDLE_TRAINERS_NUM"] = "1"
+        os.environ["POD_IP"] = "127.0.0.1"
+
+        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
+
+        role = role_maker.PaddleCloudRoleMaker()
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
 
         if training_role == "TRAINER":
             self.run_trainer(role, strategy)
@@ -102,61 +122,39 @@ def run_ut(self):
             self.run_pserver(role, strategy)
 
     def test_communicator(self):
-        run_server_cmd = """
+        temp_dir = tempfile.TemporaryDirectory()
+        pipe_name = os.path.join(temp_dir.name, 'mypipe')
+        try:
+            os.mkfifo(pipe_name)
+        except OSError as oe:
+            print(f"Failed to create pipe: {oe}")
 
-import sys
-import os
+        port = find_free_ports(1).pop()
 
-import time
-import threading
-import subprocess
-import unittest
-import numpy
-
-from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End
-
-import paddle
-import paddle.base as base
-import paddle.distributed.fleet as fleet
-import paddle.distributed.fleet.base.role_maker as role_maker
-
-paddle.enable_static()
-
-class RunServer(TestCommunicatorHalfAsyncEnd2End):
-    def runTest(self):
-        pass
-
-os.environ["http_proxy"] = ""
-os.environ["https_proxy"] = ""
-os.environ["TRAINING_ROLE"] = "PSERVER"
-half_run_server = RunServer()
-half_run_server.run_ut()
-"""
-
-        server_file = "run_server_for_communicator_haflaysnc.py"
-        with open(server_file, "w") as wb:
-            wb.write(run_server_cmd)
         os.environ["TRAINING_ROLE"] = "PSERVER"
-        _python = sys.executable
+        os.environ["PADDLE_PORT"] = str(port)
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = f"127.0.0.1:{port}"
+        os.environ["PIPE_FILE"] = pipe_name
 
+        _python = sys.executable
+        server_file = "run_server_for_communicator_half_async.py"
         ps_cmd = f"{_python} {server_file}"
+
         ps_proc = subprocess.Popen(
             ps_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
-        os.environ["http_proxy"] = ""
-        os.environ["https_proxy"] = ""
 
+        with open(pipe_name, 'r') as pipe:
+            start_command = pipe.read()
+
         os.environ["TRAINING_ROLE"] = "TRAINER"
-        os.environ["FLAGS_communicator_send_queue_size"] = "1"
-        os.environ["FLAGS_communicator_max_merge_var_num"] = "1"
 
         self.run_ut()
         ps_proc.kill()
-
-        if os.path.exists(server_file):
-            os.remove(server_file)
+        ps_proc.wait()
+        outs, errs = ps_proc.communicate()
 
 
 if __name__ == '__main__':
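---

Note on the handshake above: the old test generated the server script at
runtime (under the misspelled name "run_server_for_communicator_haflaysnc.py"),
bound it to the hard-coded endpoint 127.0.0.1:6002, and had no way to tell
when the server process was actually up. The patch checks the helper script in
as run_server_for_communicator_half_async.py, picks a port with
find_free_ports(1) so concurrent runs cannot collide, and synchronizes startup
through a named pipe: the parent blocks reading the FIFO until the server
process writes 'done'. Below is a minimal, self-contained sketch of that FIFO
handshake, not part of the patch; the inline '-c' child script is a stand-in
for run_server_for_communicator_half_async.py, and a POSIX system is assumed
since os.mkfifo is Unix-only:

import os
import subprocess
import sys
import tempfile

temp_dir = tempfile.TemporaryDirectory()
pipe_name = os.path.join(temp_dir.name, 'mypipe')
os.mkfifo(pipe_name)  # named pipe: opening it for read blocks until a writer appears

# Stand-in child; the real test launches run_server_for_communicator_half_async.py.
child_code = (
    "import os\n"
    "with open(os.environ['PIPE_FILE'], 'w') as pipe:\n"
    "    pipe.write('done')\n"
)

proc = subprocess.Popen(
    [sys.executable, '-c', child_code],
    env=dict(os.environ, PIPE_FILE=pipe_name),
)

# Blocks until the child has started and signalled readiness; only then
# would the test flip TRAINING_ROLE to "TRAINER" and run the trainer side.
with open(pipe_name, 'r') as pipe:
    assert pipe.read() == 'done'

proc.wait()
temp_dir.cleanup()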