Revert "[Dy2Stat] Refactor ExecutorCache logic and pre-support BuildS…
Browse files Browse the repository at this point in the history
…trategy for pass (PaddlePaddle#34181)"

This reverts commit 609f822.
Aurelius84 committed Jul 23, 2021
1 parent 781f402 commit 4e567f0
Showing 11 changed files with 156 additions and 165 deletions.
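The central interface change being reverted shows up in GetExecutorInfoFromCache: the refactor keyed the executor cache by a caller-supplied program_id plus an is_grad flag, while the restored code keys it by a hashed ExecutorInfoCache::CacheKey describing the program segment. A simplified side-by-side sketch of the two call shapes, condensed from the diffs below (not a complete compilable unit):

// Refactored API removed by this revert:
//   CacheInfo GetExecutorInfoFromCache(const ProgramDesc &program_desc,
//                                      const platform::Place &place,
//                                      int64_t start_op_index, int64_t end_op_index,
//                                      bool is_grad, int64_t program_id,
//                                      framework::Scope *scope);
//
// Original API restored by this revert:
//   CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key,
//                                      framework::Scope *scope);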
67 changes: 40 additions & 27 deletions paddle/fluid/framework/executor_cache.cc
@@ -13,7 +13,6 @@
// limitations under the License.

#include "paddle/fluid/framework/executor_cache.h"
#include "paddle/fluid/framework/op_info.h"

namespace paddle {
namespace framework {
@@ -26,11 +25,11 @@ namespace framework {

namespace details {

static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
static ExecutionStrategy GetExecutionStrategy(
const ExecutorInfoCache::CacheKey &cache_key) {
framework::ExecutionStrategy execution_strategy;

auto device_type = platform::Place2DeviceType(place);
switch (device_type) {
switch (cache_key.device_type_) {
case platform::DeviceType::CPU: {
execution_strategy.num_threads_ = 2;
break;
@@ -47,9 +46,9 @@ static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
}
default:
PADDLE_THROW(platform::errors::Unavailable("Unsupported Device type %d.",
device_type));
cache_key.device_type_));
}
execution_strategy.use_device_ = device_type;
execution_strategy.use_device_ = cache_key.device_type_;

return execution_strategy;
}
@@ -137,44 +136,58 @@ ExecutorInfoCache &ExecutorInfoCache::Instance() {
return g_exe_cache_info_map;
}

CacheInfo GetExecutorInfoFromCache(const ProgramDesc &program_desc,
const platform::Place &place,
int64_t start_op_index, int64_t end_op_index,
bool is_grad, int64_t program_id,
void ExecutorInfoCache::Finalize() {
// NOTE(Aurelius84): DO NOT perform finalize in destructor
// to avoid problems caused by destructor order of static
// object.
info_map_.clear();
}

CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key,
framework::Scope *scope) {
auto &cached_exe_info = framework::ExecutorInfoCache::Instance();

if (!cached_exe_info.Has(program_id, is_grad)) {
VLOG(1) << "create exe_info for " << program_id << " is_grad: " << is_grad;
auto execution_strategy = details::GetExecutionStrategy(place);
auto &build_strategy = cached_exe_info.GetBuildStrategy(program_id);
if (!cached_exe_info.Has(cache_key)) {
VLOG(1) << "create exe_info for " << cache_key.DebugString();

// TODO(Aurelius84): Consider to use LRU algorithm to replace this.
if (cached_exe_info.Size() > 4u /* max_cached_size*/) {
VLOG(2) << "The cached info size has exceeded max_cached_size: 4, clear "
"all cache!";
cached_exe_info.Finalize();
}

framework::BuildStrategy build_strategy;
auto execution_strategy = details::GetExecutionStrategy(cache_key);

// 2. Construct Graph and ParallelExecutor.
auto graph = std::make_shared<framework::ir::Graph>(
program_desc, start_op_index, end_op_index);
*cache_key.program_desc_, cache_key.start_op_index_,
cache_key.end_op_index_);
auto parallel_executor = std::make_shared<framework::ParallelExecutor>(
place, scope, execution_strategy, build_strategy, graph.get());
cache_key.place_, scope, execution_strategy, build_strategy,
graph.get());
parallel_executor->PrepareVariables(scope);

// 3. Insert value into cached map.
auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);
cached_value.executor_ = parallel_executor;
cached_value.graph_ = std::move(graph);
return std::make_pair(parallel_executor, /*is_new_created=*/true);
framework::ExecutorInfoCache::ValueType cache_val = {parallel_executor,
graph};
cached_exe_info.Insert(cache_key, cache_val);

bool is_new_created = true;
return std::make_pair(parallel_executor, is_new_created);
} else {
VLOG(1) << "get exe_info from cache by: " << program_id
<< " is_grad: " << is_grad;
auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);
VLOG(1) << "get exe_info from cache by: " << cache_key.DebugString();
bool is_new_created = false;
auto cache_val = cached_exe_info.GetMutable(cache_key);
auto parallel_executor = cache_val.first;

auto &parallel_executor = cached_value.executor_;
// update op_handle scope_map in pe->executor_->Graph
std::unordered_map<Scope *, Scope *> scope_map = {
{parallel_executor->GetLocalScopes().front(), scope}};
parallel_executor->ResetOpHandleScopeMapOfGraphs(scope_map);
// need to recreate tmp variables in new scope
parallel_executor->PrepareVariables(scope);

return std::make_pair(parallel_executor, /*is_new_created=*/false);
return std::make_pair(parallel_executor, is_new_created);
}
}

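For orientation, here is a minimal usage sketch of the restored CacheKey-based API, assuming a Paddle build environment and the headers shown in this diff; RunForwardSegment and its arguments are illustrative and not part of the repository:

#include "paddle/fluid/framework/executor_cache.h"

void RunForwardSegment(const paddle::framework::ProgramDesc &program_desc,
                       const paddle::platform::Place &place,
                       paddle::framework::Scope *scope,
                       int64_t start_op_index, int64_t end_op_index) {
  namespace fw = paddle::framework;
  // Describe the program segment and target place; the device type is
  // derived from `place` inside the CacheKey constructor.
  fw::ExecutorInfoCache::CacheKey cache_key(&program_desc, place,
                                            start_op_index, end_op_index,
                                            /*is_grad=*/false);
  // The first call builds the ir::Graph and ParallelExecutor; later calls
  // with an equivalent key reuse the cached executor, remapping its
  // op-handle scopes and re-preparing variables in the new scope.
  fw::CacheInfo cache_info = fw::GetExecutorInfoFromCache(cache_key, scope);
  std::shared_ptr<fw::ParallelExecutor> parallel_executor = cache_info.first;
  bool is_new_created = cache_info.second;
  (void)parallel_executor;
  (void)is_new_created;
}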
153 changes: 92 additions & 61 deletions paddle/fluid/framework/executor_cache.h
@@ -45,90 +45,121 @@ void ParseSafeEagerDeletionSkipVars(
std::vector<std::string>* skip_eager_delete_vars);

} // namespace details

class ExecutorInfo {
class ExecutorInfoCache {
public:
struct CacheValue {
std::shared_ptr<ParallelExecutor> executor_{nullptr};
std::shared_ptr<ir::Graph> graph_{nullptr};

std::vector<std::string> skip_eager_delete_vars_;
struct CacheKey {
CacheKey(const ProgramDesc* program_desc, const platform::Place& place,
int64_t start_op_index, int64_t end_op_index, bool is_grad)
: program_desc_(program_desc),
place_(place),
start_op_index_(start_op_index),
end_op_index_(end_op_index),
is_grad_(is_grad) {
device_type_ = platform::Place2DeviceType(place);
PADDLE_ENFORCE_NOT_NULL(program_desc_,
"program_desc should not be null.");
}

std::string DebugString() const {
std::stringstream ss;

ss << "\n CacheKey(program_desc: " << program_desc_;
ss << ", start_op_index: " << start_op_index_;
ss << ", end_op_index: " << end_op_index_;
ss << ", is_grad: " << is_grad_;
ss << ", device_type: " << device_type_ << ")";

return ss.str();
}

const ProgramDesc* program_desc_;
platform::Place place_;
int64_t start_op_index_;
int64_t end_op_index_;
bool is_grad_;
platform::DeviceType device_type_;
};

bool IsAvailable(bool is_grad) {
const auto& executor =
is_grad ? backward_info_.executor_ : forward_info_.executor_;
return executor != nullptr;
}

CacheValue& GetMutable(bool is_grad) {
return is_grad ? backward_info_ : forward_info_;
}

private:
CacheValue forward_info_;
CacheValue backward_info_;
};
using KeyType = size_t;
using ValueType =
std::pair<std::shared_ptr<ParallelExecutor>, std::shared_ptr<ir::Graph>>;

struct KeyHasher {
size_t operator()(const CacheKey& key) const noexcept {
size_t seed = 10;
auto* prog_desc = key.program_desc_;
/*
* Note(Aurelius84): DO NOT use only ProgramDesc* to calculate the hash value,
* because a new program may, with small probability, hold the same pointer
* address after an older program is destructed. Also hash the op size of
* each block, since a program contains at least one block.
*/
hash_combine(&seed, prog_desc);
for (size_t i = 0; i < prog_desc->Size(); ++i) {
hash_combine(&seed, &prog_desc->Block(i));
hash_combine(&seed, prog_desc->Block(i).OpSize());
}
hash_combine(&seed, static_cast<int>(key.device_type_));
hash_combine(&seed, key.start_op_index_);
hash_combine(&seed, key.end_op_index_);
hash_combine(&seed, key.is_grad_);
VLOG(3) << "hash value is : " << seed
<< " of key: " << key.DebugString();
return seed;
}

template <typename T>
void hash_combine(size_t* seed, const T& val) const {
std::hash<T> hasher;
(*seed) ^= hasher(val) + 0x9e3779b9 + ((*seed) << 6) + ((*seed >> 2));
}
};

class ExecutorInfoCache {
public:
static ExecutorInfoCache& Instance();

const BuildStrategy& GetBuildStrategy(int64_t program_id) {
// If not found, insert build_strategy with default value.
return strategy_map_[program_id];
}

void SetBuildStrategy(int64_t program_id,
const BuildStrategy& build_strategy) {
ValueType GetMutable(const CacheKey& key) {
auto key_val = key_hash_func_(key);
PADDLE_ENFORCE_EQ(
strategy_map_.count(program_id), 0,
platform::errors::PreconditionNotMet(
"program_id: %s already exist in ExecutorInfoCache", program_id));
strategy_map_[program_id] = build_strategy;
Has(key_val), true,
platform::errors::NotFound("%s doesn't exist in ExecutorInfoCache",
key.DebugString()));
return info_map_[key_val];
}

bool Has(int64_t program_id, bool is_grad) {
return info_map_.find(program_id) != info_map_.end() &&
info_map_[program_id].IsAvailable(is_grad);
bool Has(const CacheKey& key) const {
auto key_val = key_hash_func_(key);
return Has(key_val);
}

ExecutorInfo::CacheValue& GetMutable(int64_t program_id, bool is_grad) {
return info_map_[program_id].GetMutable(is_grad);
bool Has(const KeyType& key) const {
return info_map_.find(key) != info_map_.end();
}

void UpdateSkipEagerDeleteVars(int64_t program_id, bool is_grad,
const std::vector<std::string>& skip_vars) {
auto& cached_value = GetMutable(program_id, is_grad);
cached_value.skip_eager_delete_vars_ = std::move(skip_vars);
void Insert(const CacheKey& key, ValueType value) {
auto key_val = key_hash_func_(key);
PADDLE_ENFORCE_EQ(
Has(key_val), false,
platform::errors::NotFound("%s has existed in ExecutorInfoCache",
key.DebugString()));
info_map_.insert({key_val, value});
}

std::vector<std::string>& SkipEagerDeleteVars(int64_t program_id,
bool is_grad) {
auto& cached_value = GetMutable(program_id, is_grad);
return cached_value.skip_eager_delete_vars_;
}
size_t Size() const { return info_map_.size(); }

void Finalize() {
// NOTE(Aurelius84): DO NOT perform finalize in destructor
// to avoid problems caused by destructor order of static
// object.
info_map_.clear();
strategy_map_.clear();
}
void Finalize();

private:
std::unordered_map<int64_t, ExecutorInfo> info_map_;
std::unordered_map<int64_t, BuildStrategy> strategy_map_;
ExecutorInfoCache() = default;
DISABLE_COPY_AND_ASSIGN(ExecutorInfoCache);

KeyHasher key_hash_func_;
std::unordered_map<KeyType, ValueType> info_map_;
};

using CacheInfo =
std::pair<std::shared_ptr<ParallelExecutor>, bool /*is_new_created*/>;

CacheInfo GetExecutorInfoFromCache(const ProgramDesc& program_desc,
const platform::Place& place,
int64_t start_op_index, int64_t end_op_index,
bool is_grad, int64_t program_id,
CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey& cache_key,
framework::Scope* scope);

} // namespace framework
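The KeyHasher restored above uses the widely used boost-style hash_combine idiom to fold several fields into a single size_t seed. A self-contained sketch of just that idiom, with illustrative stand-in values for the real ProgramDesc pointer and op indices (not Paddle code):

#include <cstdint>
#include <functional>
#include <iostream>

template <typename T>
void hash_combine(std::size_t *seed, const T &val) {
  std::hash<T> hasher;
  // 0x9e3779b9 (derived from the golden ratio) plus the shifted seed mixes
  // each field's bits so that different field sequences give different hashes.
  (*seed) ^= hasher(val) + 0x9e3779b9 + ((*seed) << 6) + ((*seed) >> 2);
}

int main() {
  std::size_t seed = 10;               // same starting seed as KeyHasher above
  const void *program_desc = nullptr;  // stand-in for the ProgramDesc pointer
  hash_combine(&seed, program_desc);
  hash_combine(&seed, int64_t{0});     // start_op_index
  hash_combine(&seed, int64_t{12});    // end_op_index
  hash_combine(&seed, false);          // is_grad
  std::cout << "combined hash: " << seed << "\n";
  return 0;
}

In the actual KeyHasher, the per-block op counts and the device type are also mixed in, for the reasons given in the Note comment above.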
4 changes: 0 additions & 4 deletions paddle/fluid/operators/run_program_op.cc
@@ -103,10 +103,6 @@ class RunProgramOpMaker : public framework::OpProtoAndCheckerMaker {
"(bool, default false) Set to true for inference only, false "
"for training.")
.SetDefault(false);
AddAttr<int64_t>(
"program_id",
"(int64_t)"
"The unique hash id used as cache key for ExecutorInfoCache.");
AddComment(R"DOC(
RunProgram operator.
(Diffs for the remaining 8 changed files are not shown here.)
