Skip to content

Commit

Permalink
Paddlebox Fix DualBox mpi coredump bug (PaddlePaddle#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
qingshui committed Jun 20, 2020
1 parent 09ba408 commit 9812cc1
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 38 deletions.
8 changes: 8 additions & 0 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ struct SlotValues {
std::vector<T> slot_values;
std::vector<uint32_t> slot_offsets;

~SlotValues() {
slot_values.shrink_to_fit();
slot_offsets.shrink_to_fit();
}
void add_values(const T* values, uint32_t num) {
if (slot_offsets.empty()) {
slot_offsets.push_back(0);
Expand Down Expand Up @@ -797,6 +801,10 @@ inline SlotRecord make_slotrecord() { return new SlotRecordObject(); }

struct SlotPvInstanceObject {
std::vector<SlotRecord> ads;
~SlotPvInstanceObject() {
ads.clear();
ads.shrink_to_fit();
}
void merge_instance(SlotRecord ins) { ads.push_back(ins); }
};

Expand Down
68 changes: 34 additions & 34 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,20 @@ void MultiSlotDataset::SlotsShuffle(
}

#ifdef PADDLE_WITH_BOX_PS
class PadBoxSlotDataConsumer : public boxps::DataConsumer {
public:
explicit PadBoxSlotDataConsumer(PadBoxSlotDataset* dataset)
: _dataset(dataset) {
BoxWrapper::data_shuffle_->register_handler(this);
}
virtual ~PadBoxSlotDataConsumer() {}
virtual void on_receive(const int client_id, const char* buff, int len) {
_dataset->ReceiveSuffleData(client_id, buff, len);
}

private:
PadBoxSlotDataset* _dataset;
};
// paddlebox
PadBoxSlotDataset::PadBoxSlotDataset() {
mpi_size_ = boxps::MPICluster::Ins().size();
Expand All @@ -1398,15 +1412,17 @@ PadBoxSlotDataset::PadBoxSlotDataset() {
finished_counter_ = mpi_size_;
mpi_flags_.assign(mpi_size_, 1);
VLOG(3) << "RegisterClientToClientMsgHandler";
BoxWrapper::data_shuffle_->register_handler(
[this](int client_id, const char* buf, int len) {
return this->ReceiveSuffleData(client_id, buf, len);
});
data_consumer_ = reinterpret_cast<void*>(new PadBoxSlotDataConsumer(this));
VLOG(3) << "RegisterClientToClientMsgHandler done";
}
SlotRecordPool();
}
PadBoxSlotDataset::~PadBoxSlotDataset() {}
PadBoxSlotDataset::~PadBoxSlotDataset() {
if (data_consumer_ != nullptr) {
delete reinterpret_cast<PadBoxSlotDataConsumer*>(data_consumer_);
data_consumer_ = nullptr;
}
}
// create input channel and output channel
void PadBoxSlotDataset::CreateChannel() {
if (input_channel_ == nullptr) {
Expand Down Expand Up @@ -1529,7 +1545,9 @@ void PadBoxSlotDataset::MergeInsKeys(const Channel<SlotRecord>& in) {
for (auto& rec : datas) {
for (auto& idx : used_fea_index) {
uint64_t* feas = rec->slot_uint64_feasigns_.get_values(idx, &num);
agent->AddKeys(feas, num, tid);
if (num > 0) {
agent->AddKeys(feas, num, tid);
}
}
feed_obj->ExpandSlotRecord(&rec);
}
Expand Down Expand Up @@ -1581,6 +1599,7 @@ void PadBoxSlotDataset::ReleaseMemory() {
delete pv;
}
input_pv_ins_.clear();
input_pv_ins_.shrink_to_fit();
}
timeline.Pause();
VLOG(1) << "DatasetImpl<T>::ReleaseMemory() end, cost time="
Expand All @@ -1602,7 +1621,6 @@ void PadBoxSlotDataset::ShuffleData(std::vector<std::thread>* shuffle_threads,
std::vector<SlotRecord> loc_datas;
std::vector<SlotRecord> releases;
std::vector<paddle::framework::BinaryArchive> ars(mpi_size_);
std::vector<std::future<int32_t>> rets(mpi_size_);

while (input_channel_->Read(data)) {
for (auto& t : data) {
Expand Down Expand Up @@ -1631,41 +1649,26 @@ void PadBoxSlotDataset::ShuffleData(std::vector<std::thread>* shuffle_threads,
if (i == mpi_rank_) {
continue;
}
if (ars[i].Length() == 0) {
auto& ar = ars[i];
if (ar.Length() == 0) {
continue;
}
rets[i] = BoxWrapper::data_shuffle_->send_message(i, ars[i].Buffer(),
ars[i].Length());
BoxWrapper::data_shuffle_->send_message(i, ar.Buffer(), ar.Length());
ar.Clear();
}

for (int i = 0; i < mpi_size_; ++i) {
if (i == mpi_rank_) {
continue;
}
rets[i].wait();
ars[i].Clear();
}
data.clear();
loc_datas.clear();
}

VLOG(3) << "end shuffle thread id = " << tid;
// only one thread send finish notify
if (--shuffle_counter_ == 0) {
// send closed
paddle::framework::BinaryArchive ar;
for (int i = 0; i < mpi_size_; ++i) {
if (i == mpi_rank_) {
continue;
}
rets[i] = BoxWrapper::data_shuffle_->send_message(i, ar.Buffer(),
ar.Length());
}

for (int i = 0; i < mpi_size_; ++i) {
if (i == mpi_rank_) {
continue;
}
rets[i].wait();
BoxWrapper::data_shuffle_->send_message(i, NULL, 0);
}
// local closed channel
if (--finished_counter_ == 0) {
Expand All @@ -1680,12 +1683,6 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf,
VLOG(3) << "ReceiveFromClient client_id=" << client_id
<< ", msg length=" << len;
if (len == 0) {
return;
}

paddle::framework::BinaryArchive ar;
ar.SetReadBuffer(const_cast<char*>(buf), len, nullptr);
if (ar.Cursor() == ar.Finish()) {
if (mpi_flags_[client_id]) {
mpi_flags_[client_id] = 0;
--finished_counter_;
Expand All @@ -1696,6 +1693,9 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf,
return;
}

paddle::framework::BinaryArchive ar;
ar.SetReadBuffer(const_cast<char*>(buf), len, nullptr);

int offset = 0;
const int max_fetch_num = 1000;
std::vector<SlotRecord> data;
Expand Down
5 changes: 4 additions & 1 deletion paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ class PadBoxSlotDataset : public DatasetImpl<SlotRecord> {
// shuffle data
virtual void ShuffleData(std::vector<std::thread>* shuffle_threads,
int thread_num = -1);
virtual void ReceiveSuffleData(int client_id, const char* msg, int len);

public:
virtual void ReceiveSuffleData(const int client_id, const char* msg, int len);

private:
void MergeInsKeys(const Channel<SlotRecord>& in);
Expand All @@ -371,6 +373,7 @@ class PadBoxSlotDataset : public DatasetImpl<SlotRecord> {
std::vector<SlotPvInstance> input_pv_ins_;
int shuffle_thread_num_ = 10;
std::atomic<int> shuffle_counter_{0};
void* data_consumer_ = nullptr;
};
#endif

Expand Down
21 changes: 18 additions & 3 deletions paddle/fluid/framework/fleet/box_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ int BoxWrapper::expand_embed_dim_ = 0;

void BasicAucCalculator::compute() {
double* table[2] = {&_table[0][0], &_table[1][0]};
if (boxps::MPICluster::Ins().size() > 1) {
boxps::MPICluster::Ins().allreduce_sum(table[0], _table_size);
boxps::MPICluster::Ins().allreduce_sum(table[1], _table_size);
}

double area = 0;
double fp = 0;
Expand All @@ -51,10 +55,21 @@ void BasicAucCalculator::compute() {
_auc = area / (fp * tp);
}

_mae = _local_abserr / (fp + tp);
_rmse = sqrt(_local_sqrerr / (fp + tp));
if (boxps::MPICluster::Ins().size() > 1) {
// allreduce sum
double local_err[3] = {_local_abserr, _local_sqrerr, _local_pred};
boxps::MPICluster::Ins().allreduce_sum(local_err, 3);

_mae = local_err[0] / (fp + tp);
_rmse = sqrt(local_err[1] / (fp + tp));
_predicted_ctr = local_err[2] / (fp + tp);
} else {
_mae = _local_abserr / (fp + tp);
_rmse = sqrt(_local_sqrerr / (fp + tp));
_predicted_ctr = _local_pred / (fp + tp);
}
_actual_ctr = tp / (fp + tp);
_predicted_ctr = _local_pred / (fp + tp);

_size = fp + tp;
}

Expand Down

0 comments on commit 9812cc1

Please sign in to comment.