[with_data_parallel][part3] remove with_data_parallel in unit test (#50568)

* remove with_data_parallel in unittest

* fix CI

* remove comment

* trigger CI

* revert part changes

* test_build_strategy_fusion_group_pass
kangguangli committed Feb 22, 2023
1 parent 499b7f8 commit b958fa7
Showing 31 changed files with 129 additions and 388 deletions.
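The change repeated across these files removes the deprecated with_data_parallel() call and compiles programs with a plain CompiledProgram. A minimal before/after sketch of the pattern, assuming the static-graph API; main_program, loss, and place are illustrative names rather than ones from a specific test:

import paddle

paddle.enable_static()
main_program = paddle.static.default_main_program()

# Old pattern, removed by this commit:
#     compiled = paddle.static.CompiledProgram(main_program).with_data_parallel(
#         loss_name=loss.name, places=[place]
#     )

# New pattern: compile the program directly, with no data-parallel wrapper.
compiled = paddle.static.CompiledProgram(main_program)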
@@ -235,9 +235,7 @@ def _run(
         exe.run(startup_program)

         if use_compiled:
-            main_program = paddle.static.CompiledProgram(
-                main_program
-            ).with_data_parallel(fetch_vars[0].name, places=[self.place])
+            main_program = paddle.static.CompiledProgram(main_program)

         if use_str: # test for fetch name
             fetch_vars = [x.name for x in fetch_vars]
@@ -32,7 +32,7 @@ def test_train_enable_fusion_group(self):
         rnn_model = "static"
         config = RNNConfig("test", rnn_model)
         with fluid.scope_guard(fluid.Scope()):
-            self.train(config, parallel=True, use_program_cache=False)
+            self.train(config, use_program_cache=False)


 if __name__ == '__main__':
63 changes: 0 additions & 63 deletions python/paddle/fluid/tests/unittests/test_compiled_program.py
@@ -72,30 +72,6 @@ def test_compiled_program_base(self):
             )
             np.testing.assert_array_equal(loss_data[0], self.loss)

-    def test_compiled_program_with_data_parallel(self):
-        with new_program_scope():
-            paddle.seed(self.seed)
-            paddle.framework.random._manual_program_seed(self.seed)
-            place = (
-                fluid.CUDAPlace(0)
-                if core.is_compiled_with_cuda()
-                else fluid.CPUPlace()
-            )
-            exe = fluid.Executor(place)
-
-            loss = simple_fc_net()
-            exe.run(fluid.default_startup_program())
-            compiled_prog = fluid.CompiledProgram(
-                fluid.default_main_program()
-            ).with_data_parallel(loss_name=loss.name, places=[place])
-
-            (loss_data,) = exe.run(
-                compiled_prog,
-                feed={"image": self.img, "label": self.label},
-                fetch_list=[loss.name],
-            )
-            np.testing.assert_array_equal(loss_data[0], self.loss)
-

 class TestCompiledProgramError(unittest.TestCase):
     def test_program_or_graph_error(self):
@@ -112,17 +88,6 @@ def build_simple_model(self):
         )
         avg_loss = paddle.mean(loss)

-    def compile_program_not_compiled(self):
-        with fluid.program_guard(fluid.Program()):
-            # build model
-            self.build_simple_model()
-            # compile program
-            program = fluid.default_main_program()
-            compiled_program = fluid.CompiledProgram(
-                program
-            ).with_data_parallel()
-            return compiled_program
-
     def compile_program(self):
         with fluid.program_guard(fluid.Program()):
             # build model
@@ -149,34 +114,6 @@ def test_compile_place_error(self):
             with self.assertRaises(ValueError):
                 compiled_program._compile(scope, new_place)

-    def test_share_vars_from_error_no_parallel(self):
-        with fluid.program_guard(fluid.Program()):
-            source_program, _, _ = self.compile_program()
-            self.build_simple_model()
-            # compile program
-            program = fluid.default_main_program()
-            compiled_program = fluid.CompiledProgram(
-                program
-            ).with_data_parallel(share_vars_from=source_program)
-            scope = fluid.global_scope()
-            place = fluid.CPUPlace()
-            with self.assertRaises(ValueError):
-                compiled_program._compile(scope, place)
-
-    def test_share_vars_from_error_no_executor(self):
-        with fluid.program_guard(fluid.Program()):
-            source_program = self.compile_program_not_compiled()
-            self.build_simple_model()
-            # compile program
-            program = fluid.default_main_program()
-            compiled_program = fluid.CompiledProgram(
-                program
-            ).with_data_parallel(share_vars_from=source_program)
-            scope = fluid.global_scope()
-            place = fluid.CPUPlace()
-            with self.assertRaises(ValueError):
-                compiled_program._compile(scope, place)
-

 if __name__ == '__main__':
     unittest.main()
@@ -111,9 +111,7 @@ def cuda_graph_static_graph_main(self, seed, use_cuda_graph):
             build_strategy.fix_op_run_order = True
             build_strategy.fuse_all_optimizer_ops = True
             compiled_program = paddle.static.CompiledProgram(
-                main
-            ).with_data_parallel(
-                loss_name=loss.name, build_strategy=build_strategy, places=place
+                main, build_strategy=build_strategy
             )
             image_t = scope.var(image.name).get_tensor()
             label_t = scope.var(label.name).get_tensor()
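Where the old call also carried a BuildStrategy, the strategy now goes straight into the CompiledProgram constructor, as in the hunk above. A hedged sketch of that variant; main stands in for the test's program:

import paddle

paddle.enable_static()
main = paddle.static.default_main_program()

# Tune the build strategy, then pass it to the constructor instead of
# to with_data_parallel().
build_strategy = paddle.static.BuildStrategy()
build_strategy.fix_op_run_order = True
build_strategy.fuse_all_optimizer_ops = True

compiled_program = paddle.static.CompiledProgram(
    main, build_strategy=build_strategy
)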
10 changes: 5 additions & 5 deletions python/paddle/fluid/tests/unittests/test_dataset_dataloader.py
@@ -106,13 +106,13 @@ def check_batch_number(self, place, randomize_batch_num=False):
         dataset._set_batch_size(BATCH_SIZE)

         if isinstance(place, fluid.CPUPlace):
-            file_num = 10
+            file_num = 1
             os.environ['CPU_NUM'] = str(file_num)
-            places = fluid.cpu_places()
+            places = [fluid.CPUPlace()]
             use_cuda = False
         else:
-            file_num = fluid.core.get_cuda_device_count()
-            places = fluid.cuda_places()
+            file_num = 1
+            places = [fluid.CUDAPlace(0)]
             use_cuda = True

         filelist = []
@@ -145,7 +145,7 @@ def check_batch_number(self, place, randomize_batch_num=False):
         dataloader = fluid.io.DataLoader.from_dataset(
             dataset=dataset, places=places, drop_last=self.drop_last
         )
-        prog = fluid.CompiledProgram(main_prog).with_data_parallel()
+        prog = fluid.CompiledProgram(main_prog)
         exe = fluid.Executor(place)

         exe.run(startup_prog)
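With data-parallel compilation gone, this test also stops enumerating every visible device and pins itself to one place. A rough sketch of the single-device setup it now relies on, using the legacy fluid API:

import os
import paddle.fluid as fluid

# One file, one place, regardless of how many devices are available.
if fluid.core.is_compiled_with_cuda():
    places = [fluid.CUDAPlace(0)]   # was: fluid.cuda_places()
else:
    os.environ['CPU_NUM'] = '1'     # was: one file per CPU place
    places = [fluid.CPUPlace()]     # was: fluid.cpu_places()

exe = fluid.Executor(places[0])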
@@ -19,7 +19,6 @@
 os.environ['FLAGS_use_mkldnn'] = '0'
 os.environ['CPU_NUM'] = '4'

-import multiprocessing
 import unittest
 from functools import reduce

@@ -82,13 +81,6 @@ def test_executor_main(self):
                     with fluid.unique_name.guard():
                         self.executor_main()

-        for p in places:
-            self.place = p
-            with fluid.program_guard(fluid.Program(), fluid.Program()):
-                with fluid.scope_guard(fluid.Scope()):
-                    with fluid.unique_name.guard():
-                        self.pe_main()
-
     def prepare_feed(self, image, label, dev_cnt=1):
         batch_size = 32 * dev_cnt
         image_shape = (batch_size,) + tuple(image.shape[1:])
@@ -179,48 +171,6 @@ def executor_main(self):
             fluid.global_scope(), persistables, non_persistables
         )

-    def pe_main(self):
-        image, label, loss = simple_fc_net()
-        loss.persistable = False
-        persistables, non_persistables = get_persistables_and_non_persistables(
-            fluid.default_main_program(), [loss.name]
-        )
-        self.assert_gc_vars(
-            fluid.default_main_program(), [loss.name], non_persistables
-        )
-
-        exe = fluid.Executor(self.place)
-        exe.run(fluid.default_startup_program())
-
-        exec_strategy = fluid.ExecutionStrategy()
-        exec_strategy.num_iteration_per_drop_scope = 100
-
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.memory_optimize = False
-        build_strategy.enable_inplace = False
-
-        prog = fluid.CompiledProgram(
-            fluid.default_main_program()
-        ).with_data_parallel(loss_name=loss.name, exec_strategy=exec_strategy)
-
-        dev_cnt = (
-            fluid.core.get_cuda_device_count()
-            if isinstance(self.place, fluid.CUDAPlace)
-            else int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-        )
-
-        for idx in range(10):
-            image_np, label_np = self.prepare_feed(image, label, dev_cnt)
-            feed = {image.name: image_np, label.name: label_np}
-
-            exe.run(program=prog, feed=feed, fetch_list=[loss])
-
-        local_scopes = prog._local_scopes
-        for scope in local_scopes:
-            kids = scope._kids()
-            self.assertTrue(len(kids) == 1)
-            self.assertScopeVar(kids[0], persistables, non_persistables)
-

 if __name__ == '__main__':
     unittest.main()
@@ -23,20 +23,13 @@
 import paddle
 import paddle.fluid as fluid
 import paddle.fluid.core as core
-from paddle.fluid import compiler


-def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):
+def train(network, use_cuda, batch_size=32, pass_num=2):
     if use_cuda and not core.is_compiled_with_cuda():
         print('Skip use_cuda=True because Paddle is not compiled with cuda')
         return

-    if use_parallel_executor and os.name == 'nt':
-        print(
-            'Skip use_parallel_executor=True because Paddle comes without parallel support on windows'
-        )
-        return
-
     word_dict_size = 5147
     reader = fake_imdb_reader(word_dict_size, batch_size * 40)
     train_reader = paddle.batch(reader, batch_size=batch_size)
@@ -54,23 +47,15 @@ def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):

     place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
     feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
-    reader = feeder.decorate_reader(
-        train_reader, multi_devices=use_parallel_executor
-    )
+    reader = feeder.decorate_reader(train_reader, multi_devices=False)

     exe = fluid.Executor(place)
     fluid.default_startup_program().random_seed = 1
     fluid.default_main_program().random_seed = 1
     exe.run(fluid.default_startup_program())

     train_cp = fluid.default_main_program()
-    if use_parallel_executor:
-        train_cp = compiler.CompiledProgram(
-            fluid.default_main_program()
-        ).with_data_parallel(loss_name=cost.name)
-        fetch_list = [cost.name]
-    else:
-        fetch_list = [cost]
+    fetch_list = [cost]

     for pass_id in range(pass_num):
         batch_id = 0
@@ -94,12 +79,9 @@ def test_network(self):
             return

         for use_cuda in [True, False]:
-            for use_parallel_executor in [False, True]:
-                print(
-                    'network: {}, use_cuda: {}, use_parallel_executor: {}'.format(
-                        self.net.__name__, use_cuda, use_parallel_executor
-                    )
-                )
-                with fluid.program_guard(fluid.Program(), fluid.Program()):
-                    with fluid.scope_guard(core.Scope()):
-                        train(self.net, use_cuda, use_parallel_executor)
+            print(
+                'network: {}, use_cuda: {}'.format(self.net.__name__, use_cuda)
+            )
+            with fluid.program_guard(fluid.Program(), fluid.Program()):
+                with fluid.scope_guard(core.Scope()):
+                    train(self.net, use_cuda)
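With the parallel-executor branch gone from train(), the run loop reduces to executing the default main program on a single place. A rough sketch of the resulting loop; exe, feeder, train_reader, cost, and pass_num are the test's own objects and are assumed here:

# Sketch only: single-device training loop after the simplification.
train_cp = fluid.default_main_program()
fetch_list = [cost]

for pass_id in range(pass_num):
    for data in train_reader():
        exe.run(program=train_cp, feed=feeder.feed(data), fetch_list=fetch_list)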
@@ -473,7 +473,7 @@ def set_customed_config(self):
         # You can override the function to set your own config.
         pass

-    def _prepare_program(self, config, parallel=True):
+    def _prepare_program(self, config):
         paddle.seed(config.random_seed)
         self.main_program = fluid.Program()
         self.startup_program = fluid.Program()
@@ -517,16 +517,7 @@ def _prepare_program(self, config):

         self.exe.run(self.startup_program)

-        if parallel:
-            self.train_program = fluid.compiler.CompiledProgram(
-                self.main_program
-            ).with_data_parallel(
-                loss_name=self.loss.name,
-                build_strategy=self.build_strategy,
-                exec_strategy=self.exec_strategy,
-            )
-        else:
-            self.train_program = self.main_program
+        self.train_program = self.main_program

     def _generate_init_data(self):
         init_hidden = np.zeros(
@@ -621,29 +612,27 @@ def _train_an_epoch(self, epoch_id, use_program_cache=True):
             ppl = np.append(ppl, batch_ppl)
         return ppl

-    def train(self, config, parallel=True, use_program_cache=True):
+    def train(self, config, use_program_cache=True):
         self.set_customed_config()

         self.config = config
-        self._prepare_program(config, parallel)
+        self._prepare_program(config)
         ppl = np.zeros(shape=(0, config.batch_size))
         for epoch_id in range(config.max_epoch):
             train_ppl = self._train_an_epoch(epoch_id, use_program_cache)
             ppl = np.append(ppl, train_ppl)
         return ppl

-    def compare_padding_static_mode(
-        self, parallel=True, use_program_cache=True
-    ):
+    def compare_padding_static_mode(self, use_program_cache=True):
         '''
         Test that train ppl of padding mode is same to that of static graph mode
         '''
         config = RNNConfig('test', 'padding')
         with fluid.scope_guard(fluid.Scope()):
-            padding_rnn_ppl = self.train(config, parallel, use_program_cache)
+            padding_rnn_ppl = self.train(config, use_program_cache)
         config = RNNConfig('test', 'static')
         with fluid.scope_guard(fluid.Scope()):
-            static_rnn_ppl = self.train(config, parallel, use_program_cache)
+            static_rnn_ppl = self.train(config, use_program_cache)
         np.testing.assert_allclose(padding_rnn_ppl, static_rnn_ppl, rtol=0.001)


@@ -654,15 +643,15 @@ def test_padding_mode_no_eager_deletion(self):
         '''
         fluid.core._set_eager_deletion_mode(-1.0, 1.0, True)
         # When parallel is True, use_program_cache does not make a difference.
-        self.compare_padding_static_mode(parallel=True, use_program_cache=True)
+        self.compare_padding_static_mode(use_program_cache=True)

     def test_padding_mode_eager_deletion(self):
         '''
         Test that train ppl of padding mode is same to that of static graph mode under eager deletion
         '''
         fluid.core._set_eager_deletion_mode(0.0, 1.0, True)
         # When parallel is True, use_program_cache does not make a difference.
-        self.compare_padding_static_mode(parallel=True, use_program_cache=True)
+        self.compare_padding_static_mode(use_program_cache=True)


 if __name__ == '__main__':