diff --git a/paddle/fluid/framework/fleet/box_wrapper_impl.h b/paddle/fluid/framework/fleet/box_wrapper_impl.h
index 0e5d4c1dd065b..4147158933363 100644
--- a/paddle/fluid/framework/fleet/box_wrapper_impl.h
+++ b/paddle/fluid/framework/fleet/box_wrapper_impl.h
@@ -16,6 +16,7 @@ limitations under the License. */
 #include
 #include
+#include
 #if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
 // The producer side.
@@ -382,9 +383,7 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
   VLOG(3) << "Begine BoxPs PullSparse";
   xpu::ctx_guard RAII_GUARD(ctx_xpu);
-  int64_t total_bytes = total_length * feature_pull_size_;
-  void* total_values_xpu =
-      dev.pull_push_tensor.mutable_data(total_bytes, place);
+
 #ifdef TRACE_PROFILE
   TRACE_SCOPE_START("copy keys", xpu_wait(ctx_xpu->xpu_stream));
@@ -392,14 +391,17 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
   VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
   // LoDTensor& total_keys_tensor = dev.keys_tensor;
   uint64_t* total_keys;
-  if(use_l3_tensor) {
-    total_keys = dev.keys_tensor.mutable_data(total_length * sizeof(int64_t), l3_place);
+  int* key2slot = nullptr;
+  if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
+    total_keys = dev.keys_tensor.mutable_data(total_length * 3 * sizeof(int64_t), place);
   } else {
-    total_keys = dev.keys_tensor.mutable_data(total_length * sizeof(int64_t), place);
+    if(use_l3_tensor) {
+      total_keys = dev.keys_tensor.mutable_data(total_length * sizeof(int64_t), l3_place);
+    } else {
+      total_keys = dev.keys_tensor.mutable_data(total_length * sizeof(int64_t), place);
+    }
   }
-  int* key2slot = nullptr;
-  key2slot =dev.keys2slot.mutable_data(total_length * sizeof(int), place);
-
+  key2slot = dev.keys2slot.mutable_data(total_length * sizeof(int), place);
   // construct slot_level lod info
   std::vector slot_lengths_lod(slot_num + 1, 0);
   for (int i = 1; i <= slot_num ; i++) {
@@ -429,6 +431,65 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
                                  static_cast(slot_lengths.size()),
                                  static_cast(total_length), key2slot);
   }
+  uint64_t* d_pull_keys = total_keys;
+  int pull_size = total_length;
+  int* d_merged_idx = nullptr;
+  int* d_merged_offsets = nullptr;
+  int* d_res_idx = nullptr;
+  std::thread thread_get_restore_idx;
+
+  if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
+    uint64_t* d_merged_keys = reinterpret_cast(&total_keys[total_length]);
+    pull_size = boxps_ptr_->DedupKeysAndFillIdxXPU(device_id, total_length, total_keys,
+        d_merged_keys, d_merged_idx, d_merged_offsets);
+    d_pull_keys = d_merged_keys;
+    d_res_idx = reinterpret_cast(&total_keys[2 * total_length]);
+
+    thread_get_restore_idx = std::thread([&] {
+      xpu_set_device(device_id);
+      std::vector h_idx(total_length);
+      std::vector h_offset(pull_size + 1);
+      xpu_memcpy(h_idx.data(),
+                 d_merged_idx,
+                 h_idx.size() * sizeof(int),
+                 XPUMemcpyKind::XPU_DEVICE_TO_HOST);
+      xpu_memcpy(h_offset.data(),
+                 d_merged_offsets,
+                 pull_size * sizeof(int),
+                 XPUMemcpyKind::XPU_DEVICE_TO_HOST);
+      h_offset[pull_size] = total_length - 1;
+      std::vector tmp1(total_length);
+
+      for (size_t i = 0; i < (size_t)pull_size; i++) {
+        if (i != 0) {
+          tmp1[h_offset[i]] = tmp1[h_offset[i] - 1];
+        }
+        else {
+          tmp1[0] = 0;
+        }
+        for (int j = h_offset[i] + 1; j < h_offset[i + 1]; j++) {
+          tmp1[j] = tmp1[j - 1] + 1;
+        }
+      }
+      if (h_offset[pull_size - 1] != h_offset[pull_size]) {
+        tmp1[h_offset[pull_size]] = tmp1[h_offset[pull_size] - 1] + 1;
+      } else {
+        tmp1[h_offset[pull_size]] = tmp1[h_offset[pull_size] - 1];
+      }
+      std::vector h_res_idx(total_length);
+      for (size_t i = 0; i < (size_t)total_length; i++) {
+        h_res_idx[h_idx[i]] = i - tmp1[i];
+      }
+
+      xpu_memcpy(d_res_idx,
+                 h_res_idx.data(),
+                 total_length * sizeof(int),
+                 XPUMemcpyKind::XPU_HOST_TO_DEVICE);
+    });
+  }
+
+  void* total_values_xpu = dev.pull_push_tensor.mutable_data(pull_size * feature_pull_size_, place);
+
   VLOG(3) << "Begin call PullSparseXPU in BoxPS, dev: " << device_id << " len: " << total_length;
 #ifdef TRACE_PROFILE
@@ -437,8 +498,9 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
   TRACE_SCOPE_START("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
 #endif
   pull_boxps_timer.Start();
-  boxps_ptr_->PullSparseXPU(total_keys, total_values_xpu,
-                            static_cast(total_length), device_id);
+
+  boxps_ptr_->PullSparseXPU(d_pull_keys, total_values_xpu, pull_size, device_id);
+
   pull_boxps_timer.Pause();
 #ifdef TRACE_PROFILE
   TRACE_SCOPE_END("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
@@ -467,10 +529,14 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
 #ifdef TRACE_PROFILE
   TRACE_SCOPE_START("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
 #endif
+  if (FLAGS_enable_pullpush_dedup_keys && use_xpu_sparse_map_) {
+    thread_get_restore_idx.join();
+  }
+
   box_wrapper_kernel_->CopyForPull(place, xpu_keys, (float**)values.data(), total_values_xpu,
-      pull_offset, slot_lengths_lod.data(), slot_num, key2slot, hidden_size,
-      expand_embed_dim, total_length, total_dims, skip_offset,
-      expand_only);
+      pull_offset, slot_lengths_lod.data(), slot_num, key2slot, d_res_idx, hidden_size,
+      expand_embed_dim, total_length, total_dims, skip_offset,
+      expand_only, d_merged_idx, d_merged_offsets, pull_size);
 #ifdef TRACE_PROFILE
   TRACE_SCOPE_END("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
   TRACE_SCOPE_END("pull copy", xpu_wait(ctx_xpu->xpu_stream));
diff --git a/paddle/fluid/framework/fleet/box_wrapper_kernel.h b/paddle/fluid/framework/fleet/box_wrapper_kernel.h
index 88adcb870b4b5..6a1374b6046c4 100644
--- a/paddle/fluid/framework/fleet/box_wrapper_kernel.h
+++ b/paddle/fluid/framework/fleet/box_wrapper_kernel.h
@@ -41,10 +41,12 @@ void CopyForPull(
     const paddle::platform::Place& place, uint64_t** xpu_keys,
     float** xpu_values, void* total_values_xpu,
     boxps::FeaturePullOffset* pull_offset, const int64_t* slot_lens,
-    const int slot_num, const int* key2slot, const int hidden_size,
+    const int slot_num, const int* key2slot, const int* d_res_idx, const int hidden_size,
     const int expand_embed_dim, const int64_t total_length, int* total_dims,
     const int skip_offset, bool expand_only,
-    const uint32_t* xpu_restore_idx = nullptr);
+    const int* xpu_merged_idx = nullptr,
+    const int* xpu_merged_offsets = nullptr,
+    const int merged_len = 0);

 void CopyForPush(
     const paddle::platform::Place& place,
diff --git a/paddle/fluid/framework/fleet/box_wrapper_kernel.kps b/paddle/fluid/framework/fleet/box_wrapper_kernel.kps
index 34f25cb0389ce..d85a13cd70a3c 100644
--- a/paddle/fluid/framework/fleet/box_wrapper_kernel.kps
+++ b/paddle/fluid/framework/fleet/box_wrapper_kernel.kps
@@ -228,21 +228,21 @@ void BoxWrapperKernel::CopyKeys(const paddle::platform::Place& place,
 template
 __global__ void PullCopyNNCross(const TEmbedxOp* op,
-                                const float scale,
-                                const boxps::FeaturePullOffset* info,
-                                int* total_dims,
-                                unsigned long long* dst_vals,
-                                const int* key2slot,
-                                float* total_values,
-                                const uint32_t* restore_idx,
-                                const int total_length,
-                                const int max_cols_num,
-                                const int
hidden_size, - const int expand_embed_dim, - const int pull_float_num, - const int skip_offset, - const int cvm_offset, - const int slot_num){ + const float scale, + const boxps::FeaturePullOffset* info, + int* total_dims, + unsigned long long* dst_vals, + const int* key2slot, + float* total_values, + const uint32_t* restore_idx, + const int total_length, + const int max_cols_num, + const int hidden_size, + const int expand_embed_dim, + const int pull_float_num, + const int skip_offset, + const int cvm_offset, + const int slot_num){ int cid = core_id(); int ncores = core_num(); if (cid >= ncores) { @@ -277,9 +277,9 @@ __global__ void PullCopyNNCross(const TEmbedxOp* op, xpu_sync_all(); __local__ uint64_t lm_dst_vals_ptr[1]; - for(int i=0;i +__global__ void PullCopyNNCrossMerged(const TEmbedxOp* op, + const float scale, + const boxps::FeaturePullOffset* info, + int* total_dims, + unsigned long long* dst_vals,//dst + const int* key2slot, + const int* d_res_idx, + float* total_values,//src + const int* merged_idx, + const int* merged_offsets, + const int merged_length, + const int total_length, + const int max_cols_num, + const int hidden_size, + const int expand_embed_dim, + const int pull_float_num, + const int skip_offset, + const int cvm_offset, + const int slot_num) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = cluster_id() * ncores + cid; + int nthreads = cluster_num() * ncores; + + const int buf_length = 1500 / (pull_float_num + hidden_size + expand_embed_dim + 3); + int per_thread_len = roundup_div(total_length, nthreads); + int per_thread_loop_count = roundup_div(per_thread_len, buf_length); + int per_thread_per_loop_len = roundup_div(per_thread_len, per_thread_loop_count); + + __local__ float lm_total_values[buf_length * pull_float_num]; + __local__ float lm_dst_vals[buf_length * hidden_size]; + __local__ float lm_dst_expand_vals[buf_length * expand_embed_dim]; + __local__ int lm_key2slot[buf_length]; + __local__ int lm_total_dims[buf_length]; + __local__ int lm_res_idx[buf_length]; + + __local__ boxps::FeaturePullOffset lm_info[1]; + __local__ TEmbedxOp lm_op[1]; + + const int max_slot_num = 1000; + int sm_slot_len = min(max_slot_num, slot_num); + __shared__ uint64_t sm_dst_vals_ptr[max_slot_num]; + __shared__ uint64_t sm_dst_expand_vals_ptr[max_slot_num]; + for (int i = cid; i < sm_slot_len; i += ncores) { + GM2SM(dst_vals + i, sm_dst_vals_ptr + i, sizeof(uint64_t)); + GM2SM(dst_vals + slot_num + i, sm_dst_expand_vals_ptr + i, sizeof(uint64_t)); + } + mfence(); + xpu_sync_all(); + + __local__ uint64_t lm_dst_vals_ptr[1]; + for (int i = 0; i < slot_num; i++) { + if (sm_dst_vals_ptr[i] != 0) { + lm_dst_vals_ptr[0] = sm_dst_vals_ptr[i]; + break; + } + } + + GM2LM(info, lm_info, sizeof(boxps::FeaturePullOffset)); + GM2LM(op, lm_op, sizeof(TEmbedxOp)); + for (int i = thread_id; i < per_thread_loop_count * nthreads; i += nthreads) { + int gm_offset = i * per_thread_per_loop_len; + if (gm_offset >= total_length) { + return; + } + int len = min(per_thread_per_loop_len, total_length - gm_offset); + + // GM2LM(total_values + gm_offset * pull_float_num, lm_total_values, len * pull_float_num * sizeof(float)); + GM2LM(total_dims + gm_offset, lm_total_dims, len * sizeof(int)); + GM2LM(key2slot + gm_offset, lm_key2slot, len * sizeof(int)); + GM2LM(d_res_idx + gm_offset, lm_res_idx, len * sizeof(int)); + + for (int j = 0; j < len; j++) { // handle each merged value + GM2LM(total_values + lm_res_idx[j] * pull_float_num, lm_total_values + 
j * pull_float_num, pull_float_num * sizeof(float)); + for (int k = 0; k < cvm_offset; ++k) { + lm_dst_vals[j * hidden_size + k] = lm_total_values[j * pull_float_num + lm_info[0].show + skip_offset + k]; + } + // cal embedx & expand size + int embedx_size = *((int *)&(lm_total_values[j * pull_float_num + lm_info[0].embedx_size])); + int expand_size = *((int *)&(lm_total_values[j * pull_float_num + lm_info[0].expand_size])); + lm_total_dims[j] = static_cast(embedx_size > 0) | static_cast((expand_size > 0) << 1); + + // handle embdx + if (sm_dst_vals_ptr[lm_key2slot[j]] != 0) { + for (int k = cvm_offset; k < cvm_offset + embedx_size; ++k) { + lm_op[0].copy(lm_dst_vals + j * hidden_size + k, + lm_total_values + j * pull_float_num + lm_info[0].embedx, + k - cvm_offset, + scale); + } + for (int k = cvm_offset + embedx_size; k < hidden_size; ++k) { + lm_dst_vals[j * hidden_size + k] = 0; + } + } + + if (sm_dst_expand_vals_ptr[lm_key2slot[j]] == 0) { + continue; + } + // handle expand + for (int k = hidden_size; k < hidden_size + expand_size; ++k) { + // op.copy(&dest_ptr[expand_id], &src_val[info->expand], expand_id, scale); + lm_op[0].copy(lm_dst_expand_vals + j * expand_embed_dim + k - hidden_size, + lm_total_values + j * pull_float_num + lm_info[0].expand, + k - hidden_size, + scale); + } + for (int k = hidden_size + expand_size; k < max_cols_num; ++k) { + lm_dst_expand_vals[j * expand_embed_dim + k - hidden_size] = 0; + } + } + mfence(); + // fill each undedup val + + LM2GM(lm_total_dims, total_dims + gm_offset, len * sizeof(int)); + LM2GM(lm_dst_vals, + ((__global_ptr__ float*)lm_dst_vals_ptr[0] + gm_offset * hidden_size), + len * hidden_size * sizeof(float)); + LM2GM(lm_dst_expand_vals, + ((__global_ptr__ float*)lm_dst_vals_ptr[0] + total_length * hidden_size + gm_offset * expand_embed_dim), + len * expand_embed_dim * sizeof(float)); } } @@ -400,9 +530,9 @@ __global__ void PullCopyNNCrossWithEmb(const TEmbedxOp* op, xpu_sync_all(); __local__ uint64_t lm_dst_vals_ptr[1]; - for(int i=0;i +__global__ void PullCopyNNCrossWithEmbMerged(const TEmbedxOp* op, + const float scale, + const boxps::FeaturePullOffset* info, + int* total_dims, + unsigned long long* dst_vals, + const int* key2slot, + float* total_values, + const int* d_res_idx, + const int total_length, + const int max_cols_num, + const int hidden_size, + const int expand_embed_dim, + const int pull_float_num, + const int skip_offset, + const int cvm_offset, + const int slot_num) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = cluster_id() * ncores + cid; + int nthreads = cluster_num() * ncores; + + const int buf_length = 5; + int per_thread_len = roundup_div(total_length, nthreads); + int per_thread_loop_count = roundup_div(per_thread_len, buf_length); + int per_thread_per_loop_len = roundup_div(per_thread_len, per_thread_loop_count); + + __local__ float lm_total_values[buf_length * pull_float_num]; + __local__ float lm_dst_vals[buf_length * hidden_size]; + __local__ float lm_dst_expand_vals[buf_length * (hidden_size + expand_embed_dim)]; + __local__ int lm_key2slot[buf_length]; + __local__ int lm_total_dims[buf_length]; + __local__ int lm_res_idx[buf_length]; + __local__ boxps::FeaturePullOffset lm_info[1]; + __local__ TEmbedxOp lm_op[1]; + + const int max_slot_num = 1000; + int sm_slot_len = min(max_slot_num, slot_num); + __shared__ uint64_t sm_dst_vals_ptr[max_slot_num]; + __shared__ uint64_t sm_dst_expand_vals_ptr[max_slot_num]; + for (int i = cid; i < sm_slot_len; i += 
ncores) { + GM2SM(dst_vals + i, sm_dst_vals_ptr + i, sizeof(uint64_t)); + GM2SM(dst_vals + slot_num + i, sm_dst_expand_vals_ptr + i, sizeof(uint64_t)); + } + mfence(); + xpu_sync_all(); + + __local__ uint64_t lm_dst_vals_ptr[1]; + for (int i = 0; i < slot_num; i++) { + if (sm_dst_vals_ptr[i] != 0) { + lm_dst_vals_ptr[0] = sm_dst_vals_ptr[i]; + break; + } + } + + GM2LM(info, lm_info, sizeof(boxps::FeaturePullOffset)); + GM2LM(op, lm_op, sizeof(TEmbedxOp)); + for (int i = thread_id; i < per_thread_loop_count * nthreads; i += nthreads) { + int gm_offset = i * per_thread_per_loop_len; + if (gm_offset >= total_length) { + return; + } + + int len = min(per_thread_per_loop_len, total_length - gm_offset); + + GM2LM(total_dims + gm_offset, lm_total_dims, len * sizeof(int)); + GM2LM(key2slot + gm_offset, lm_key2slot, len * sizeof(int)); + GM2LM(d_res_idx + gm_offset, lm_res_idx, len * sizeof(int)); + + for (int j = 0; j < len; j++) { + // mfence(); + GM2LM(total_values + lm_res_idx[j] * pull_float_num, + lm_total_values + j * pull_float_num, + pull_float_num * sizeof(float)); + // cvm offset + for (int k = 0; k < cvm_offset; ++k) { + //TODO:consider xpu_value[slot_id]==nullptr? + if (sm_dst_vals_ptr[lm_key2slot[j]] != 0) { + lm_dst_vals[j * hidden_size + k] = lm_total_values[j * pull_float_num + lm_info[0].show + skip_offset + k]; + } + if (sm_dst_expand_vals_ptr[lm_key2slot[j]] != 0) { + lm_dst_expand_vals[j * (hidden_size + expand_embed_dim) + k] = lm_total_values[j * pull_float_num + lm_info[0].show + skip_offset + k]; + } + } + + // embedx + // embedx flags + expand flags && *(keys[x] + y) != 0 && *(keys[x] + y) + int embedx_size = *((int *)&(lm_total_values[j * pull_float_num + lm_info[0].embedx_size])); + // int embedx_size = 0; + // TODO: expand_size = expand_embed_dim? 
+ int expand_size = *((int *)&(lm_total_values[j * pull_float_num + lm_info[0].expand_size])); + lm_total_dims[j] = static_cast(embedx_size > 0) | static_cast((expand_size > 0) << 1); + + if (sm_dst_vals_ptr[lm_key2slot[j]] != 0) { + for (int k = cvm_offset; k < cvm_offset + embedx_size; ++k) { + lm_op[0].copy(lm_dst_vals + j * hidden_size + k, + lm_total_values + j * pull_float_num + lm_info[0].embedx, + k - cvm_offset, + scale); + } + + for (int k = cvm_offset + embedx_size; k < hidden_size; ++k) { + lm_dst_vals[j * hidden_size + k] = 0; + } + } + + if (sm_dst_expand_vals_ptr[lm_key2slot[j]] != 0) { + for (int k = cvm_offset; k < cvm_offset + embedx_size; ++k) { + lm_op[0].copy(lm_dst_expand_vals + j * (hidden_size + expand_embed_dim) + k, + lm_total_values + j * pull_float_num + lm_info[0].embedx, + k - cvm_offset, + scale); + } + + for (int k = cvm_offset + embedx_size; k < hidden_size; ++k) { + lm_dst_expand_vals[j * (hidden_size + expand_embed_dim) + k] = 0; + } + } + + // expand + if (sm_dst_expand_vals_ptr[lm_key2slot[j]] == 0) { + continue; + } + + for (int k = hidden_size; k < hidden_size + expand_size; ++k) { + lm_op[0].copy(lm_dst_expand_vals + j * (hidden_size + expand_embed_dim) + k, + lm_total_values + j * pull_float_num + lm_info[0].expand, + k - hidden_size, + scale); + } + for (int k = hidden_size + expand_size; k < max_cols_num; ++k) { + lm_dst_expand_vals[j * (hidden_size + expand_embed_dim) + k] = 0; + } + } + mfence(); + + LM2GM(lm_total_dims, total_dims + gm_offset, len * sizeof(int)); + LM2GM(lm_dst_vals, ((__global_ptr__ float*)lm_dst_vals_ptr[0] + gm_offset * hidden_size), + len * hidden_size * sizeof(float)); + LM2GM(lm_dst_expand_vals, + ((__global_ptr__ float*)lm_dst_vals_ptr[0] + total_length * hidden_size + gm_offset * (hidden_size + expand_embed_dim)), + len * (hidden_size + expand_embed_dim) * sizeof(float)); mfence(); } } @@ -502,11 +784,13 @@ inline void FeaturePullCopyNNCross( const float scale, const boxps::FeaturePullOffset* info, int* total_dims, - float** xpu_values, // const std::vector& values, + float** xpu_values, const int* key2slot, - // uint64_t* total_keys_xpu,//useless + const int* d_res_idx, float* total_values_xpu, - const uint32_t* xpu_restore_idx, + const int* xpu_merged_idx, + const int* xpu_merged_offsets, + int merged_length, const int64_t* slot_lens, const int slot_num, const int total_length, @@ -543,39 +827,81 @@ inline void FeaturePullCopyNNCross( // total_values_xpu->(xpu_values[slot_id], total_dims[slot_id]) if (expand_only) { - PullCopyNNCross<<<8, 64, stream>>>(d_op, - scale, - info, - total_dims, - reinterpret_cast(d_xpu_values), - key2slot, - total_values_xpu, - xpu_restore_idx, - total_length, - (hidden_size + expand_embed_dim), - hidden_size, - expand_embed_dim, - pull_float_num, - skip_offset, - cvm_offset, - slot_num); + if (xpu_merged_idx == nullptr) { + PullCopyNNCross<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + reinterpret_cast(d_xpu_values), + key2slot, + total_values_xpu, + nullptr, + total_length, + (hidden_size + expand_embed_dim), + hidden_size, + expand_embed_dim, + pull_float_num, + skip_offset, + cvm_offset, + slot_num); + } else { + PullCopyNNCrossMerged<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + reinterpret_cast(d_xpu_values), + key2slot, + d_res_idx, + total_values_xpu, + xpu_merged_idx, + xpu_merged_offsets, + merged_length, + total_length, + (hidden_size + expand_embed_dim), + hidden_size, + expand_embed_dim, + pull_float_num, + skip_offset, + cvm_offset, + slot_num); + } } 
else { - PullCopyNNCrossWithEmb<<<8, 64, stream>>>(d_op, - scale, - info, - total_dims, - reinterpret_cast(d_xpu_values), - key2slot, - total_values_xpu, - xpu_restore_idx, - total_length, - (hidden_size + expand_embed_dim), - hidden_size, - expand_embed_dim, - pull_float_num, - skip_offset, - cvm_offset, - slot_num); + if (xpu_merged_idx == nullptr) { + PullCopyNNCrossWithEmb<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + reinterpret_cast(d_xpu_values), + key2slot, + total_values_xpu, + nullptr, + total_length, + (hidden_size + expand_embed_dim), + hidden_size, + expand_embed_dim, + pull_float_num, + skip_offset, + cvm_offset, + slot_num); + } else { + //CHECK(false) << "Copy for pull PullCopyNNCrossWithEmb Merged"; + PullCopyNNCrossWithEmbMerged<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + reinterpret_cast(d_xpu_values), + key2slot, + total_values_xpu, + d_res_idx, + total_length, + (hidden_size + expand_embed_dim), + hidden_size, + expand_embed_dim, + pull_float_num, + skip_offset, + cvm_offset, + slot_num); + } } xpu_free(d_xpu_values); xpu_wait(stream); @@ -595,7 +921,7 @@ __global__ void PullCopy(const TEmbedxOp* op, int* total_dims, float* dst_vals, float* total_values, - const uint32_t* restore_idx, + const int* restore_idx, const int total_length, const int hidden_size, const int pull_float_num, @@ -617,7 +943,7 @@ __global__ void PullCopy(const TEmbedxOp* op, __local__ float lm_total_values[buf_length * pull_float_num]; __local__ float lm_dst_vals[buf_length * hidden_size]; __local__ int lm_total_dims[buf_length]; - __local__ uint32_t lm_restore_idx[buf_length]; + __local__ int lm_restore_idx[buf_length]; __local__ boxps::FeaturePullOffset lm_info[1]; __local__ TEmbedxOp lm_op[1]; @@ -628,15 +954,19 @@ __global__ void PullCopy(const TEmbedxOp* op, if (gm_offset >= total_length) { return; } - //if(restore_idx != nullptr) { - // GM2LM(restore_idx + gm_offset, lm_restore_idx, per_thread_per_loop_len * sizeof(uint32_t)); - //} - //int pos = (restore_idx != nullptr) ? 
lm_restore_idx[gm_offset] : gm_offset; - //GM2LM(total_values + pos * pull_float_num, lm_total_values, per_thread_per_loop_len * pull_float_num * sizeof(float)); int len = min(per_thread_per_loop_len, total_length - gm_offset); - GM2LM(total_values + gm_offset * pull_float_num, lm_total_values, len * pull_float_num * sizeof(float)); - GM2LM(total_dims + gm_offset, lm_total_dims, len * sizeof(int)); + + if (restore_idx != nullptr) { + GM2LM(restore_idx + gm_offset, lm_restore_idx, len * sizeof(int)); + for (int j = 0; j < len; ++j) { + int gm_idx = lm_restore_idx[j]; + mfence(); + GM2LM(total_values + gm_idx * pull_float_num, lm_total_values + j * pull_float_num, pull_float_num * sizeof(float)); + } + } else { + GM2LM(total_values + gm_offset * pull_float_num, lm_total_values, len * pull_float_num * sizeof(float)); + } for (int j = 0; j < len; j++) { for (int k = 0; k < cvm_offset; ++k) { @@ -663,6 +993,97 @@ __global__ void PullCopy(const TEmbedxOp* op, } } +template +__global__ void PullCopyMerged(const TEmbedxOp* op, + const float scale, + const boxps::FeaturePullOffset* info, + int* total_dims, + float* dst_vals, + float* merged_values, + const int* merged_idx, + const int* merged_offsets, + const int total_length, + const int merged_length, + const int hidden_size, + const int pull_float_num, + const int skip_offset, + const int cvm_offset) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = cluster_id() * ncores + cid; + int nthreads = cluster_num() * ncores; + + const int buf_length = 30; + const int same_key_idx_buf_len = 30; + int per_thread_len = roundup_div(merged_length, nthreads); + int per_thread_loop_count = roundup_div(per_thread_len, buf_length); + int per_thread_per_loop_len = roundup_div(per_thread_len, per_thread_loop_count); + + __local__ float lm_merged_values[buf_length * pull_float_num]; + __local__ float lm_dst_vals[buf_length * hidden_size]; + + __local__ int lm_total_dims[buf_length]; + __local__ int lm_same_key_idx_buf[same_key_idx_buf_len]; + __local__ int lm_merged_offset[buf_length + 1]; + + __local__ boxps::FeaturePullOffset lm_info[1]; + __local__ TEmbedxOp lm_op[1]; + + GM2LM(info, lm_info, sizeof(boxps::FeaturePullOffset)); + GM2LM(op, lm_op, sizeof(TEmbedxOp)); + for (int i = thread_id; i < per_thread_loop_count * nthreads; i += nthreads) { + int gm_offset = i * per_thread_per_loop_len; + if (gm_offset >= merged_length) { + return; + } + + int len = min(per_thread_per_loop_len, merged_length - gm_offset); + GM2LM(merged_values + gm_offset * pull_float_num, lm_merged_values, len * pull_float_num * sizeof(float)); + int idx_copy_len = len + 1; + if (gm_offset + len == merged_length) { // last + idx_copy_len = len; + lm_merged_offset[len] = total_length; + } + GM2LM(merged_offsets + gm_offset, lm_merged_offset, idx_copy_len * sizeof(int)); + + for (int j = 0; j < len; j++) { + // fill local_dst_value from pull_val of each dedup val + for (int k = 0; k < cvm_offset; ++k) { // cvm + lm_dst_vals[j * hidden_size + k] = lm_merged_values[j * pull_float_num + lm_info[0].show + skip_offset + k]; + } + + int embedx_size = *((int *)&(lm_merged_values[j * pull_float_num + lm_info[0].embedx_size])); + lm_total_dims[j] = static_cast(embedx_size > 0); + for (int k = 0; k < embedx_size; ++k) { // embedx + lm_op[0].copy(lm_dst_vals + j * hidden_size + cvm_offset + k, + lm_merged_values + j * pull_float_num + lm_info[0].embedx, + k, + scale); + } + int dim_size = hidden_size - cvm_offset; + for (int k = embedx_size; k < 
dim_size; ++k) { + lm_dst_vals[j * hidden_size + cvm_offset + k] = 0; + } + + // fill each undedup val + int same_key_len = lm_merged_offset[j + 1] - lm_merged_offset[j]; + for (int k = 0; k < same_key_len; k += same_key_idx_buf_len) { // handle each same key of lm_dst_vals[j] + int handle_len = min(same_key_idx_buf_len, same_key_len - k); + GM2LM(merged_idx + lm_merged_offset[j] + k, lm_same_key_idx_buf, handle_len * sizeof(int)); + for (int l = 0; l < handle_len; ++l) { + LM2GM(&lm_dst_vals[j * hidden_size], dst_vals + lm_same_key_idx_buf[l] * hidden_size, hidden_size * sizeof(float)); + LM2GM(&lm_total_dims[j], total_dims + lm_same_key_idx_buf[l], sizeof(int)); + } + } + + } + mfence(); + } +} + template inline void FeaturePullCopy(const paddle::platform::Place& place, const TEmbedxOp* op, @@ -672,7 +1093,9 @@ inline void FeaturePullCopy(const paddle::platform::Place& place, float** xpu_values,// const std::vector& values, uint64_t* total_keys_xpu, float* total_values_xpu, - const uint32_t* xpu_restore_idx, + const int* xpu_merged_idx, + const int* xpu_merged_offsets, + int merged_length, const int64_t* slot_lens, const int slot_num, const int total_length, @@ -702,19 +1125,35 @@ inline void FeaturePullCopy(const paddle::platform::Place& place, break; } } - // total_values_xpu->(xpu_values[slot_id], total_dims[slot_id]) - PullCopy<<<8, 64, stream>>>(d_op, - scale, - info, - total_dims, - real_dst_vals, - total_values_xpu, - xpu_restore_idx, - total_length, - hidden_size, - pull_float_num, - skip_offset, - cvm_offset); + if (xpu_merged_idx == nullptr) { + PullCopy<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + real_dst_vals, + total_values_xpu, + nullptr, + total_length, + hidden_size, + pull_float_num, + skip_offset, + cvm_offset); + } else { + PullCopyMerged<<<8, 64, stream>>>(d_op, + scale, + info, + total_dims, + real_dst_vals, + total_values_xpu, + xpu_merged_idx, + xpu_merged_offsets, + total_length, + merged_length, + hidden_size, + pull_float_num, + skip_offset, + cvm_offset); + } xpu_wait(stream); #ifdef TRACE_PROFILE TRACE_SCOPE_END("PullCopy", ); @@ -733,15 +1172,15 @@ void BoxWrapperKernel::CopyForPull( const paddle::platform::Place& place, uint64_t** gpu_keys, float** xpu_values, void* total_values_xpu, boxps::FeaturePullOffset* pull_offset, const int64_t* slot_lens, - const int slot_num, const int* key2slot, const int hidden_size, + const int slot_num, const int* key2slot, const int* d_res_idx, const int hidden_size, const int expand_embed_dim, const int64_t total_length, int* total_dims, - const int skip_offset, bool expand_only, const uint32_t* xpu_restore_idx) { - CHECK(xpu_restore_idx == nullptr) << "Not Supported yet"; + const int skip_offset, bool expand_only, const int* xpu_merged_idx, + const int* xpu_merged_offsets, const int merged_length) { uint64_t* total_keys_xpu = nullptr; const int cvm_offset = cvm_offset_ - skip_offset; if (pull_info_.is_quant) { EmbedxQuantOp op; - if(expand_embed_dim > 0 && pull_info_.expand_size > 0) {//nncross + if(expand_embed_dim > 0 && pull_info_.expand_size > 0) { //nncross FeaturePullCopyNNCross(place, &op, pull_embedx_scale_, @@ -749,9 +1188,11 @@ void BoxWrapperKernel::CopyForPull( total_dims, xpu_values, key2slot, - // total_keys_xpu,//useless + d_res_idx, (float*)total_values_xpu, - xpu_restore_idx, + xpu_merged_idx, + xpu_merged_offsets, + merged_length, slot_lens, slot_num, (int)total_length, @@ -761,9 +1202,6 @@ void BoxWrapperKernel::CopyForPull( skip_offset, cvm_offset, expand_only - // embedx_dim_,//useless - // 
gpu_keys,//useless - // //useless ); } else if (pull_info_.expand_size < 0 && expand_embed_dim == cvm_offset + expand_embed_dim_ && @@ -786,7 +1224,9 @@ void BoxWrapperKernel::CopyForPull( xpu_values, total_keys_xpu, (float*)total_values_xpu, - xpu_restore_idx, + xpu_merged_idx, + xpu_merged_offsets, + merged_length, slot_lens, slot_num, (int)total_length, @@ -805,7 +1245,9 @@ void BoxWrapperKernel::CopyForPull( xpu_values, total_keys_xpu, (float*)total_values_xpu, - xpu_restore_idx, + xpu_merged_idx, + xpu_merged_offsets, + merged_length, slot_lens, slot_num, total_length,
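
Note on the restore-index thread added to PullSparseCaseXPU: after DedupKeysAndFillIdxXPU, d_merged_idx lists the original key positions grouped by deduplicated key and d_merged_offsets[k] marks where group k starts, so the lambda is building d_res_idx such that every original position maps back to the row of its key in the deduplicated pull buffer. A minimal host-side sketch of that mapping (BuildRestoreIdx and the vector-based signature are illustrative, not part of the patch, and assume the group layout described above):

#include <vector>

// res_idx[p] = row of the deduplicated pull buffer that holds original position p.
std::vector<int> BuildRestoreIdx(int total_length, int merged_len,
                                 const std::vector<int>& merged_idx,
                                 const std::vector<int>& merged_offsets) {
  std::vector<int> res_idx(total_length);
  for (int k = 0; k < merged_len; ++k) {
    int begin = merged_offsets[k];
    int end = (k + 1 < merged_len) ? merged_offsets[k + 1] : total_length;
    for (int t = begin; t < end; ++t) {
      res_idx[merged_idx[t]] = k;  // every duplicate maps back to merged row k
    }
  }
  return res_idx;
}

The tmp1 bookkeeping in the patch appears to compute the same group index as i - tmp1[i] in a single pass over merged_idx.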
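
All of the copy kernels touched here share one tiling skeleton: each XPU thread walks the input in chunks of buf_length elements and stages each chunk through local memory with GM2LM/LM2GM. A plain C++ rendering of just the index math (ForEachTile is an illustrative name; core_id, cluster_id, GM2LM and LM2GM are XPU intrinsics in the real kernels):

#include <algorithm>
#include <functional>

inline int roundup_div(int a, int b) { return (a + b - 1) / b; }

// Visit the [gm_offset, gm_offset + len) tiles one thread is responsible for;
// buf_length is the local-memory capacity per tile.
void ForEachTile(int total_length, int nthreads, int thread_id, int buf_length,
                 const std::function<void(int gm_offset, int len)>& visit) {
  int per_thread_len = roundup_div(total_length, nthreads);
  int per_thread_loop_count = roundup_div(per_thread_len, buf_length);
  int per_thread_per_loop_len = roundup_div(per_thread_len, per_thread_loop_count);
  for (int i = thread_id; i < per_thread_loop_count * nthreads; i += nthreads) {
    int gm_offset = i * per_thread_per_loop_len;
    if (gm_offset >= total_length) return;
    int len = std::min(per_thread_per_loop_len, total_length - gm_offset);
    visit(gm_offset, len);  // kernels do GM2LM here, per-element work, then LM2GM
  }
}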
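
PullCopyNNCrossMerged, PullCopyNNCrossWithEmbMerged, and the restore_idx branch added to PullCopy all consume the restore index the same way: instead of reading row j of total_values, position j reads row res_idx[j] of the deduplicated buffer. A host-side sketch of that gather (GatherPulledRows is a hypothetical helper; the kernels stage each row through local memory rather than copying directly):

#include <cstring>
#include <vector>

// Each original position j reads the pulled feature row of its deduplicated key.
void GatherPulledRows(const float* merged_values, int pull_float_num,
                      const std::vector<int>& res_idx, float* dst) {
  for (size_t j = 0; j < res_idx.size(); ++j) {
    std::memcpy(dst + j * pull_float_num,
                merged_values + static_cast<size_t>(res_idx[j]) * pull_float_num,
                pull_float_num * sizeof(float));
  }
}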
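
PullCopyMerged takes the opposite route: it decodes each deduplicated row once and scatters the result to every original position listed in merged_idx between merged_offsets[j] and the next offset (the kernel substitutes total_length for the offset past the last row). A simplified host-side sketch, with the cvm/embedx decode collapsed into a plain copy (ScatterMergedRows is a hypothetical helper name; hidden_size is assumed not to exceed pull_float_num):

#include <cstring>
#include <vector>

// Scatter one decoded row per deduplicated key to all of its duplicates.
void ScatterMergedRows(const float* merged_values, int pull_float_num,
                       const std::vector<int>& merged_idx,
                       const std::vector<int>& merged_offsets,
                       int merged_length, int total_length,
                       int hidden_size, float* dst_vals) {
  std::vector<float> row(hidden_size);
  for (int j = 0; j < merged_length; ++j) {
    // Real kernel: cvm fields plus scaled embedx via the TEmbedxOp; simplified here.
    std::memcpy(row.data(),
                merged_values + static_cast<size_t>(j) * pull_float_num,
                hidden_size * sizeof(float));
    int begin = merged_offsets[j];
    int end = (j + 1 < merged_length) ? merged_offsets[j + 1] : total_length;
    for (int t = begin; t < end; ++t) {
      std::memcpy(dst_vals + static_cast<size_t>(merged_idx[t]) * hidden_size,
                  row.data(), hidden_size * sizeof(float));
    }
  }
}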