From 497321a375d312c2e34ca5c560877d36913d944d Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Mon, 12 Feb 2018 17:32:55 -0800 Subject: [PATCH 01/21] Adding Python boilerplate code for Go op --- python/paddle/v2/fluid/__init__.py | 3 ++- python/paddle/v2/fluid/concurrency.py | 16 ++++++++++++++++ python/paddle/v2/fluid/tests/test_concurrency.py | 13 +++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 python/paddle/v2/fluid/concurrency.py create mode 100644 python/paddle/v2/fluid/tests/test_concurrency.py diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 9f710c4a4aa72..7762bd4eb680d 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -29,6 +29,7 @@ import learning_rate_decay import backward import regularizer +import concurrency from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace @@ -40,7 +41,7 @@ Tensor = LoDTensor -__all__ = framework.__all__ + executor.__all__ + [ +__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [ 'io', 'initializer', 'layers', diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py new file mode 100644 index 0000000000000..2acb1c5b61c89 --- /dev/null +++ b/python/paddle/v2/fluid/concurrency.py @@ -0,0 +1,16 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Variables: make_channel +# TODO: Operators: send, close_channel, recv, go, select diff --git a/python/paddle/v2/fluid/tests/test_concurrency.py b/python/paddle/v2/fluid/tests/test_concurrency.py new file mode 100644 index 0000000000000..eca2dce114b06 --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_concurrency.py @@ -0,0 +1,13 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. From f2a95fafacb8e9d60a908f579f4793466c83786f Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Mon, 12 Feb 2018 17:54:31 -0800 Subject: [PATCH 02/21] Add very basic test case --- .../paddle/v2/fluid/tests/test_concurrency.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/python/paddle/v2/fluid/tests/test_concurrency.py b/python/paddle/v2/fluid/tests/test_concurrency.py index eca2dce114b06..4ac4fbccd515d 100644 --- a/python/paddle/v2/fluid/tests/test_concurrency.py +++ b/python/paddle/v2/fluid/tests/test_concurrency.py @@ -11,3 +11,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import unittest +import numpy as np +import paddle.v2.fluid.layers as layers +import paddle.v2.fluid as fluid +import paddle.v2.fluid.core as core +from paddle.v2.fluid.executor import Executor + + +class TestRoutineOp(unittest.TestCase): + def test_simple_routine(self): + counter = layers.zeros(shape=[1], dtype='int64') + counter = layers.increment(counter) + + routine_op = fluid.Routine() + with routine_op.block(): + counter = layers.increment(counter) + + cpu = core.CPUPlace() + exe = Executor(cpu) + + outs = exe.run(fetch_list=[counter]) + self.assertEqual(2, np.sum(outs[0])) + + +if __name__ == '__main__': + unittest.main() From d785ca9949d9d7dc057a0547236ea881886f87ee Mon Sep 17 00:00:00 2001 From: Kavya Srinet Date: Mon, 12 Feb 2018 18:01:55 -0800 Subject: [PATCH 03/21] Adding the python logic for go routine --- python/paddle/v2/fluid/concurrency.py | 59 +++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index 2acb1c5b61c89..f62e1caa48cc2 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -14,3 +14,62 @@ # TODO: Variables: make_channel # TODO: Operators: send, close_channel, recv, go, select +from layers.control_flow import BlockGuard +from layer_helper import LayerHelper + + +class RoutineGuard(BlockGuard): + def __init__(self, routine_op): + if not isinstance(routine_op, Routine): + raise TypeError("RoutineGuard takes a routine op") + super(RoutineGuard, self).__init__(routine_op.helper.main_program) + self.routine_op = routine_op + + def __enter__(self): + return super(RoutineGuard, self).__enter__() + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return False + self.routine_op.complete() + return super(RoutineGuard, self).__exit__(exc_type, exc_val, exc_tb) + + +class Routine(object): + + def __init__(self, name=None): + self.helper = LayerHelper("routine", name=name) + + def block(self): + return RoutineGuard(self) + + def complete(self): + main_program = self.helper.main_program + routine_block = main_program.current_block() + parent_block = main_program.block(main_program.current_block() + .parent_idx) + + x_name_list = set() + out_vars = set() + for op in routine_block.ops: + # Iterate over all operators, get all the inputs + # and add as input to the routine operator. + for iname in op.input_names: + for in_var_name in op.input(iname): + x_name_list.add(in_var_name) + + # Iterate over all operators , get all the outputs + # add to the output list of routine operator only if + # they exist in the parent block. + for oname in op.output_names: + for out_var_name in op.output(oname): + if out_var_name in parent_block.vars: + out_vars.add(parent_block.var(out_var_name)) + + parent_block.append_op( + type='routine', + inputs={ + 'X': [parent_block.var(x_name) for x_name in x_name_list] + }, + outputs={'Out': out_vars, + attrs={'sub_block': routine_block}) From 313ffb0382db8504dae58ba32f3468776e363b81 Mon Sep 17 00:00:00 2001 From: Kavya Srinet Date: Mon, 12 Feb 2018 18:04:23 -0800 Subject: [PATCH 04/21] Fix syntax --- python/paddle/v2/fluid/concurrency.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index f62e1caa48cc2..dc462c2d08d54 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + # TODO: Variables: make_channel # TODO: Operators: send, close_channel, recv, go, select from layers.control_flow import BlockGuard @@ -71,5 +72,5 @@ def complete(self): inputs={ 'X': [parent_block.var(x_name) for x_name in x_name_list] }, - outputs={'Out': out_vars, + outputs={'Out': out_vars}, attrs={'sub_block': routine_block}) From a43924e217d86e5b4e4818412d73e1cd1f471942 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Tue, 13 Feb 2018 10:05:05 -0800 Subject: [PATCH 05/21] Changing test to notest --- .../v2/fluid/tests/{test_concurrency.py => notest_concurrency.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/paddle/v2/fluid/tests/{test_concurrency.py => notest_concurrency.py} (100%) diff --git a/python/paddle/v2/fluid/tests/test_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py similarity index 100% rename from python/paddle/v2/fluid/tests/test_concurrency.py rename to python/paddle/v2/fluid/tests/notest_concurrency.py From c97cf4ae9033aa6165268dd3abde62490354b54a Mon Sep 17 00:00:00 2001 From: Kavya Srinet Date: Tue, 13 Feb 2018 10:15:04 -0800 Subject: [PATCH 06/21] Rename Routine to Go --- python/paddle/v2/fluid/concurrency.py | 36 +++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index dc462c2d08d54..efe6f0014d31a 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -19,48 +19,48 @@ from layer_helper import LayerHelper -class RoutineGuard(BlockGuard): - def __init__(self, routine_op): - if not isinstance(routine_op, Routine): - raise TypeError("RoutineGuard takes a routine op") - super(RoutineGuard, self).__init__(routine_op.helper.main_program) - self.routine_op = routine_op +class GoGuard(BlockGuard): + def __init__(self, go_op): + if not isinstance(go_op, Go): + raise TypeError("GoGuard takes a go op") + super(GoGuard, self).__init__(go_op.helper.main_program) + self.go_op = go_op def __enter__(self): - return super(RoutineGuard, self).__enter__() + return super(GoGuard, self).__enter__() def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: return False - self.routine_op.complete() - return super(RoutineGuard, self).__exit__(exc_type, exc_val, exc_tb) + self.go_op.complete() + return super(GoGuard, self).__exit__(exc_type, exc_val, exc_tb) -class Routine(object): +class Go(object): def __init__(self, name=None): - self.helper = LayerHelper("routine", name=name) + self.helper = LayerHelper("go", name=name) def block(self): - return RoutineGuard(self) + return GoGuard(self) def complete(self): main_program = self.helper.main_program - routine_block = main_program.current_block() + go_block = main_program.current_block() parent_block = main_program.block(main_program.current_block() .parent_idx) x_name_list = set() out_vars = set() - for op in routine_block.ops: + for op in go_block.ops: # Iterate over all operators, get all the inputs - # and add as input to the routine operator. + # and add as input to the Go operator. for iname in op.input_names: for in_var_name in op.input(iname): x_name_list.add(in_var_name) # Iterate over all operators , get all the outputs - # add to the output list of routine operator only if + # add to the output list of Go operator only if # they exist in the parent block. for oname in op.output_names: for out_var_name in op.output(oname): @@ -68,9 +68,9 @@ def complete(self): out_vars.add(parent_block.var(out_var_name)) parent_block.append_op( - type='routine', + type='go', inputs={ 'X': [parent_block.var(x_name) for x_name in x_name_list] }, outputs={'Out': out_vars}, - attrs={'sub_block': routine_block}) + attrs={'sub_block': go_block}) From fc2593cd17a7bff2a38b497aa5e22241304d015f Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Tue, 13 Feb 2018 10:34:45 -0800 Subject: [PATCH 07/21] Combining GoGuard and Go in one class --- python/paddle/v2/fluid/concurrency.py | 34 +++++++++------------------ 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index efe6f0014d31a..d5085fb8c7e9d 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -12,39 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. - # TODO: Variables: make_channel # TODO: Operators: send, close_channel, recv, go, select from layers.control_flow import BlockGuard from layer_helper import LayerHelper +__all__ = ['Go'] -class GoGuard(BlockGuard): - def __init__(self, go_op): - if not isinstance(go_op, Go): - raise TypeError("GoGuard takes a go op") - super(GoGuard, self).__init__(go_op.helper.main_program) - self.go_op = go_op + +class Go(BlockGuard): + def __init__(self, name=None): + self.helper = LayerHelper("go", name=name) + super(Go, self).__init__(self.helper.main_program) def __enter__(self): - return super(GoGuard, self).__enter__() + super(Go, self).__enter__() def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: return False - self.go_op.complete() - return super(GoGuard, self).__exit__(exc_type, exc_val, exc_tb) - - -class Go(object): - - def __init__(self, name=None): - self.helper = LayerHelper("go", name=name) - - def block(self): - return GoGuard(self) + self.construct_go_op() + return super(Go, self).__exit__(exc_type, exc_val, exc_tb) - def complete(self): + def construct_go_op(self): main_program = self.helper.main_program go_block = main_program.current_block() parent_block = main_program.block(main_program.current_block() @@ -69,8 +59,6 @@ def complete(self): parent_block.append_op( type='go', - inputs={ - 'X': [parent_block.var(x_name) for x_name in x_name_list] - }, + inputs={'X': [parent_block.var(x_name) for x_name in x_name_list]}, outputs={'Out': out_vars}, attrs={'sub_block': go_block}) From 8343a35361e48499adbed6796633e978960aab85 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Tue, 13 Feb 2018 11:20:30 -0800 Subject: [PATCH 08/21] Modify test --- .../paddle/v2/fluid/tests/notest_concurrency.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/paddle/v2/fluid/tests/notest_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py index 4ac4fbccd515d..b648902bea55c 100644 --- a/python/paddle/v2/fluid/tests/notest_concurrency.py +++ b/python/paddle/v2/fluid/tests/notest_concurrency.py @@ -13,8 +13,6 @@ # limitations under the License. import unittest -import numpy as np -import paddle.v2.fluid.layers as layers import paddle.v2.fluid as fluid import paddle.v2.fluid.core as core from paddle.v2.fluid.executor import Executor @@ -22,18 +20,17 @@ class TestRoutineOp(unittest.TestCase): def test_simple_routine(self): - counter = layers.zeros(shape=[1], dtype='int64') - counter = layers.increment(counter) + ch = fluid.make_channel(dtype=bool) + with fluid.Go(): + fluid.send(ch, True) - routine_op = fluid.Routine() - with routine_op.block(): - counter = layers.increment(counter) + result = fluid.recv(ch) cpu = core.CPUPlace() exe = Executor(cpu) - outs = exe.run(fetch_list=[counter]) - self.assertEqual(2, np.sum(outs[0])) + outs = exe.run(fetch_list=[result]) + self.assertEqual(outs[0], True) if __name__ == '__main__': From d0e26f79f26aa4049efb8726ca2268083577aac9 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Tue, 13 Feb 2018 11:23:04 -0800 Subject: [PATCH 09/21] Adding fluid close channel --- python/paddle/v2/fluid/tests/notest_concurrency.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/paddle/v2/fluid/tests/notest_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py index b648902bea55c..b2ef982d45979 100644 --- a/python/paddle/v2/fluid/tests/notest_concurrency.py +++ b/python/paddle/v2/fluid/tests/notest_concurrency.py @@ -25,6 +25,7 @@ def test_simple_routine(self): fluid.send(ch, True) result = fluid.recv(ch) + fluid.close_channel(ch) cpu = core.CPUPlace() exe = Executor(cpu) From a37a1e18fe071e02180133a393afd97f596e59d6 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Tue, 13 Feb 2018 11:51:35 -0800 Subject: [PATCH 10/21] Fixing __init__.py for calling fluid.go() --- python/paddle/v2/fluid/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 7762bd4eb680d..24580e90434f4 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -29,12 +29,12 @@ import learning_rate_decay import backward import regularizer -import concurrency from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler from distribute_transpiler_simple import SimpleDistributeTranspiler +from concurrency import * import clip from memory_optimization_transpiler import memory_optimize import profiler From f5f6370509b845a482d358dadbd244c16ee866cf Mon Sep 17 00:00:00 2001 From: Kavya Srinet Date: Tue, 13 Feb 2018 11:53:19 -0800 Subject: [PATCH 11/21] Adding stubs for channel methods and updating test case --- python/paddle/v2/fluid/concurrency.py | 24 ++++++++++++++++++- .../v2/fluid/tests/notest_concurrency.py | 6 ++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index d5085fb8c7e9d..5f868b6e86d34 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -17,7 +17,13 @@ from layers.control_flow import BlockGuard from layer_helper import LayerHelper -__all__ = ['Go'] +__all__ = [ + 'Go', + 'make_channel', + 'channel_send', + 'channel_recv', + 'channel_close', +] class Go(BlockGuard): @@ -62,3 +68,19 @@ def construct_go_op(self): inputs={'X': [parent_block.var(x_name) for x_name in x_name_list]}, outputs={'Out': out_vars}, attrs={'sub_block': go_block}) + + +def make_channel(dtype, size=0): + return True + + +def channel_send(channel, value): + return True + + +def channel_recv(channel): + return True + + +def channel_close(channel): + return True diff --git a/python/paddle/v2/fluid/tests/notest_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py index b2ef982d45979..9d87ed9c07364 100644 --- a/python/paddle/v2/fluid/tests/notest_concurrency.py +++ b/python/paddle/v2/fluid/tests/notest_concurrency.py @@ -22,10 +22,10 @@ class TestRoutineOp(unittest.TestCase): def test_simple_routine(self): ch = fluid.make_channel(dtype=bool) with fluid.Go(): - fluid.send(ch, True) + fluid.channel_send(ch, True) - result = fluid.recv(ch) - fluid.close_channel(ch) + result = fluid.channel_recv(ch) + fluid.channel_close(ch) cpu = core.CPUPlace() exe = Executor(cpu) From 323a2e257fa9c8300a96c7e36bb2fe4f115dbeca Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Thu, 15 Feb 2018 09:30:06 -0800 Subject: [PATCH 12/21] Removing import * --- python/paddle/v2/fluid/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 24580e90434f4..6b85915573419 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -34,7 +34,7 @@ from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler from distribute_transpiler_simple import SimpleDistributeTranspiler -from concurrency import * +from concurrency import Go import clip from memory_optimization_transpiler import memory_optimize import profiler From 47f9680246d471cb5598801ab09ab62dfcc661be Mon Sep 17 00:00:00 2001 From: Kavya Srinet Date: Thu, 15 Feb 2018 10:36:33 -0800 Subject: [PATCH 13/21] Adding imports from concurrency --- python/paddle/v2/fluid/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 6b85915573419..361fb3f5ad939 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -34,7 +34,8 @@ from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler from distribute_transpiler_simple import SimpleDistributeTranspiler -from concurrency import Go +from concurrency import (Go, make_channel, channel_send, channel_recv, + channel_close) import clip from memory_optimization_transpiler import memory_optimize import profiler From 802eb42c05d4572e664cad09df4fea9614012867 Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Thu, 15 Feb 2018 13:41:01 -0800 Subject: [PATCH 14/21] Initial commit of GO_OP (for varun) --- paddle/fluid/operators/go_op.cc | 100 ++++++++++++++++++++++++++ python/paddle/v2/fluid/__init__.py | 3 +- python/paddle/v2/fluid/concurrency.py | 2 +- python/paddle/v2/fluid/framework.py | 2 +- 4 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 paddle/fluid/operators/go_op.cc diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc new file mode 100644 index 0000000000000..721d88b4a9a46 --- /dev/null +++ b/paddle/fluid/operators/go_op.cc @@ -0,0 +1,100 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" + +namespace paddle { +namespace operators { + +static constexpr char kBlock[] = "sub_block"; +static constexpr char kX[] = "X"; +static constexpr char kOutputs[] = "Out"; + +class GoOp : public framework::OperatorBase { + public: + GoOp(const std::string &type, const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + private: + void TestFunc123() { + std::cout << "THIS IS MY TEST THREAD" << std::endl; + } + + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + framework::Executor executor(dev_place); + auto *block = Attr(kBlock); + auto *program = block->Program(); + auto ¤t_scope = scope.NewScope(); +// std::thread go_thread(&GoOp::ExecuteOnThread, &executor, +// current_scope, program, block); +// std::thread th_(&GoOp::TestFunc123, this); +// th_.join(); +// + std::thread send_thread = std::thread{[&]() { +// for (int i=0; i < 500; ++i) { +// std::cout << "THIS IS MY TEST THREAD" << i << std::endl; +// usleep(1000000); +// } + executor.Run(*program, ¤t_scope, block->ID(), + false /*create_local_scope*/); + }}; + std::cout << "OUTSIDE THREAD" << std::endl; + send_thread.detach(); + } +// +// void ExecuteOnThread(framework::Executor* executor, +// framework::Scope *current_scope, +// framework::ProgramDesc *program, +// framework::BlockDesc *block) { +// executor->Run(*program, current_scope, block->ID(), +// false /*create_local_scope*/); +// } +}; + + +class GoOpMaker : public framework::OpProtoAndCheckerMaker { + public: + GoOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(kX, + "A set of variables, which are required by operators inside the " + "block of Go Op.") + .AsDuplicable(); + AddOutput(kOutputs, + "A set of variables, which will be assigned with values " + "generated by the operators inside the block of Go Op.") + .AsDuplicable(); + AddAttr(kBlock, + "The block inside GoOp"); + AddComment(R"DOC( +)DOC"); + } +}; + +// TODO(thuan): Look into Gradient Operator for GO_OP + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(go, paddle::operators::GoOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::GoOpMaker); diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 361fb3f5ad939..827d2e43e5c05 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -34,8 +34,7 @@ from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler from distribute_transpiler_simple import SimpleDistributeTranspiler -from concurrency import (Go, make_channel, channel_send, channel_recv, - channel_close) +from concurrency import Go, make_channel, channel_send, channel_recv, channel_close import clip from memory_optimization_transpiler import memory_optimize import profiler diff --git a/python/paddle/v2/fluid/concurrency.py b/python/paddle/v2/fluid/concurrency.py index 5f868b6e86d34..047c6f91beb66 100644 --- a/python/paddle/v2/fluid/concurrency.py +++ b/python/paddle/v2/fluid/concurrency.py @@ -47,7 +47,7 @@ def construct_go_op(self): .parent_idx) x_name_list = set() - out_vars = set() + out_vars = [] for op in go_block.ops: # Iterate over all operators, get all the inputs # and add as input to the Go operator. diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py index dfd7e8047c113..35e713ce6e23f 100644 --- a/python/paddle/v2/fluid/framework.py +++ b/python/paddle/v2/fluid/framework.py @@ -484,7 +484,7 @@ def find_name(var_list, name): self.desc.check_attrs() no_kernel_op_set = { - 'feed', 'fetch', 'save', 'load', 'recurrent', + 'feed', 'fetch', 'save', 'load', 'recurrent', 'go', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv', 'listen_and_serv', 'parallel_do', 'save_combine', 'load_combine' From 3697796efaa61a1e2d15d8c6c122aad2baa29fda Mon Sep 17 00:00:00 2001 From: Varun Arora Date: Wed, 21 Feb 2018 11:39:32 -0800 Subject: [PATCH 15/21] Creating local scopes and go through them --- paddle/fluid/operators/go_op.cc | 99 ++++++++++++++----- .../v2/fluid/tests/notest_concurrency.py | 25 ++++- 2 files changed, 95 insertions(+), 29 deletions(-) diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc index 721d88b4a9a46..7192e8d47ada4 100644 --- a/paddle/fluid/operators/go_op.cc +++ b/paddle/fluid/operators/go_op.cc @@ -14,14 +14,17 @@ limitations under the License. */ #include #include -#include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/variable.h" + namespace paddle { namespace operators { +using StepScopeVar = std::vector; + static constexpr char kBlock[] = "sub_block"; static constexpr char kX[] = "X"; static constexpr char kOutputs[] = "Out"; @@ -38,36 +41,80 @@ class GoOp : public framework::OperatorBase { std::cout << "THIS IS MY TEST THREAD" << std::endl; } + // + void ExecuteOnThread(const framework::Executor& executor, + const framework::Scope &scope) const { + // /*, + // framework::Scope ¤t_scope, + // framework::ProgramDesc *program, + // framework::BlockDesc *block*/ +// executor->Run(*program, current_scope, block->ID(), +// false /*create_local_scope*/); + for (int i=0; i < 10; ++i) { + std::cout << "THIS IS MY TEST THREAD" << i << std::endl; + usleep(1000000); + } + + // std::cout << Inputs(kX).size() << std::endl; + std::cout << scope.LocalVarNames().size() << std::endl; + } void RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const override { framework::Executor executor(dev_place); - auto *block = Attr(kBlock); - auto *program = block->Program(); - auto ¤t_scope = scope.NewScope(); -// std::thread go_thread(&GoOp::ExecuteOnThread, &executor, -// current_scope, program, block); -// std::thread th_(&GoOp::TestFunc123, this); -// th_.join(); + // auto *block = Attr(kBlock); + // auto *program = block->Program(); + + // TODO(varunarora): Consider moving this to scope.h. + const framework::Scope* root_scope = &scope; + const framework::Scope* parent_scope = &(root_scope->parent()); + + while (parent_scope != nullptr) { + root_scope = parent_scope; + parent_scope = &(parent_scope->parent()); + std::cout << "New parent scope: " << parent_scope << std::endl; + } + std::cout << "Found root scope: "; + std::cout << root_scope << std::endl; + std::cout << root_scope->LocalVarNames().size() << std::endl; + + framework::Scope& new_scope = root_scope->NewScope(); + + // Add all the inputs to a new + auto &inputs = Inputs(kX); + for (size_t i = 0; i < inputs.size(); i++) { + std::cout << inputs.at(i) << std::endl; + /*framework::Variable* new_var = */ + new_scope.Var(inputs.at(i)); + } + + // std::cout << new_scope.LocalVarNames().size() << std::endl; + + // Loop through all the local scope variables. + std::vector new_scope_variables = ( + new_scope.LocalVarNames()); + for (size_t j = 0; j < new_scope_variables.size(); j++) { + std::cout << new_scope.FindVar( + new_scope_variables[j]) << std::endl; + + } + // std::thread go_thread([=]{ExecuteOnThread(executor, scope);}); + +// std::thread send_thread = std::thread{[&]() { +// for (int i=0; i < 10; ++i) { +// std::cout << "THIS IS MY TEST THREAD" << i << std::endl; +// usleep(1000000); +// } // - std::thread send_thread = std::thread{[&]() { -// for (int i=0; i < 500; ++i) { -// std::cout << "THIS IS MY TEST THREAD" << i << std::endl; -// usleep(1000000); -// } - executor.Run(*program, ¤t_scope, block->ID(), - false /*create_local_scope*/); - }}; - std::cout << "OUTSIDE THREAD" << std::endl; - send_thread.detach(); +// std::cout << scope.LocalVarNames().size() << std::endl; +// executor.Run(*program, ¤t_scope, block->ID(), +// false /*create_local_scope*/); +// }}; + + std::cout << "OUTSIDE THREAD" << std::endl; + // send_thread.detach(); + // go_thread.detach(); + // go_thread.join(); } -// -// void ExecuteOnThread(framework::Executor* executor, -// framework::Scope *current_scope, -// framework::ProgramDesc *program, -// framework::BlockDesc *block) { -// executor->Run(*program, current_scope, block->ID(), -// false /*create_local_scope*/); -// } }; diff --git a/python/paddle/v2/fluid/tests/notest_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py index 9d87ed9c07364..aad65b9a9326e 100644 --- a/python/paddle/v2/fluid/tests/notest_concurrency.py +++ b/python/paddle/v2/fluid/tests/notest_concurrency.py @@ -16,12 +16,20 @@ import paddle.v2.fluid as fluid import paddle.v2.fluid.core as core from paddle.v2.fluid.executor import Executor - +import numpy +import time class TestRoutineOp(unittest.TestCase): def test_simple_routine(self): ch = fluid.make_channel(dtype=bool) + d0 = fluid.layers.data( + "d0", shape=[10], append_batch_size=False, dtype='float32') + i = fluid.layers.zeros(shape=[1], dtype='int64') + data_array = fluid.layers.array_write(x=d0, i=i) + with fluid.Go(): + d = fluid.layers.array_read(array=data_array, i=i) + fluid.channel_send(ch, True) result = fluid.channel_recv(ch) @@ -30,8 +38,19 @@ def test_simple_routine(self): cpu = core.CPUPlace() exe = Executor(cpu) - outs = exe.run(fetch_list=[result]) - self.assertEqual(outs[0], True) + d = [] + for i in xrange(3): + d.append(numpy.random.random(size=[10]).astype('float32')) + + outs = exe.run( + feed={'d0': d[0]}, + fetch_list=[] + ) + + while True: + time.sleep(10) + + #self.assertEqual(outs[0], True) if __name__ == '__main__': From d690a8346d83915e02f192d13966d7efa8a1a798 Mon Sep 17 00:00:00 2001 From: Varun Arora Date: Wed, 21 Feb 2018 14:53:28 -0800 Subject: [PATCH 16/21] Updated go op inputs persistability enforcement --- paddle/fluid/operators/go_op.cc | 62 ++++++++++++--------------------- 1 file changed, 22 insertions(+), 40 deletions(-) diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc index 7192e8d47ada4..55519d259f050 100644 --- a/paddle/fluid/operators/go_op.cc +++ b/paddle/fluid/operators/go_op.cc @@ -60,60 +60,42 @@ class GoOp : public framework::OperatorBase { } void RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const override { - framework::Executor executor(dev_place); - // auto *block = Attr(kBlock); - // auto *program = block->Program(); + framework::Executor executor(dev_place); - // TODO(varunarora): Consider moving this to scope.h. + // std::thread go_thread([=]{ExecuteOnThread(executor, scope);}); + // go_thread.detach(); + + /* + * Determine the global scope. Create a new child scope. + * Within the child scope, add all the local variables relevant + * to that scope. + * + * Now go through all the inputs to the op to ensure that + * all of them are in the newly created scope. + * */ + + // TODO(varunarora): Consider moving this root scope lookup to scope.h. const framework::Scope* root_scope = &scope; const framework::Scope* parent_scope = &(root_scope->parent()); while (parent_scope != nullptr) { root_scope = parent_scope; parent_scope = &(parent_scope->parent()); - std::cout << "New parent scope: " << parent_scope << std::endl; } - std::cout << "Found root scope: "; - std::cout << root_scope << std::endl; - std::cout << root_scope->LocalVarNames().size() << std::endl; framework::Scope& new_scope = root_scope->NewScope(); - // Add all the inputs to a new - auto &inputs = Inputs(kX); - for (size_t i = 0; i < inputs.size(); i++) { - std::cout << inputs.at(i) << std::endl; - /*framework::Variable* new_var = */ - new_scope.Var(inputs.at(i)); + auto *block = Attr(kBlock); + for (auto& var : block->AllVars()) { + new_scope.Var(var->Name()); } - // std::cout << new_scope.LocalVarNames().size() << std::endl; - - // Loop through all the local scope variables. - std::vector new_scope_variables = ( - new_scope.LocalVarNames()); - for (size_t j = 0; j < new_scope_variables.size(); j++) { - std::cout << new_scope.FindVar( - new_scope_variables[j]) << std::endl; - + auto &inputs = Inputs(kX); + for (size_t i = 0; i < inputs.size(); i++) { + PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), + "All variables used in the go block " + "should be created outside any block"); } - // std::thread go_thread([=]{ExecuteOnThread(executor, scope);}); - -// std::thread send_thread = std::thread{[&]() { -// for (int i=0; i < 10; ++i) { -// std::cout << "THIS IS MY TEST THREAD" << i << std::endl; -// usleep(1000000); -// } -// -// std::cout << scope.LocalVarNames().size() << std::endl; -// executor.Run(*program, ¤t_scope, block->ID(), -// false /*create_local_scope*/); -// }}; - - std::cout << "OUTSIDE THREAD" << std::endl; - // send_thread.detach(); - // go_thread.detach(); - // go_thread.join(); } }; From e87aabb1ec130f5521e87863cd99a1b42b0cc03a Mon Sep 17 00:00:00 2001 From: Varun Arora Date: Wed, 21 Feb 2018 15:44:58 -0800 Subject: [PATCH 17/21] Add thread execution; compile failing though --- paddle/fluid/operators/go_op.cc | 42 +++++++++++++-------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc index 55519d259f050..943cfaa83fa95 100644 --- a/paddle/fluid/operators/go_op.cc +++ b/paddle/fluid/operators/go_op.cc @@ -16,8 +16,6 @@ limitations under the License. */ #include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_registry.h" -#include "paddle/fluid/framework/operator.h" -#include "paddle/fluid/framework/variable.h" namespace paddle { @@ -37,41 +35,28 @@ class GoOp : public framework::OperatorBase { : framework::OperatorBase(type, inputs, outputs, attrs) {} private: - void TestFunc123() { - std::cout << "THIS IS MY TEST THREAD" << std::endl; - } + void ExecuteOnThread(framework::Executor& executor, + const framework::ProgramDesc *program, + framework::Scope *scope, + framework::BlockDesc *block) { + + executor.Run(*program, scope, block->ID(), + false /*create_local_scope*/); - // - void ExecuteOnThread(const framework::Executor& executor, - const framework::Scope &scope) const { - // /*, - // framework::Scope ¤t_scope, - // framework::ProgramDesc *program, - // framework::BlockDesc *block*/ -// executor->Run(*program, current_scope, block->ID(), -// false /*create_local_scope*/); - for (int i=0; i < 10; ++i) { - std::cout << "THIS IS MY TEST THREAD" << i << std::endl; - usleep(1000000); - } - - // std::cout << Inputs(kX).size() << std::endl; - std::cout << scope.LocalVarNames().size() << std::endl; } + void RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const override { framework::Executor executor(dev_place); - - // std::thread go_thread([=]{ExecuteOnThread(executor, scope);}); - // go_thread.detach(); - /* * Determine the global scope. Create a new child scope. * Within the child scope, add all the local variables relevant * to that scope. * * Now go through all the inputs to the op to ensure that - * all of them are in the newly created scope. + * all of them are in the newly created scope. This is important + * to ensure that they don't get destroyed when the parent scope + * is deleted. * */ // TODO(varunarora): Consider moving this root scope lookup to scope.h. @@ -96,6 +81,11 @@ class GoOp : public framework::OperatorBase { "All variables used in the go block " "should be created outside any block"); } + + // Now execute the go op with the newly created scope. + std::thread go_thread([=]{ExecuteOnThread( + executor, block->Program(), &new_scope, block);}); + go_thread.detach(); } }; From 60dd07b3157bf18ec54e6f03820e77adbfea0f46 Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Wed, 21 Feb 2018 17:59:19 -0800 Subject: [PATCH 18/21] Fix go op --- paddle/fluid/framework/CMakeLists.txt | 1 + paddle/fluid/framework/concurrency_test.cc | 91 ++++++++++++++++++++++ paddle/fluid/operators/go_op.cc | 54 +++++++------ paddle/testing/paddle_gtest_main.cc | 5 +- 4 files changed, 124 insertions(+), 27 deletions(-) create mode 100644 paddle/fluid/framework/concurrency_test.cc diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index ef1bc07c2dbe7..f60bf3fe3b133 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -96,3 +96,4 @@ cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_contex cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc) cc_test(channel_test SRCS channel_test.cc) +cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op sum_op executor) diff --git a/paddle/fluid/framework/concurrency_test.cc b/paddle/fluid/framework/concurrency_test.cc new file mode 100644 index 0000000000000..5e88fd1eb3386 --- /dev/null +++ b/paddle/fluid/framework/concurrency_test.cc @@ -0,0 +1,91 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include + +#include "gtest/gtest.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/block_desc.h" + +USE_NO_KERNEL_OP(go); +USE_NO_KERNEL_OP(sum); + +namespace f = paddle::framework; +namespace p = paddle::platform; + +void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { + p::CPUDeviceContext ctx(place); + for (int i = 0; i < 2; ++i) { + auto var_name = paddle::string::Sprintf("x%d", i); + auto var = scope.Var(var_name); + auto tensor = var->GetMutable(); + tensor->Resize({10, 10}); + float *expect = tensor->mutable_data(place); + for (int64_t i = 0; i < tensor->numel(); ++i) { + expect[i] = static_cast(i); + } + } + + auto out_var = scope.Var("Out"); + auto out_tensor = out_var->GetMutable(); + out_tensor->Resize({10, 10}); + out_tensor->mutable_data(place); // allocate +} + +void AddOp(const std::string &type, const f::VariableNameMap &inputs, + const f::VariableNameMap &outputs, f::AttributeMap attrs, + f::BlockDesc *block) { + // insert output + for (auto kv : outputs) { + for (auto v : kv.second) { + block->Var(v); + // var->SetDataType(f::proto::DataType::FP32); + } + } + + // insert op + auto op = block->AppendOp(); + op->SetType(type); + for (auto &kv : inputs) { + op->SetInput(kv.first, kv.second); + } + for (auto &kv : outputs) { + op->SetOutput(kv.first, kv.second); + } + op->SetAttrMap(attrs); +} + +TEST(Concurrency, Go_Op) { + f::Scope scope; + p::CPUPlace place; + + InitTensorsInScope(scope, place); + + f::ProgramDesc program; + f::BlockDesc *block = program.MutableBlock(0); + + AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block); + + f::AttributeMap attrs; + attrs.insert({"sub_block", block}); + + auto go_op = f::OpRegistry::CreateOp("go", {{"X", {"x0", "x1"}}}, + {{"Out", {"Out"}}}, attrs); + std::cout << "GOING INTO THREAD" << std::endl << std::flush; + go_op->Run(scope, place); + std::cout << "GOING INTO SLEEP" << std::endl << std::flush; + usleep(100000000); + std::cout << "GOING DONE" << std::endl << std::flush; +} diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc index 943cfaa83fa95..16f2f02670ae7 100644 --- a/paddle/fluid/operators/go_op.cc +++ b/paddle/fluid/operators/go_op.cc @@ -35,19 +35,18 @@ class GoOp : public framework::OperatorBase { : framework::OperatorBase(type, inputs, outputs, attrs) {} private: - void ExecuteOnThread(framework::Executor& executor, - const framework::ProgramDesc *program, - framework::Scope *scope, - framework::BlockDesc *block) { - - executor.Run(*program, scope, block->ID(), - false /*create_local_scope*/); - - } +// void ExecuteOnThread(const framework::Executor& executor, +// const framework::ProgramDesc *program, +// framework::Scope *scope, +// framework::BlockDesc *block) const { +// executor.Run(*program, scope, block->ID(), +// false /*create_local_scope*/); +// +// } void RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const override { - framework::Executor executor(dev_place); +// framework::Executor executor(dev_place); /* * Determine the global scope. Create a new child scope. * Within the child scope, add all the local variables relevant @@ -68,23 +67,30 @@ class GoOp : public framework::OperatorBase { parent_scope = &(parent_scope->parent()); } - framework::Scope& new_scope = root_scope->NewScope(); - auto *block = Attr(kBlock); - for (auto& var : block->AllVars()) { - new_scope.Var(var->Name()); - } - - auto &inputs = Inputs(kX); - for (size_t i = 0; i < inputs.size(); i++) { - PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), - "All variables used in the go block " - "should be created outside any block"); - } // Now execute the go op with the newly created scope. - std::thread go_thread([=]{ExecuteOnThread( - executor, block->Program(), &new_scope, block);}); + std::thread go_thread([=] { + framework::Executor executor(dev_place); + const framework::ProgramDesc *program = block->Program(); + + framework::Scope& new_scope = root_scope->NewScope(); + + for (auto& var : block->AllVars()) { + new_scope.Var(var->Name()); + } + + auto &inputs = Inputs(kX); + for (size_t i = 0; i < inputs.size(); i++) { + PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), + "All variables used in the go block " + "should be created outside any block"); + } + + executor.Run(*program, &new_scope, block->ID(), + false /*create_local_scope*/); + }); + go_thread.detach(); } }; diff --git a/paddle/testing/paddle_gtest_main.cc b/paddle/testing/paddle_gtest_main.cc index 270f2f4c181df..0fea6a80794a6 100644 --- a/paddle/testing/paddle_gtest_main.cc +++ b/paddle/testing/paddle_gtest_main.cc @@ -28,10 +28,9 @@ int main(int argc, char** argv) { } #ifdef PADDLE_WITH_CUDA new_argv.push_back( - strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory," - "warpctc_dir")); + strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory")); #else - new_argv.push_back(strdup("--tryfromenv=use_pinned_memory,warpctc_dir")); + new_argv.push_back(strdup("--tryfromenv=use_pinned_memory")); #endif int new_argc = static_cast(new_argv.size()); char** new_argv_address = new_argv.data(); From 9abc486ceae958967a4a3bb12afcaf4285475416 Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Thu, 22 Feb 2018 10:21:28 -0800 Subject: [PATCH 19/21] Cleaned up Go op --- paddle/fluid/framework/CMakeLists.txt | 2 +- paddle/fluid/framework/concurrency_test.cc | 39 ++--- paddle/fluid/operators/go_op.cc | 157 ++++++++++----------- 3 files changed, 93 insertions(+), 105 deletions(-) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index f60bf3fe3b133..b31715305deaa 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -96,4 +96,4 @@ cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_contex cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc) cc_test(channel_test SRCS channel_test.cc) -cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op sum_op executor) +cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op sum_op executor proto_desc) diff --git a/paddle/fluid/framework/concurrency_test.cc b/paddle/fluid/framework/concurrency_test.cc index 5e88fd1eb3386..b53c679d2a936 100644 --- a/paddle/fluid/framework/concurrency_test.cc +++ b/paddle/fluid/framework/concurrency_test.cc @@ -15,9 +15,9 @@ limitations under the License. */ #include #include "gtest/gtest.h" +#include "paddle/fluid/framework/block_desc.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/program_desc.h" -#include "paddle/fluid/framework/block_desc.h" USE_NO_KERNEL_OP(go); USE_NO_KERNEL_OP(sum); @@ -25,12 +25,14 @@ USE_NO_KERNEL_OP(sum); namespace f = paddle::framework; namespace p = paddle::platform; -void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { +namespace paddle { +namespace framework { +void InitTensorsInScope(Scope &scope, p::CPUPlace &place) { p::CPUDeviceContext ctx(place); for (int i = 0; i < 2; ++i) { auto var_name = paddle::string::Sprintf("x%d", i); auto var = scope.Var(var_name); - auto tensor = var->GetMutable(); + auto tensor = var->GetMutable(); tensor->Resize({10, 10}); float *expect = tensor->mutable_data(place); for (int64_t i = 0; i < tensor->numel(); ++i) { @@ -39,19 +41,19 @@ void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { } auto out_var = scope.Var("Out"); - auto out_tensor = out_var->GetMutable(); + auto out_tensor = out_var->GetMutable(); out_tensor->Resize({10, 10}); out_tensor->mutable_data(place); // allocate } -void AddOp(const std::string &type, const f::VariableNameMap &inputs, - const f::VariableNameMap &outputs, f::AttributeMap attrs, - f::BlockDesc *block) { +void AddOp(const std::string &type, const VariableNameMap &inputs, + const VariableNameMap &outputs, AttributeMap attrs, + BlockDesc *block) { // insert output for (auto kv : outputs) { for (auto v : kv.second) { - block->Var(v); - // var->SetDataType(f::proto::DataType::FP32); + auto var = block->Var(v); + var->SetDataType(proto::VarType::FP32); } } @@ -68,24 +70,23 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs, } TEST(Concurrency, Go_Op) { - f::Scope scope; + Scope scope; p::CPUPlace place; InitTensorsInScope(scope, place); - f::ProgramDesc program; - f::BlockDesc *block = program.MutableBlock(0); + ProgramDesc program; + BlockDesc *block = program.MutableBlock(0); AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block); - f::AttributeMap attrs; + AttributeMap attrs; attrs.insert({"sub_block", block}); - auto go_op = f::OpRegistry::CreateOp("go", {{"X", {"x0", "x1"}}}, - {{"Out", {"Out"}}}, attrs); - std::cout << "GOING INTO THREAD" << std::endl << std::flush; + auto go_op = OpRegistry::CreateOp("go", {{"X", {"x0", "x1"}}}, + {{"Out", {"Out"}}}, attrs); go_op->Run(scope, place); - std::cout << "GOING INTO SLEEP" << std::endl << std::flush; - usleep(100000000); - std::cout << "GOING DONE" << std::endl << std::flush; + usleep(1000000); // TODO(thuan): Replace this command with channel receive } +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/operators/go_op.cc b/paddle/fluid/operators/go_op.cc index 16f2f02670ae7..6701e7c78699a 100644 --- a/paddle/fluid/operators/go_op.cc +++ b/paddle/fluid/operators/go_op.cc @@ -12,12 +12,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include +#include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_registry.h" - namespace paddle { namespace operators { @@ -27,98 +26,86 @@ static constexpr char kBlock[] = "sub_block"; static constexpr char kX[] = "X"; static constexpr char kOutputs[] = "Out"; -class GoOp : public framework::OperatorBase { - public: - GoOp(const std::string &type, const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : framework::OperatorBase(type, inputs, outputs, attrs) {} - - private: -// void ExecuteOnThread(const framework::Executor& executor, -// const framework::ProgramDesc *program, -// framework::Scope *scope, -// framework::BlockDesc *block) const { -// executor.Run(*program, scope, block->ID(), -// false /*create_local_scope*/); -// -// } - - void RunImpl(const framework::Scope &scope, - const platform::Place &dev_place) const override { -// framework::Executor executor(dev_place); - /* - * Determine the global scope. Create a new child scope. - * Within the child scope, add all the local variables relevant - * to that scope. - * - * Now go through all the inputs to the op to ensure that - * all of them are in the newly created scope. This is important - * to ensure that they don't get destroyed when the parent scope - * is deleted. - * */ - - // TODO(varunarora): Consider moving this root scope lookup to scope.h. - const framework::Scope* root_scope = &scope; - const framework::Scope* parent_scope = &(root_scope->parent()); - - while (parent_scope != nullptr) { - root_scope = parent_scope; - parent_scope = &(parent_scope->parent()); - } - - auto *block = Attr(kBlock); - - // Now execute the go op with the newly created scope. - std::thread go_thread([=] { - framework::Executor executor(dev_place); - const framework::ProgramDesc *program = block->Program(); - - framework::Scope& new_scope = root_scope->NewScope(); - - for (auto& var : block->AllVars()) { - new_scope.Var(var->Name()); - } - - auto &inputs = Inputs(kX); - for (size_t i = 0; i < inputs.size(); i++) { - PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), - "All variables used in the go block " - "should be created outside any block"); - } - - executor.Run(*program, &new_scope, block->ID(), - false /*create_local_scope*/); - }); - - go_thread.detach(); +class GoOp : public framework::OperatorBase { + public: + GoOp(const std::string &type, const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : framework::OperatorBase(type, inputs, outputs, attrs) {} + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + /* + * Determine the global scope. Create a new child scope. + * Within the child scope, add all the local variables relevant + * to that scope. + * + * Now go through all the inputs to the op to ensure that + * all of them are in the newly created scope. This is important + * to ensure that they don't get destroyed when the parent scope + * is deleted. + * */ + + // TODO(varunarora): Consider moving this root scope lookup to scope.h. + const framework::Scope *root_scope = &scope; + const framework::Scope *parent_scope = &(root_scope->parent()); + + while (parent_scope != nullptr) { + root_scope = parent_scope; + parent_scope = &(parent_scope->parent()); } -}; + auto *block = Attr(kBlock); + + // Now execute the go op with the newly created scope. + std::thread go_thread([=] { + framework::Executor executor(dev_place); + const framework::ProgramDesc *program = block->Program(); + + framework::Scope &new_scope = root_scope->NewScope(); + + for (auto &var : block->AllVars()) { + new_scope.Var(var->Name()); + } + + auto &inputs = Inputs(kX); + for (size_t i = 0; i < inputs.size(); i++) { + PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)), + "All variables used in the go block " + "should be created in the global scope"); + } + + executor.Run(*program, &new_scope, block->ID(), + false /*create_local_scope*/); + }); + + go_thread.detach(); + } +}; class GoOpMaker : public framework::OpProtoAndCheckerMaker { - public: - GoOpMaker(OpProto *proto, OpAttrChecker *op_checker) - : OpProtoAndCheckerMaker(proto, op_checker) { - AddInput(kX, - "A set of variables, which are required by operators inside the " - "block of Go Op.") - .AsDuplicable(); - AddOutput(kOutputs, - "A set of variables, which will be assigned with values " - "generated by the operators inside the block of Go Op.") - .AsDuplicable(); - AddAttr(kBlock, - "The block inside GoOp"); - AddComment(R"DOC( + public: + GoOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput(kX, + "A set of variables, which are required by operators inside the " + "block of Go Op.") + .AsDuplicable(); + AddOutput(kOutputs, + "A set of variables, which will be assigned with values " + "generated by the operators inside the block of Go Op.") + .AsDuplicable(); + AddAttr(kBlock, "The block inside GoOp"); + AddComment(R"DOC( )DOC"); - } + } }; // TODO(thuan): Look into Gradient Operator for GO_OP -} // namespace operators -} // namespace paddle +} // namespace operators +} // namespace paddle REGISTER_OPERATOR(go, paddle::operators::GoOp, paddle::framework::EmptyGradOpMaker, From 8b8d208e472178d4e804c5ac46e8234a540475a6 Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Thu, 22 Feb 2018 10:23:05 -0800 Subject: [PATCH 20/21] Fix yapf format issue --- python/paddle/v2/fluid/tests/notest_concurrency.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/paddle/v2/fluid/tests/notest_concurrency.py b/python/paddle/v2/fluid/tests/notest_concurrency.py index aad65b9a9326e..58622b0add18c 100644 --- a/python/paddle/v2/fluid/tests/notest_concurrency.py +++ b/python/paddle/v2/fluid/tests/notest_concurrency.py @@ -19,6 +19,7 @@ import numpy import time + class TestRoutineOp(unittest.TestCase): def test_simple_routine(self): ch = fluid.make_channel(dtype=bool) @@ -42,10 +43,7 @@ def test_simple_routine(self): for i in xrange(3): d.append(numpy.random.random(size=[10]).astype('float32')) - outs = exe.run( - feed={'d0': d[0]}, - fetch_list=[] - ) + outs = exe.run(feed={'d0': d[0]}, fetch_list=[]) while True: time.sleep(10) From 756af7b04262d4401f446d0db5096777a4c727af Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Thu, 22 Feb 2018 10:25:30 -0800 Subject: [PATCH 21/21] Readd warp ctc dir for unit tests --- paddle/testing/paddle_gtest_main.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/paddle/testing/paddle_gtest_main.cc b/paddle/testing/paddle_gtest_main.cc index 0fea6a80794a6..270f2f4c181df 100644 --- a/paddle/testing/paddle_gtest_main.cc +++ b/paddle/testing/paddle_gtest_main.cc @@ -28,9 +28,10 @@ int main(int argc, char** argv) { } #ifdef PADDLE_WITH_CUDA new_argv.push_back( - strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory")); + strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory," + "warpctc_dir")); #else - new_argv.push_back(strdup("--tryfromenv=use_pinned_memory")); + new_argv.push_back(strdup("--tryfromenv=use_pinned_memory,warpctc_dir")); #endif int new_argc = static_cast(new_argv.size()); char** new_argv_address = new_argv.data();