Skip to content

Commit

Permalink
Merge pull request #26 from HuangShiqing/paddlebox
Browse files Browse the repository at this point in the history
Paddlebox
  • Loading branch information
HuangShiqing committed Nov 23, 2023
2 parents a2b9f15 + da37757 commit 8d071ee
Show file tree
Hide file tree
Showing 10 changed files with 759 additions and 53 deletions.
10 changes: 10 additions & 0 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3595,6 +3595,7 @@ int SlotPaddleBoxDataFeed::GetCurrentPhase() {
return box_ptr->Phase();
}
}

void SlotPaddleBoxDataFeed::GetRankOffsetGPU(const int pv_num,
const int ins_num) {
#if defined(PADDLE_WITH_CUDA) && defined(_LINUX) || defined(PADDLE_WITH_XPU_KP) && !defined(CPU_DATA_FEED)
Expand All @@ -3609,6 +3610,15 @@ void SlotPaddleBoxDataFeed::GetRankOffsetGPU(const int pv_num,
value.d_ad_offset.data<int>(), col);

#elif defined(PADDLE_WITH_XPU_KP)
auto dev_ctx = platform::DeviceContextPool::Instance().Get(this->place_);
auto ctx = static_cast<platform::XPUDeviceContext*>(dev_ctx)->x_context();
int r = xpu::constant<int>(ctx, tensor_ptr, rank_offset_->numel(), 0);
PADDLE_ENFORCE_EQ(r,
XPU_SUCCESS,
platform::errors::External(
"XPU constant kernel return wrong value[%d %s]",
r,
XPUAPIErrorMsg[r]));
DataFeedPdboxXpuKernelHelper::CopyRankOffset(this->place_, tensor_ptr, ins_num, pv_num, max_rank,
value.d_rank.data<int>(), value.d_cmatch.data<int>(),
value.d_ad_offset.data<int>(), col);
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/data_feed.kps
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "xpu/kernel/xtdk.h"
// #include "xpu/kernel/debug.h"
#include "xpu/kernel/xtdk_math.h"
#include "xpu/kernel/xtdk_io.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -93,6 +94,7 @@ __global__ void CopyRankOffsetKernel(int* mat,
}
}
}
mfence();
LM2GM(lm_mat, mat + lm_pv_offset[pv_offset_left_index] * col, ad_num * col * sizeof(int));
pv_offset_left_index = pv_offset_right_index;
}
Expand Down Expand Up @@ -376,7 +378,7 @@ void DataFeedPdboxXpuKernelHelper::CopyRankOffset(const paddle::platform::Place&
stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
->x_context()
->xpu_stream;
CopyRankOffsetKernel<<<4, 64, stream>>>(dest, ranks, cmatchs, ad_offsets, ins_num, pv_num, max_rank, cols);
CopyRankOffsetKernel<<<8, 64, stream>>>(dest, ranks, cmatchs, ad_offsets, pv_num, ins_num, max_rank, cols);
xpu_wait(stream);
}

Expand Down
11 changes: 8 additions & 3 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2114,8 +2114,11 @@ void PadBoxSlotDataset::PreLoadIntoDisk(const std::string& path,
}
CHECK(slot_pool_ != nullptr) << "slotrecord pool nullptr";
read_ins_ref_ = thread_num_;
if (disable_shuffle_) {
read_ins_ref_ = 1;
}
CHECK(down_pool_ != nullptr) << "down_pool nullptr";
for (int64_t i = 0; i < thread_num_; ++i) {
for (int64_t i = 0; i < read_ins_ref_; ++i) {
wait_futures_.emplace_back(down_pool_->Run([this, i]() {
platform::Timer timer;
timer.Start();
Expand Down Expand Up @@ -2785,8 +2788,10 @@ void PadBoxSlotDataset::PrepareTrain(void) {
// join or aucrunner mode enable pv
if (enable_pv_merge_ && (box_ptr->Phase() & 0x01 == 1 ||
box_ptr->Mode() == 1)) {
std::shuffle(input_pv_ins_.begin(), input_pv_ins_.end(),
BoxWrapper::LocalRandomEngine());
if (!disable_random_update_) {
std::shuffle(input_pv_ins_.begin(), input_pv_ins_.end(),
BoxWrapper::LocalRandomEngine());
}
// 分数据到各线程里面
int batchsize = reinterpret_cast<SlotPaddleBoxDataFeed*>(readers_[0].get())
->GetPvBatchSize();
Expand Down
34 changes: 27 additions & 7 deletions paddle/fluid/framework/fleet/box_wrapper_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -731,16 +731,34 @@ void BoxWrapper::PushSparseGradCaseXPU(const paddle::platform::Place& place,

TRACE_SCOPE_START("CopyForPush", xpu_wait(ctx_xpu->xpu_stream));
#endif
float* real_grad_values;
// float* real_grad_values;
// for (int i = 0; i < slot_num; i++) {
// if(grad_values[i] != nullptr) {
// real_grad_values = const_cast<float*>(grad_values[i]);
// break;
// }
// }
std::vector<int> slot_inner_offset(total_length);
int out_count = 0;
for (int i = 0; i < slot_num; i++) {
if(grad_values[i] != nullptr) {
real_grad_values = const_cast<float*>(grad_values[i]);
break;
for (int64_t j = 0; j < slot_lengths[i]; j++) {
slot_inner_offset[out_count++] = j;
}
}
box_wrapper_kernel_->CopyForPush(place, real_grad_values, total_grad_values_xpu,
push_offset, total_length, slot_vector, slot_lens, slot_num,
hidden_size, batch_size, total_dims, skip_offset, key2slot);
auto d_slot_inner_offset_tmp = memory::Alloc(place, total_length * sizeof(int));
int* d_slot_inner_offset = reinterpret_cast<int*>(d_slot_inner_offset_tmp->ptr());
memory::Copy(place,
d_slot_inner_offset,
platform::CPUPlace(),
slot_inner_offset.data(),
total_length * sizeof(int));

box_wrapper_kernel_->CopyForPush(place, xpu_values, total_grad_values_xpu,
push_offset, total_length, slot_vector, (int*)d_slot_inner_offset, slot_lens, slot_num,
hidden_size, batch_size, total_dims, skip_offset, key2slot,
expand_embed_dim,
push_float_num_,
expand_only);

push_boxps_timer.Resume();
#ifdef TRACE_PROFILE
Expand All @@ -749,9 +767,11 @@ void BoxWrapper::PushSparseGradCaseXPU(const paddle::platform::Place& place,

TRACE_SCOPE_START("PushSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
#endif

int ret = boxps_ptr_->PushSparseXPU(total_keys,
reinterpret_cast<void*>(total_grad_values_xpu),
static_cast<int>(total_length), device_id);

PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"PushSparseXPU failed in BoxPS."));
push_boxps_timer.Pause();
Expand Down
8 changes: 6 additions & 2 deletions paddle/fluid/framework/fleet/box_wrapper_kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,22 @@ void CopyForPull(

void CopyForPush(
const paddle::platform::Place& place,
float* gm_src_ptr,
float** gm_src_ptr,
void* total_grad_values_xpu,
boxps::FeaturePushOffset* push_offset,
const int64_t total_length,
const int* slots,
const int* slot_inner_offset,
const int64_t* slot_lens,
const int slot_num,
const int hidden_size,
const int batch_size,
const int* total_dims,
const int skip_offset,
const int* key2slot);
const int* key2slot,
const int expand_embed_dim,
const int push_float_num,
bool expand_only);

public:
const static int MAX_SLOT_SIZE = 10240;
Expand Down
Loading

0 comments on commit 8d071ee

Please sign in to comment.