[WIP] Initial GO_OP implementation and Unit tests (DO NOT MERGE) #8496

Closed
wants to merge 28 commits
28 commits
497321a
Adding Python boilerplate code for Go op
Feb 13, 2018
f2a95fa
Add very basic test case
Feb 13, 2018
d785ca9
Adding the python logic for go routine
Feb 13, 2018
313ffb0
Fix syntax
Feb 13, 2018
a43924e
Changing test to notest
Feb 13, 2018
c97cf4a
Rename Routine to Go
Feb 13, 2018
39f22cf
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Feb 13, 2018
fc2593c
Combining GoGuard and Go in one class
Feb 13, 2018
24737a6
Merge branch 'go_op' of https://github.com/abhinavarora/Paddle into g…
Feb 13, 2018
8343a35
Modify test
Feb 13, 2018
d0e26f7
Adding fluid close channel
Feb 13, 2018
ccb37b9
Merge branch 'go_op' of https://github.com/abhinavarora/Paddle into g…
Feb 13, 2018
a37a1e1
Fixing __init__.py for calling fluid.go()
Feb 13, 2018
2fb93ca
Merge branch 'go_op' of https://github.com/abhinavarora/Paddle into g…
Feb 13, 2018
f5f6370
Adding stubs for channel methods and updating test case
Feb 13, 2018
323a2e2
Removing import *
Feb 15, 2018
d02873b
Merge branch 'go_op' of github.com:abhinavarora/Paddle into go_op
Feb 15, 2018
04cb0f3
Merge remote-tracking branch 'origin/develop' into go_op
Feb 15, 2018
47f9680
Adding imports from concurrency
Feb 15, 2018
802eb42
Initial commit of GO_OP (for varun)
cs2be Feb 15, 2018
934352a
Merge branch 'develop' of github.com:PaddlePaddle/Paddle into go_op_t…
cs2be Feb 21, 2018
3697796
Creating local scopes and go through them
Feb 21, 2018
d690a83
Updated go op inputs persistability enforcement
Feb 21, 2018
e87aabb
Add thread execution; compile failing though
Feb 21, 2018
60dd07b
Fix go op
cs2be Feb 22, 2018
9abc486
Cleaned up Go op
cs2be Feb 22, 2018
8b8d208
Fix yapf format issue
cs2be Feb 22, 2018
756af7b
Readd warp ctc dir for unit tests
cs2be Feb 22, 2018
1 change: 1 addition & 0 deletions paddle/fluid/framework/CMakeLists.txt
@@ -96,3 +96,4 @@ cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_contex
cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc)

cc_test(channel_test SRCS channel_test.cc)
cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op sum_op executor proto_desc)
92 changes: 92 additions & 0 deletions paddle/fluid/framework/concurrency_test.cc
@@ -0,0 +1,92 @@
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <unistd.h>  // for usleep in the test below

#include <thread>

#include "gtest/gtest.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/program_desc.h"

USE_NO_KERNEL_OP(go);
USE_NO_KERNEL_OP(sum);

namespace f = paddle::framework;
namespace p = paddle::platform;

namespace paddle {
namespace framework {
void InitTensorsInScope(Scope &scope, p::CPUPlace &place) {
p::CPUDeviceContext ctx(place);
for (int i = 0; i < 2; ++i) {
auto var_name = paddle::string::Sprintf("x%d", i);
auto var = scope.Var(var_name);
auto tensor = var->GetMutable<LoDTensor>();
tensor->Resize({10, 10});
float *expect = tensor->mutable_data<float>(place);
for (int64_t i = 0; i < tensor->numel(); ++i) {
expect[i] = static_cast<float>(i);
}
}

auto out_var = scope.Var("Out");
auto out_tensor = out_var->GetMutable<LoDTensor>();
out_tensor->Resize({10, 10});
out_tensor->mutable_data<float>(place); // allocate
}

void AddOp(const std::string &type, const VariableNameMap &inputs,
const VariableNameMap &outputs, AttributeMap attrs,
BlockDesc *block) {
// insert output
for (auto kv : outputs) {
for (auto v : kv.second) {
auto var = block->Var(v);
var->SetDataType(proto::VarType::FP32);
}
}

// insert op
auto op = block->AppendOp();
op->SetType(type);
for (auto &kv : inputs) {
op->SetInput(kv.first, kv.second);
}
for (auto &kv : outputs) {
op->SetOutput(kv.first, kv.second);
}
op->SetAttrMap(attrs);
}

TEST(Concurrency, Go_Op) {
Scope scope;
p::CPUPlace place;

InitTensorsInScope(scope, place);

ProgramDesc program;
BlockDesc *block = program.MutableBlock(0);

AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block);

AttributeMap attrs;
attrs.insert({"sub_block", block});

auto go_op = OpRegistry::CreateOp("go", {{"X", {"x0", "x1"}}},
{{"Out", {"Out"}}}, attrs);
go_op->Run(scope, place);
usleep(1000000);  // TODO(thuan): Replace this sleep with a channel receive
}
} // namespace framework
} // namespace paddle
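For orientation, the Python-level pattern this C++ test assembles by hand (a sum executed inside a go block) would look roughly like the sketch below. This is a hedged sketch, not part of the PR: fluid.Go() and layers.data come from this PR's own test file, while layers.sums is an assumption about the fluid layer API at this point in time.

import paddle.v2.fluid as fluid

# Sketch of the Python analogue of the C++ Go_Op test above: a sum over
# two 10x10 inputs runs inside a go block. layers.sums is an assumed
# API; the C++ test instead wires the sum op and the go op's sub_block
# attribute together directly through the framework.
x0 = fluid.layers.data(
    "x0", shape=[10, 10], append_batch_size=False, dtype='float32')
x1 = fluid.layers.data(
    "x1", shape=[10, 10], append_batch_size=False, dtype='float32')

with fluid.Go():
    out = fluid.layers.sums(input=[x0, x1])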
112 changes: 112 additions & 0 deletions paddle/fluid/operators/go_op.cc
@@ -0,0 +1,112 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <thread>
#include <vector>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_registry.h"

namespace paddle {
namespace operators {

using StepScopeVar = std::vector<framework::Scope *>;

static constexpr char kBlock[] = "sub_block";
static constexpr char kX[] = "X";
static constexpr char kOutputs[] = "Out";

class GoOp : public framework::OperatorBase {
public:
GoOp(const std::string &type, const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: framework::OperatorBase(type, inputs, outputs, attrs) {}

private:
void RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const override {
/*
* Determine the global scope. Create a new child scope.
* Within the child scope, add all the local variables relevant
* to that scope.
*
* Now go through all the inputs to the op to ensure that
* all of them are in the newly created scope. This is important
* to ensure that they don't get destroyed when the parent scope
* is deleted.
*/

// TODO(varunarora): Consider moving this root scope lookup to scope.h.
const framework::Scope *root_scope = &scope;
const framework::Scope *parent_scope = &(root_scope->parent());

while (parent_scope != nullptr) {
root_scope = parent_scope;
parent_scope = &(parent_scope->parent());
}

auto *block = Attr<framework::BlockDesc *>(kBlock);

// Now execute the go op with the newly created scope.
std::thread go_thread([=] {
framework::Executor executor(dev_place);
const framework::ProgramDesc *program = block->Program();

framework::Scope &new_scope = root_scope->NewScope();

for (auto &var : block->AllVars()) {
new_scope.Var(var->Name());
}

auto &inputs = Inputs(kX);
for (size_t i = 0; i < inputs.size(); i++) {
PADDLE_ENFORCE_NOT_NULL(new_scope.FindVar(inputs.at(i)),
"All variables used in the go block "
"should be created in the global scope");
}

executor.Run(*program, &new_scope, block->ID(),
false /*create_local_scope*/);
});

go_thread.detach();
}
};

class GoOpMaker : public framework::OpProtoAndCheckerMaker {
public:
GoOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput(kX,
"A set of variables, which are required by operators inside the "
"block of Go Op.")
.AsDuplicable();
AddOutput(kOutputs,
"A set of variables, which will be assigned with values "
"generated by the operators inside the block of Go Op.")
.AsDuplicable();
AddAttr<framework::BlockDesc *>(kBlock, "The block inside GoOp");
AddComment(R"DOC(
)DOC");
}
};

// TODO(thuan): Look into Gradient Operator for GO_OP

} // namespace operators
} // namespace paddle

REGISTER_OPERATOR(go, paddle::operators::GoOp,
paddle::framework::EmptyGradOpMaker,
paddle::operators::GoOpMaker);
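Because RunImpl detaches its worker thread, go_op is fire-and-forget: the operator itself gives the caller no way to wait for the sub-block to finish (hence the usleep placeholder in the C++ test above). The channel primitives referenced elsewhere in this PR are the intended synchronization point; a minimal sketch of that pattern, assuming fluid.channel_send and fluid.channel_recv behave as blocking send and receive:

import paddle.v2.fluid as fluid

# Minimal synchronization sketch: the go block signals completion over a
# channel and the main program blocks on the receive instead of sleeping.
ch = fluid.make_channel(dtype=bool)

with fluid.Go():
    # ... concurrent work runs here on the detached thread ...
    fluid.channel_send(ch, True)  # signal that the go block is done

result = fluid.channel_recv(ch)  # blocks until the send above happens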
2 changes: 1 addition & 1 deletion python/paddle/v2/fluid/concurrency.py
@@ -47,7 +47,7 @@ def construct_go_op(self):
.parent_idx)

x_name_list = set()
- out_vars = set()
+ out_vars = []
for op in go_block.ops:
# Iterate over all operators, get all the inputs
# and add as input to the Go operator.
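The set-to-list change above matters because an operator's output argument names are positional, and a Python set hands them back in nondeterministic order. A hedged sketch of the collection loop under that constraint — output_arg_names is an assumed accessor, and the membership test stands in for the de-duplication the old set provided:

out_vars = []
for op in go_block.ops:
    for out_name in op.output_arg_names:  # assumed accessor name
        # A list keeps outputs in the order the ops produce them; the
        # membership check mirrors the de-duplication the set gave us.
        if out_name not in out_vars:
            out_vars.append(out_name)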
2 changes: 1 addition & 1 deletion python/paddle/v2/fluid/framework.py
@@ -487,7 +487,7 @@ def find_name(var_list, name):

self.desc.check_attrs()
no_kernel_op_set = {
- 'feed', 'fetch', 'save', 'load', 'recurrent',
+ 'feed', 'fetch', 'save', 'load', 'recurrent', 'go',
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send',
'recv', 'listen_and_serv', 'parallel_do', 'save_combine',
'load_combine', 'ncclInit'
21 changes: 19 additions & 2 deletions python/paddle/v2/fluid/tests/notest_concurrency.py
@@ -16,12 +16,21 @@
import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
from paddle.v2.fluid.executor import Executor
import numpy
import time


class TestRoutineOp(unittest.TestCase):
def test_simple_routine(self):
ch = fluid.make_channel(dtype=bool)
d0 = fluid.layers.data(
"d0", shape=[10], append_batch_size=False, dtype='float32')
i = fluid.layers.zeros(shape=[1], dtype='int64')
data_array = fluid.layers.array_write(x=d0, i=i)

with fluid.Go():
d = fluid.layers.array_read(array=data_array, i=i)

fluid.channel_send(ch, True)

result = fluid.channel_recv(ch)
@@ -30,8 +39,16 @@ def test_simple_routine(self):
cpu = core.CPUPlace()
exe = Executor(cpu)

- outs = exe.run(fetch_list=[result])
- self.assertEqual(outs[0], True)
+ d = []
+ for i in xrange(3):
+     d.append(numpy.random.random(size=[10]).astype('float32'))
+
+ outs = exe.run(feed={'d0': d[0]}, fetch_list=[])
+
+ while True:
+     time.sleep(10)
+
+ #self.assertEqual(outs[0], True)


if __name__ == '__main__':
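Once channel receive is fully plumbed through the executor, the temporary busy-wait in the test above should collapse back to a blocking fetch plus the restored assertion; a sketch of that intended end state:

# Intended end state (sketch): fetch the channel-received result and
# restore the assertion, instead of spinning in time.sleep().
outs = exe.run(feed={'d0': d[0]}, fetch_list=[result])
self.assertEqual(outs[0], True)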