Skip to content

Commit

Permalink
[PsCore] optimize performance of large kv (#32535)
Browse files Browse the repository at this point in the history
* optimize pull sparse

* optimize pull sparse

* change macro

* format
  • Loading branch information
Thunderbrook committed Apr 26, 2021
1 parent 913317f commit 4b7242b
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 119 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ if (WITH_MIPS)
add_definitions(-DPADDLE_WITH_MIPS)
endif()

if (WITH_HETERPS)
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -faligned-new")
endif()
endif()
set(PADDLE_PYTHON_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/python/build")

set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g -DNDEBUG")
Expand Down
23 changes: 13 additions & 10 deletions paddle/fluid/distributed/service/brpc_ps_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include <thread> // NOLINT
#include "butil/object_pool.h"
#include "paddle/fluid/distributed/table/depends/sparse_utils.h"
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/archive.h"
Expand Down Expand Up @@ -196,12 +197,13 @@ int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
return 0;
}

std::vector<float> res_data;
res_data.resize(num * table->value_accesor()->select_size() / sizeof(float));
table->pull_dense(res_data.data(), num);
auto res_data = butil::get_object<std::vector<float>>();
res_data->resize(num * table->value_accesor()->select_size() / sizeof(float));
table->pull_dense(res_data->data(), num);

cntl->response_attachment().append((char *)res_data.data(),
res_data.size() * sizeof(float));
cntl->response_attachment().append((char *)(res_data->data()),
res_data->size() * sizeof(float));
butil::return_object(res_data);

return 0;
}
Expand Down Expand Up @@ -367,12 +369,13 @@ int32_t BrpcPsService::pull_sparse(Table *table,

value.DeserializeFromBytes(const_cast<void *>(data));

std::vector<float> res_data;
res_data.resize(num * dim);
table->pull_sparse(res_data.data(), value);
auto res_data = butil::get_object<std::vector<float>>();
res_data->resize(num * dim);
table->pull_sparse(res_data->data(), value);

cntl->response_attachment().append((char *)res_data.data(),
res_data.size() * sizeof(float));
cntl->response_attachment().append((char *)(res_data->data()),
res_data->size() * sizeof(float));
butil::return_object(res_data);
return 0;
}

Expand Down
55 changes: 30 additions & 25 deletions paddle/fluid/distributed/table/common_sparse_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,37 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,

int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t not_save_num = 0;
for (auto& value : block->values_) {
if (mode == SaveMode::delta && !value.second.need_save_) {
not_save_num++;
continue;
}

auto* vs = value.second.data_;
std::stringstream ss;
auto id = value.first;
ss << id << "\t" << value.second.count_ << "\t" << value.second.unseen_days_
<< "\t" << value.second.is_entry_ << "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
}
int64_t save_num = 0;
for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
save_num += 1;

auto* vs = value.second->data_.data();
std::stringstream ss;
auto id = value.first;
ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
}

ss << "\n";
ss << "\n";

os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());

if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second.need_save_ = false;
if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second->need_save_ = false;
}
}
}

return block->values_.size() - not_save_num;
return save_num;
}

int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
Expand Down Expand Up @@ -183,7 +186,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,

block->Init(id, false);

auto value_instant = block->GetValue(id);
VALUE* value_instant = block->GetValue(id);
if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
Expand Down Expand Up @@ -373,8 +376,10 @@ std::pair<int64_t, int64_t> CommonSparseTable::print_table_stat() {
int64_t feasign_size = 0;
int64_t mf_size = 0;

for (auto& value : shard_values_) {
feasign_size += value->values_.size();
for (auto& shard : shard_values_) {
for (auto& table : shard->values_) {
feasign_size += table.size();
}
}

return {feasign_size, mf_size};
Expand Down
Loading

0 comments on commit 4b7242b

Please sign in to comment.