Skip to content

Commit

Permalink
Merge pull request #40 from HuangShiqing/paddlebox
Browse files Browse the repository at this point in the history
dedup key前置优化
  • Loading branch information
HuangShiqing committed Jan 8, 2024
2 parents 7c7ca59 + 6767330 commit 0a82713
Show file tree
Hide file tree
Showing 3 changed files with 620 additions and 110 deletions.
94 changes: 80 additions & 14 deletions paddle/fluid/framework/fleet/box_wrapper_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License. */
#include <glog/logging.h>

#include <vector>
#include <thread>

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
Expand Down Expand Up @@ -382,24 +383,25 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
VLOG(3) << "Begine BoxPs PullSparse";
xpu::ctx_guard RAII_GUARD(ctx_xpu);

int64_t total_bytes = total_length * feature_pull_size_;
void* total_values_xpu =
dev.pull_push_tensor.mutable_data<void>(total_bytes, place);


#ifdef TRACE_PROFILE
TRACE_SCOPE_START("copy keys", xpu_wait(ctx_xpu->xpu_stream));
#endif
VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
// LoDTensor& total_keys_tensor = dev.keys_tensor;
uint64_t* total_keys;
if(use_l3_tensor) {
total_keys = dev.keys_tensor.mutable_data<uint64_t>(total_length * sizeof(int64_t), l3_place);
int* key2slot = nullptr;
if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
total_keys = dev.keys_tensor.mutable_data<uint64_t>(total_length * 3 * sizeof(int64_t), place);
} else {
total_keys = dev.keys_tensor.mutable_data<uint64_t>(total_length * sizeof(int64_t), place);
if(use_l3_tensor) {
total_keys = dev.keys_tensor.mutable_data<uint64_t>(total_length * sizeof(int64_t), l3_place);
} else {
total_keys = dev.keys_tensor.mutable_data<uint64_t>(total_length * sizeof(int64_t), place);
}
}
int* key2slot = nullptr;
key2slot =dev.keys2slot.mutable_data<int>(total_length * sizeof(int), place);

key2slot = dev.keys2slot.mutable_data<int>(total_length * sizeof(int), place);
// construct slot_level lod info
std::vector<int64_t> slot_lengths_lod(slot_num + 1, 0);
for (int i = 1; i <= slot_num ; i++) {
Expand Down Expand Up @@ -429,6 +431,65 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
static_cast<int>(slot_lengths.size()),
static_cast<int>(total_length), key2slot);
}
uint64_t* d_pull_keys = total_keys;
int pull_size = total_length;
int* d_merged_idx = nullptr;
int* d_merged_offsets = nullptr;
int* d_res_idx = nullptr;
std::thread thread_get_restore_idx;

if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
uint64_t* d_merged_keys = reinterpret_cast<uint64_t*>(&total_keys[total_length]);
pull_size = boxps_ptr_->DedupKeysAndFillIdxXPU(device_id, total_length, total_keys,
d_merged_keys, d_merged_idx, d_merged_offsets);
d_pull_keys = d_merged_keys;
d_res_idx = reinterpret_cast<int*>(&total_keys[2 * total_length]);

thread_get_restore_idx = std::thread([&] {
xpu_set_device(device_id);
std::vector<int> h_idx(total_length);
std::vector<int> h_offset(pull_size + 1);
xpu_memcpy(h_idx.data(),
d_merged_idx,
h_idx.size() * sizeof(int),
XPUMemcpyKind::XPU_DEVICE_TO_HOST);
xpu_memcpy(h_offset.data(),
d_merged_offsets,
pull_size * sizeof(int),
XPUMemcpyKind::XPU_DEVICE_TO_HOST);
h_offset[pull_size] = total_length - 1;
std::vector<int> tmp1(total_length);

for (size_t i = 0; i < (size_t)pull_size; i++) {
if (i != 0) {
tmp1[h_offset[i]] = tmp1[h_offset[i] - 1];
}
else {
tmp1[0] = 0;
}
for (int j = h_offset[i] + 1; j < h_offset[i + 1]; j++) {
tmp1[j] = tmp1[j - 1] + 1;
}
}
if (h_offset[pull_size - 1] != h_offset[pull_size]) {
tmp1[h_offset[pull_size]] = tmp1[h_offset[pull_size] - 1] + 1;
} else {
tmp1[h_offset[pull_size]] = tmp1[h_offset[pull_size] - 1];
}
std::vector<int> h_res_idx(total_length);
for (size_t i = 0; i < (size_t)total_length; i++) {
h_res_idx[h_idx[i]] = i - tmp1[i];
}

xpu_memcpy(d_res_idx,
h_res_idx.data(),
total_length * sizeof(int),
XPUMemcpyKind::XPU_HOST_TO_DEVICE);
});
}

void* total_values_xpu = dev.pull_push_tensor.mutable_data<void>(pull_size * feature_pull_size_, place);

VLOG(3) << "Begin call PullSparseXPU in BoxPS, dev: " << device_id
<< " len: " << total_length;
#ifdef TRACE_PROFILE
Expand All @@ -437,8 +498,9 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
TRACE_SCOPE_START("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
#endif
pull_boxps_timer.Start();
boxps_ptr_->PullSparseXPU(total_keys, total_values_xpu,
static_cast<int>(total_length), device_id);

boxps_ptr_->PullSparseXPU(d_pull_keys, total_values_xpu, pull_size, device_id);

pull_boxps_timer.Pause();
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
Expand Down Expand Up @@ -467,10 +529,14 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
#ifdef TRACE_PROFILE
TRACE_SCOPE_START("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
#endif
if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
thread_get_restore_idx.join();
}

box_wrapper_kernel_->CopyForPull(place, xpu_keys, (float**)values.data(), total_values_xpu,
pull_offset, slot_lengths_lod.data(), slot_num, key2slot, hidden_size,
expand_embed_dim, total_length, total_dims, skip_offset,
expand_only);
pull_offset, slot_lengths_lod.data(), slot_num, key2slot, d_res_idx, hidden_size,
expand_embed_dim, total_length, total_dims, skip_offset,
expand_only, d_merged_idx, d_merged_offsets, pull_size);
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
TRACE_SCOPE_END("pull copy", xpu_wait(ctx_xpu->xpu_stream));
Expand Down
6 changes: 4 additions & 2 deletions paddle/fluid/framework/fleet/box_wrapper_kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ void CopyForPull(
const paddle::platform::Place& place, uint64_t** xpu_keys,
float** xpu_values, void* total_values_xpu,
boxps::FeaturePullOffset* pull_offset, const int64_t* slot_lens,
const int slot_num, const int* key2slot, const int hidden_size,
const int slot_num, const int* key2slot, const int* d_res_idx, const int hidden_size,
const int expand_embed_dim, const int64_t total_length, int* total_dims,
const int skip_offset, bool expand_only,
const uint32_t* xpu_restore_idx = nullptr);
const int* xpu_merged_idx = nullptr,
const int* xpu_merged_offsets = nullptr,
const int merged_len = 0);

void CopyForPush(
const paddle::platform::Place& place,
Expand Down
Loading

0 comments on commit 0a82713

Please sign in to comment.