From c355740ef69209b4cd60d1c0eab188796eb97498 Mon Sep 17 00:00:00 2001 From: Jeff Olivier Date: Wed, 12 Jun 2024 12:20:30 -0600 Subject: [PATCH] Backports to address b/336317519 DAOS-14679 pool: Report on stopping sp_stopping (#14374) DAOS-15145 pool: add pool collective function (#13764) *DAOS-14105 object: collectively punch object (#13493) * partial backport, just the bitmap function Required-githooks: true Change-Id: I2b21b8121cbdecc79ae49a464a42b1d47fb9be10 Signed-off-by: Jeff Olivier --- src/container/container_iv.c | 2 +- src/container/srv_container.c | 5 +- src/container/srv_internal.h | 3 +- src/container/srv_target.c | 62 ++++--- src/dtx/dtx_resync.c | 4 +- src/engine/ult.c | 64 ++++++- src/include/daos_srv/container.h | 3 +- src/include/daos_srv/daos_engine.h | 19 +- src/include/daos_srv/pool.h | 20 +- src/mgmt/rpc.h | 33 +--- src/mgmt/srv.c | 3 + src/mgmt/srv_internal.h | 2 + src/mgmt/srv_target.c | 15 +- src/object/srv_obj_migrate.c | 9 +- src/pool/srv_iv.c | 4 +- src/pool/srv_target.c | 281 ++++++++++++++++++----------- src/rebuild/scan.c | 8 +- src/tests/suite/daos_test_common.c | 2 +- 18 files changed, 353 insertions(+), 186 deletions(-) diff --git a/src/container/container_iv.c b/src/container/container_iv.c index 2ccb5b75d4b..24c59d0f504 100644 --- a/src/container/container_iv.c +++ b/src/container/container_iv.c @@ -652,7 +652,7 @@ cont_iv_ent_update(struct ds_iv_entry *entry, struct ds_iv_key *key, } if (entry->iv_class->iv_class_id == IV_CONT_CAPA && !uuid_is_null(civ_key->cont_uuid)) { - rc = ds_cont_tgt_close(civ_key->cont_uuid); + rc = ds_cont_tgt_close(entry->ns->iv_pool_uuid, civ_key->cont_uuid); if (rc) D_GOTO(out, rc); } diff --git a/src/container/srv_container.c b/src/container/srv_container.c index 4e6c99c3e7f..c1257507434 100644 --- a/src/container/srv_container.c +++ b/src/container/srv_container.c @@ -1801,8 +1801,9 @@ ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid, uuid_copy(arg.cont_uuid, cont_uuid); arg.min_eph = eph; - rc = dss_task_collective(cont_refresh_vos_agg_eph_one, &arg, - DSS_ULT_FL_PERIODIC); + rc = ds_pool_task_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_refresh_vos_agg_eph_one, &arg, DSS_ULT_FL_PERIODIC); return rc; } diff --git a/src/container/srv_internal.h b/src/container/srv_internal.h index 4c9c9f52294..0df756431ce 100644 --- a/src/container/srv_internal.h +++ b/src/container/srv_internal.h @@ -257,7 +257,8 @@ int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid, int ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid, uint64_t *snapshots, int snap_count); int ds_cont_tgt_snapshots_refresh(uuid_t pool_uuid, uuid_t cont_uuid); -int ds_cont_tgt_close(uuid_t cont_hdl_uuid); +int + ds_cont_tgt_close(uuid_t pool_uuid, uuid_t cont_hdl_uuid); int ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid, daos_epoch_t eph); int ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop); diff --git a/src/container/srv_target.c b/src/container/srv_target.c index cabbea456ae..2d5ef014627 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -1256,7 +1256,9 @@ ds_cont_tgt_destroy(uuid_t pool_uuid, uuid_t cont_uuid) cont_iv_entry_delete(pool->sp_iv_ns, pool_uuid, cont_uuid); ds_pool_put(pool); - rc = dss_thread_collective(cont_child_destroy_one, &in, 0); + rc = ds_pool_thread_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_child_destroy_one, &in, 0); if (rc) D_ERROR(DF_UUID"/"DF_UUID" container child destroy failed: %d\n", DP_UUID(pool_uuid), DP_UUID(cont_uuid), rc); @@ -1631,9 +1633,7 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid, uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas, uint32_t status_pm_ver) { - struct cont_tgt_open_arg arg = { 0 }; - struct dss_coll_ops coll_ops = { 0 }; - struct dss_coll_args coll_args = { 0 }; + struct cont_tgt_open_arg arg = {0}; int rc; uuid_copy(arg.pool_uuid, pool_uuid); @@ -1647,22 +1647,9 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid, D_DEBUG(DB_TRACE, "open pool/cont/hdl "DF_UUID"/"DF_UUID"/"DF_UUID"\n", DP_UUID(pool_uuid), DP_UUID(cont_uuid), DP_UUID(cont_hdl_uuid)); - /* collective operations */ - coll_ops.co_func = cont_open_one; - coll_args.ca_func_args = &arg; - - /* setting aggregator args */ - rc = ds_pool_get_failed_tgt_idx(pool_uuid, &coll_args.ca_exclude_tgts, - &coll_args.ca_exclude_tgts_cnt); - if (rc) { - D_ERROR(DF_UUID "failed to get index : rc "DF_RC"\n", - DP_UUID(pool_uuid), DP_RC(rc)); - return rc; - } - - rc = dss_thread_collective_reduce(&coll_ops, &coll_args, 0); - D_FREE(coll_args.ca_exclude_tgts); - + rc = ds_pool_thread_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_open_one, &arg, 0); if (rc != 0) { /* Once it exclude the target from the pool, since the target * might still in the cart group, so IV cont open might still @@ -1732,12 +1719,14 @@ cont_close_one_hdl(void *vin) } int -ds_cont_tgt_close(uuid_t hdl_uuid) +ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid) { struct coll_close_arg arg; uuid_copy(arg.uuid, hdl_uuid); - return dss_thread_collective(cont_close_one_hdl, &arg, 0); + return ds_pool_thread_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_close_one_hdl, &arg, 0); } struct xstream_cont_query { @@ -1840,6 +1829,7 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc) struct dss_coll_ops coll_ops; struct dss_coll_args coll_args = { 0 }; struct xstream_cont_query pack_args; + struct ds_pool_hdl *pool_hdl; out->tqo_hae = DAOS_EPOCH_MAX; @@ -1858,9 +1848,17 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc) coll_args.ca_aggregator = &pack_args; coll_args.ca_func_args = &coll_args.ca_stream_args; - rc = dss_task_collective_reduce(&coll_ops, &coll_args, 0); + pool_hdl = ds_pool_hdl_lookup(in->tqi_pool_uuid); + if (pool_hdl == NULL) + D_GOTO(out, rc = -DER_NO_HDL); + rc = ds_pool_task_collective_reduce(pool_hdl->sph_pool->sp_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + &coll_ops, &coll_args, 0); D_ASSERTF(rc == 0, ""DF_RC"\n", DP_RC(rc)); + + ds_pool_hdl_put(pool_hdl); +out: out->tqo_hae = MIN(out->tqo_hae, pack_args.xcq_hae); out->tqo_rc = (rc == 0 ? 0 : 1); @@ -1943,9 +1941,13 @@ ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid, uuid_copy(args.cont_uuid, cont_uuid); args.snap_count = snap_count; args.snapshots = snapshots; + D_DEBUG(DB_EPC, DF_UUID": refreshing snapshots %d\n", DP_UUID(cont_uuid), snap_count); - return dss_task_collective(cont_snap_update_one, &args, 0); + + return ds_pool_task_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_snap_update_one, &args, false); } void @@ -2032,7 +2034,9 @@ ds_cont_tgt_snapshot_notify_handler(crt_rpc_t *rpc) args.snap_opts = in->tsi_opts; args.oit_oid = in->tsi_oit_oid; - out->tso_rc = dss_thread_collective(cont_snap_notify_one, &args, 0); + out->tso_rc = ds_pool_thread_collective( + in->tsi_pool_uuid, PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_snap_notify_one, &args, 0); if (out->tso_rc != 0) D_ERROR(DF_CONT": Snapshot notify failed: "DF_RC"\n", DP_CONT(in->tsi_pool_uuid, in->tsi_cont_uuid), @@ -2077,7 +2081,9 @@ ds_cont_tgt_epoch_aggregate_handler(crt_rpc_t *rpc) if (out->tao_rc != 0) return; - rc = dss_task_collective(cont_epoch_aggregate_one, NULL, 0); + rc = ds_pool_task_collective(in->tai_pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_epoch_aggregate_one, NULL, 0); if (rc != 0) D_ERROR(DF_CONT": Aggregation failed: "DF_RC"\n", DP_CONT(in->tai_pool_uuid, in->tai_cont_uuid), @@ -2525,7 +2531,9 @@ ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop) uuid_copy(arg.cpa_cont_uuid, cont_uuid); uuid_copy(arg.cpa_pool_uuid, pool_uuid); arg.cpa_prop = prop; - rc = dss_task_collective(cont_child_prop_update, &arg, 0); + rc = ds_pool_task_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + cont_child_prop_update, &arg, 0); if (rc) D_ERROR("collective cont_write_data_turn_off failed, "DF_RC"\n", DP_RC(rc)); diff --git a/src/dtx/dtx_resync.c b/src/dtx/dtx_resync.c index bfa083e06b0..3a598a7da2e 100644 --- a/src/dtx/dtx_resync.c +++ b/src/dtx/dtx_resync.c @@ -797,7 +797,9 @@ dtx_resync_ult(void *data) if (DAOS_FAIL_CHECK(DAOS_DTX_RESYNC_DELAY)) dss_sleep(5 * 1000); - rc = dss_thread_collective(dtx_resync_one, arg, DSS_ULT_DEEP_STACK); + rc = ds_pool_thread_collective(arg->pool_uuid, + PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW, + dtx_resync_one, arg, DSS_ULT_DEEP_STACK); if (rc) { /* If dtx resync fails, then let's still update * sp_dtx_resync_version, so the rebuild can go ahead, diff --git a/src/engine/ult.c b/src/engine/ult.c index 8056edbf074..d3db8841c78 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -97,6 +97,9 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops, int xs_nr; int rc; int tid; + int tgt_id = dss_get_module_info()->dmi_tgt_id; + uint32_t bm_len; + bool self = false; if (ops == NULL || args == NULL || ops->co_func == NULL) { D_DEBUG(DB_MD, "mandatory args missing dss_collective_reduce"); @@ -115,6 +118,7 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops, return -DER_CANCELED; } + bm_len = args->ca_tgt_bitmap_sz << 3; xs_nr = dss_tgt_nr; stream_args = &args->ca_stream_args; D_ALLOC_ARRAY(stream_args->csa_streams, xs_nr); @@ -156,19 +160,18 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops, stream = &stream_args->csa_streams[tid]; stream->st_coll_args = &carg; - if (args->ca_exclude_tgts_cnt) { - int i; - - for (i = 0; i < args->ca_exclude_tgts_cnt; i++) - if (args->ca_exclude_tgts[i] == tid) - break; - - if (i < args->ca_exclude_tgts_cnt) { + if (args->ca_tgt_bitmap != NULL) { + if (tid >= bm_len || isclr(args->ca_tgt_bitmap, tid)) { D_DEBUG(DB_TRACE, "Skip tgt %d\n", tid); rc = ABT_future_set(future, (void *)stream); D_ASSERTF(rc == ABT_SUCCESS, "%d\n", rc); continue; } + + if (tgt_id == tid && flags & DSS_USE_CURRENT_ULT) { + self = true; + continue; + } } dx = dss_get_xstream(DSS_MAIN_XS_ID(tid)); @@ -209,6 +212,12 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops, } } + if (self) { + stream = &stream_args->csa_streams[tgt_id]; + stream->st_coll_args = &carg; + collective_func(stream); + } + ABT_future_wait(future); rc = aggregator.at_rc; @@ -350,6 +359,45 @@ sched_ult2xs_multisocket(int xs_type, int tgt_id) return target; } +int +dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap, + uint32_t *bitmap_sz) +{ + uint8_t *bitmap = NULL; + uint32_t size = ((dss_tgt_nr - 1) >> 3) + 1; + uint32_t bits = size << 3; + int rc = 0; + int i; + + D_ALLOC(bitmap, size); + if (bitmap == NULL) + D_GOTO(out, rc = -DER_NOMEM); + + for (i = 0; i < size; i++) + bitmap[i] = 0xff; + + for (i = dss_tgt_nr; i < bits; i++) + clrbit(bitmap, i); + + if (exclude_tgts == NULL) + goto out; + + for (i = 0; i < exclude_cnt; i++) { + D_ASSERT(exclude_tgts[i] < dss_tgt_nr); + clrbit(bitmap, exclude_tgts[i]); + } + +out: + if (rc == 0) { + *p_bitmap = bitmap; + *bitmap_sz = size; + } else { + D_ERROR("Failed to build bitmap for collective task: " DF_RC "\n", DP_RC(rc)); + } + + return rc; +} + /* ============== ULT create functions =================================== */ static inline int diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index 8f860e1214e..10feb7e07e3 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -38,7 +38,8 @@ int ds_cont_list(uuid_t pool_uuid, struct daos_pool_cont_info **conts, uint64_t int ds_cont_filter(uuid_t pool_uuid, daos_pool_cont_filter_t *filt, struct daos_pool_cont_info2 **conts, uint64_t *ncont); int ds_cont_upgrade(uuid_t pool_uuid, struct cont_svc *svc); -int ds_cont_tgt_close(uuid_t hdl_uuid); +int + ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid); int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid, uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas, uint32_t status_pm_ver); diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index a7588e36e3f..182f6aa209c 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -419,9 +419,11 @@ int dss_parameters_set(unsigned int key_id, uint64_t value); enum dss_ult_flags { /* Periodically created ULTs */ - DSS_ULT_FL_PERIODIC = (1 << 0), + DSS_ULT_FL_PERIODIC = (1 << 0), /* Use DSS_DEEP_STACK_SZ as the stack size */ - DSS_ULT_DEEP_STACK = (1 << 1), + DSS_ULT_DEEP_STACK = (1 << 1), + /* Use current ULT (instead of creating new one) for the task. */ + DSS_USE_CURRENT_ULT = (1 << 2), }; int dss_ult_create(void (*func)(void *), void *arg, int xs_type, int tgt_id, @@ -491,8 +493,14 @@ struct dss_coll_args { /** Arguments for dss_collective func (Mandatory) */ void *ca_func_args; void *ca_aggregator; - int *ca_exclude_tgts; - unsigned int ca_exclude_tgts_cnt; + /* Specify on which targets to execute the task. */ + uint8_t *ca_tgt_bitmap; + /* + * The size (in byte) of ca_tgt_bitmap. It may be smaller than dss_tgt_nr if only some + * VOS targets are involved. It also may be larger than dss_tgt_nr if dss_tgt_nr is not + * 2 ^ n aligned. + */ + uint32_t ca_tgt_bitmap_sz; /** Stream arguments for all streams */ struct dss_coll_stream_args ca_stream_args; }; @@ -514,6 +522,9 @@ dss_thread_collective_reduce(struct dss_coll_ops *ops, unsigned int flags); int dss_task_collective(int (*func)(void *), void *arg, unsigned int flags); int dss_thread_collective(int (*func)(void *), void *arg, unsigned int flags); +int +dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap, + uint32_t *bitmap_sz); /** * Loaded module management metholds diff --git a/src/include/daos_srv/pool.h b/src/include/daos_srv/pool.h index 576e547ef10..208e7520e42 100644 --- a/src/include/daos_srv/pool.h +++ b/src/include/daos_srv/pool.h @@ -207,7 +207,8 @@ int ds_pool_tgt_map_update(struct ds_pool *pool, struct pool_buf *buf, unsigned int map_version); int ds_pool_start(uuid_t uuid); -void ds_pool_stop(uuid_t uuid); +int + ds_pool_stop(uuid_t uuid); int ds_pool_extend(uuid_t pool_uuid, int ntargets, const d_rank_list_t *rank_list, int ndomains, const uint32_t *domains, d_rank_list_t *svc_ranks); int ds_pool_target_update_state(uuid_t pool_uuid, d_rank_list_t *ranks, @@ -320,6 +321,23 @@ int ds_pool_tgt_discard(uuid_t pool_uuid, uint64_t epoch); int ds_pool_mark_upgrade_completed(uuid_t pool_uuid, int ret); +struct dss_coll_args; +struct dss_coll_ops; + +int +ds_pool_thread_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, + struct dss_coll_ops *coll_ops, struct dss_coll_args *coll_args, + uint32_t flags); +int +ds_pool_task_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, struct dss_coll_ops *coll_ops, + struct dss_coll_args *coll_args, uint32_t flags); +int +ds_pool_thread_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg, + uint32_t flags); +int +ds_pool_task_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg, + uint32_t flags); + /** * Verify if pool status satisfy Redundancy Factor requirement, by checking * pool map device status. diff --git a/src/mgmt/rpc.h b/src/mgmt/rpc.h index f10840cc6c9..76f3e6532a0 100644 --- a/src/mgmt/rpc.h +++ b/src/mgmt/rpc.h @@ -44,29 +44,16 @@ X(MGMT_GET_BS_STATE, \ 0, &CQF_mgmt_get_bs_state, \ ds_mgmt_hdlr_get_bs_state, NULL) -#define MGMT_PROTO_SRV_RPC_LIST \ - X(MGMT_TGT_CREATE, \ - 0, &CQF_mgmt_tgt_create, \ - ds_mgmt_hdlr_tgt_create, \ - &ds_mgmt_hdlr_tgt_create_co_ops), \ - X(MGMT_TGT_DESTROY, \ - 0, &CQF_mgmt_tgt_destroy, \ - ds_mgmt_hdlr_tgt_destroy, NULL), \ - X(MGMT_TGT_PARAMS_SET, \ - 0, &CQF_mgmt_tgt_params_set, \ - ds_mgmt_tgt_params_set_hdlr, NULL), \ - X(MGMT_TGT_PROFILE, \ - 0, &CQF_mgmt_profile, \ - ds_mgmt_tgt_profile_hdlr, NULL), \ - X(MGMT_TGT_MAP_UPDATE, \ - 0, &CQF_mgmt_tgt_map_update, \ - ds_mgmt_hdlr_tgt_map_update, \ - &ds_mgmt_hdlr_tgt_map_update_co_ops), \ - X(MGMT_TGT_MARK, \ - 0, &CQF_mgmt_mark, \ - ds_mgmt_tgt_mark_hdlr, NULL) - - +#define MGMT_PROTO_SRV_RPC_LIST \ + X(MGMT_TGT_CREATE, 0, &CQF_mgmt_tgt_create, ds_mgmt_hdlr_tgt_create, \ + &ds_mgmt_hdlr_tgt_create_co_ops), \ + X(MGMT_TGT_DESTROY, 0, &CQF_mgmt_tgt_destroy, ds_mgmt_hdlr_tgt_destroy, \ + &ds_mgmt_hdlr_tgt_destroy_co_ops), \ + X(MGMT_TGT_PARAMS_SET, 0, &CQF_mgmt_tgt_params_set, ds_mgmt_tgt_params_set_hdlr, NULL), \ + X(MGMT_TGT_PROFILE, 0, &CQF_mgmt_profile, ds_mgmt_tgt_profile_hdlr, NULL), \ + X(MGMT_TGT_MAP_UPDATE, 0, &CQF_mgmt_tgt_map_update, ds_mgmt_hdlr_tgt_map_update, \ + &ds_mgmt_hdlr_tgt_map_update_co_ops), \ + X(MGMT_TGT_MARK, 0, &CQF_mgmt_mark, ds_mgmt_tgt_mark_hdlr, NULL) /* Define for RPC enum population below */ #define X(a, b, c, d, e) a diff --git a/src/mgmt/srv.c b/src/mgmt/srv.c index ed00b2c0de5..527b73d97f6 100644 --- a/src/mgmt/srv.c +++ b/src/mgmt/srv.c @@ -33,6 +33,9 @@ static struct crt_corpc_ops ds_mgmt_hdlr_tgt_create_co_ops = { .co_post_reply = ds_mgmt_tgt_create_post_reply, }; +static struct crt_corpc_ops ds_mgmt_hdlr_tgt_destroy_co_ops = {.co_aggregate = + ds_mgmt_tgt_destroy_aggregator}; + static struct crt_corpc_ops ds_mgmt_hdlr_tgt_map_update_co_ops = { .co_aggregate = ds_mgmt_tgt_map_update_aggregator, .co_pre_forward = ds_mgmt_tgt_map_update_pre_forward, diff --git a/src/mgmt/srv_internal.h b/src/mgmt/srv_internal.h index b22341fb35b..4ed1ef901e8 100644 --- a/src/mgmt/srv_internal.h +++ b/src/mgmt/srv_internal.h @@ -131,6 +131,8 @@ int ds_mgmt_tgt_setup(void); void ds_mgmt_tgt_cleanup(void); void ds_mgmt_hdlr_tgt_create(crt_rpc_t *rpc_req); void ds_mgmt_hdlr_tgt_destroy(crt_rpc_t *rpc_req); +int + ds_mgmt_tgt_destroy_aggregator(crt_rpc_t *source, crt_rpc_t *result, void *priv); int ds_mgmt_tgt_create_aggregator(crt_rpc_t *source, crt_rpc_t *result, void *priv); int ds_mgmt_tgt_create_post_reply(crt_rpc_t *rpc, void *priv); diff --git a/src/mgmt/srv_target.c b/src/mgmt/srv_target.c index c8307df90ec..76282ed87ae 100644 --- a/src/mgmt/srv_target.c +++ b/src/mgmt/srv_target.c @@ -1326,7 +1326,9 @@ ds_mgmt_hdlr_tgt_destroy(crt_rpc_t *td_req) goto out; } - ds_pool_stop(td_in->td_pool_uuid); + rc = ds_pool_stop(td_in->td_pool_uuid); + if (rc != 0) + goto out; /** generate path to the target directory */ rc = ds_mgmt_tgt_file(td_in->td_pool_uuid, NULL, NULL, &path); @@ -1371,6 +1373,17 @@ ds_mgmt_hdlr_tgt_destroy(crt_rpc_t *td_req) crt_reply_send(td_req); } +int +ds_mgmt_tgt_destroy_aggregator(crt_rpc_t *source, crt_rpc_t *result, void *priv) +{ + struct mgmt_tgt_destroy_out *out_source = crt_reply_get(source); + struct mgmt_tgt_destroy_out *out_result = crt_reply_get(result); + + if (out_source->td_rc != 0) + out_result->td_rc = out_source->td_rc; + return 0; +} + /** * Set parameter on a single target. */ diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 62efb663be8..d14f4bb4379 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -3150,7 +3150,10 @@ ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generat uuid_copy(arg.pool_uuid, pool->sp_uuid); arg.version = version; arg.generation = generation; - rc = dss_thread_collective(migrate_fini_one_ult, &arg, 0); + + rc = ds_pool_thread_collective(pool->sp_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + migrate_fini_one_ult, &arg, 0); if (rc) D_ERROR(DF_UUID" migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); @@ -3868,7 +3871,9 @@ ds_migrate_query_status(uuid_t pool_uuid, uint32_t ver, unsigned int generation, if (rc != ABT_SUCCESS) D_GOTO(out, rc); - rc = dss_thread_collective(migrate_check_one, &arg, 0); + rc = ds_pool_thread_collective(pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + migrate_check_one, &arg, 0); if (rc) D_GOTO(out, rc); diff --git a/src/pool/srv_iv.c b/src/pool/srv_iv.c index 4f1ba915ffd..8fd1fa90f75 100644 --- a/src/pool/srv_iv.c +++ b/src/pool/srv_iv.c @@ -904,7 +904,7 @@ ds_pool_iv_refresh_hdl(struct ds_pool *pool, struct pool_iv_hdl *pih) if (uuid_compare(pool->sp_srv_cont_hdl, pih->pih_cont_hdl) == 0) return 0; - ds_cont_tgt_close(pool->sp_srv_cont_hdl); + ds_cont_tgt_close(pool->sp_uuid, pool->sp_srv_cont_hdl); uuid_clear(pool->sp_srv_cont_hdl); uuid_clear(pool->sp_srv_pool_hdl); } @@ -935,7 +935,7 @@ pool_iv_ent_invalid(struct ds_iv_entry *entry, struct ds_iv_key *key) rc = 0; return rc; } - ds_cont_tgt_close(iv_entry->piv_hdl.pih_cont_hdl); + ds_cont_tgt_close(entry->ns->iv_pool_uuid, iv_entry->piv_hdl.pih_cont_hdl); uuid_clear(pool->sp_srv_cont_hdl); uuid_clear(pool->sp_srv_pool_hdl); uuid_clear(iv_entry->piv_hdl.pih_cont_hdl); diff --git a/src/pool/srv_target.c b/src/pool/srv_target.c index 696f32c8b58..11c7eb2abc3 100644 --- a/src/pool/srv_target.c +++ b/src/pool/srv_target.c @@ -561,6 +561,39 @@ pool_child_delete_one(void *uuid) return 0; } +static void +pool_child_delete_all(struct ds_pool *pool) +{ + int rc; + + rc = ds_pool_thread_collective(pool->sp_uuid, 0, pool_child_delete_one, pool->sp_uuid, 0); + if (rc == 0) + D_INFO(DF_UUID ": deleted\n", DP_UUID(pool->sp_uuid)); + else if (rc == -DER_CANCELED) + D_INFO(DF_UUID ": no ESs\n", DP_UUID(pool->sp_uuid)); + else + DL_ERROR(rc, DF_UUID ": failed to delete ES pool caches", DP_UUID(pool->sp_uuid)); +} + +static int +pool_child_add_all(struct ds_pool *pool) +{ + struct pool_child_lookup_arg collective_arg = { + .pla_pool = pool, .pla_uuid = pool->sp_uuid, .pla_map_version = pool->sp_map_version}; + int rc; + + rc = ds_pool_thread_collective(pool->sp_uuid, 0, pool_child_add_one, &collective_arg, + DSS_ULT_DEEP_STACK); + if (rc == 0) { + D_INFO(DF_UUID ": added\n", DP_UUID(pool->sp_uuid)); + } else { + DL_ERROR(rc, DF_UUID ": failed to add ES pool caches", DP_UUID(pool->sp_uuid)); + pool_child_delete_all(pool); + return rc; + } + return 0; +} + /* ds_pool ********************************************************************/ static struct daos_lru_cache *pool_cache; @@ -571,16 +604,6 @@ pool_obj(struct daos_llink *llink) return container_of(llink, struct ds_pool, sp_entry); } -static inline void -pool_put_sync(void *args) -{ - struct ds_pool *pool = args; - - D_ASSERT(pool != NULL); - D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); - daos_lru_ref_release(pool_cache, &pool->sp_entry); -} - struct ds_pool_create_arg { uint32_t pca_map_version; }; @@ -590,8 +613,7 @@ pool_alloc_ref(void *key, unsigned int ksize, void *varg, struct daos_llink **link) { struct ds_pool_create_arg *arg = varg; - struct ds_pool *pool; - struct pool_child_lookup_arg collective_arg; + struct ds_pool *pool; char group_id[DAOS_UUID_STR_SIZE]; struct dss_module_info *info = dss_get_module_info(); unsigned int iv_ns_id; @@ -664,21 +686,9 @@ pool_alloc_ref(void *key, unsigned int ksize, void *varg, goto err_group; } - collective_arg.pla_pool = pool; - collective_arg.pla_uuid = key; - collective_arg.pla_map_version = arg->pca_map_version; - rc = dss_thread_collective(pool_child_add_one, &collective_arg, DSS_ULT_DEEP_STACK); - if (rc != 0) { - D_ERROR(DF_UUID": failed to add ES pool caches: "DF_RC"\n", - DP_UUID(key), DP_RC(rc)); - goto err_iv_ns; - } - *link = &pool->sp_entry; return 0; -err_iv_ns: - ds_iv_ns_put(pool->sp_iv_ns); err_group: rc_tmp = crt_group_secondary_destroy(pool->sp_group); if (rc_tmp != 0) @@ -712,13 +722,6 @@ pool_free_ref(struct daos_llink *llink) D_ASSERT(d_list_empty(&pool->sp_hdls)); - rc = dss_thread_collective(pool_child_delete_one, pool->sp_uuid, 0); - if (rc == -DER_CANCELED) - D_DEBUG(DB_MD, DF_UUID": no ESs\n", DP_UUID(pool->sp_uuid)); - else if (rc != 0) - D_ERROR(DF_UUID": failed to delete ES pool caches: "DF_RC"\n", - DP_UUID(pool->sp_uuid), DP_RC(rc)); - pl_map_disconnect(pool->sp_uuid); if (pool->sp_map != NULL) pool_map_decref(pool->sp_map); @@ -814,7 +817,7 @@ ds_pool_lookup(const uuid_t uuid, struct ds_pool **pool) if ((*pool)->sp_stopping) { D_DEBUG(DB_MD, DF_UUID": is in stopping\n", DP_UUID(uuid)); - pool_put_sync(*pool); + ds_pool_put(*pool); *pool = NULL; return -DER_SHUTDOWN; } @@ -833,31 +836,9 @@ ds_pool_get(struct ds_pool *pool) void ds_pool_put(struct ds_pool *pool) { - int rc; - - /* - * Someone has stopped the pool. Current user may be the one that is holding the last - * reference on the pool, then drop such reference will trigger pool_free_ref() as to - * stop related container that may wait current user (ULT) to exit. To avoid deadlock, - * let's use independent ULT to drop the reference asynchronously and make current ULT - * to go ahead. - * - * An example of the deadlock scenarios is something like that: - * - * cont_iv_prop_fetch_ult => ds_pool_put => pool_free_ref [WAIT]=> cont_child_stop => - * cont_stop_agg [WAIT]=> cont_agg_ult => ds_cont_csummer_init => ds_cont_get_props => - * cont_iv_prop_fetch [WAIT]=> cont_iv_prop_fetch_ult - */ - if (unlikely(pool->sp_stopping) && daos_lru_is_last_user(&pool->sp_entry)) { - rc = dss_ult_create(pool_put_sync, pool, DSS_XS_SELF, 0, 0, NULL); - if (unlikely(rc != 0)) { - D_ERROR("Failed to create ULT to async put ref on the pool "DF_UUID"\n", - DP_UUID(pool->sp_uuid)); - pool_put_sync(pool); - } - } else { - pool_put_sync(pool); - } + D_ASSERT(pool != NULL); + D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); + daos_lru_ref_release(pool_cache, &pool->sp_entry); } void @@ -1018,12 +999,16 @@ ds_pool_start(uuid_t uuid) pool = pool_obj(llink); + rc = pool_child_add_all(pool); + if (rc != 0) + goto failure_pool; + rc = dss_ult_create(pool_fetch_hdls_ult, pool, DSS_XS_SYS, 0, 0, NULL); if (rc != 0) { D_ERROR(DF_UUID": failed to create fetch ult: %d\n", DP_UUID(uuid), rc); - D_GOTO(failure_pool, rc); + D_GOTO(failure_children, rc); } pool->sp_fetch_hdls = 1; @@ -1040,8 +1025,10 @@ ds_pool_start(uuid_t uuid) failure_ult: pool_fetch_hdls_ult_abort(pool); +failure_children: + pool_child_delete_all(pool); failure_pool: - pool_put_sync(pool); + ds_pool_put(pool); return rc; } @@ -1100,18 +1087,27 @@ ds_pool_stop_all_containers(struct ds_pool *pool) * Stop a pool. Must be called on the system xstream. Release the ds_pool * object reference held by ds_pool_start. Only for mgmt and pool modules. */ -void +int ds_pool_stop(uuid_t uuid) { struct ds_pool *pool; ds_pool_failed_remove(uuid); - ds_pool_lookup(uuid, &pool); - if (pool == NULL) - return; - D_ASSERT(!pool->sp_stopping); + ds_pool_lookup_internal(uuid, &pool); + if (pool == NULL) { + D_INFO(DF_UUID ": not found\n", DP_UUID(uuid)); + return 0; + } + if (pool->sp_stopping) { + int rc = -DER_AGAIN; + + DL_INFO(rc, DF_UUID ": already stopping", DP_UUID(uuid)); + ds_pool_put(pool); + return rc; + } pool->sp_stopping = 1; + D_INFO(DF_UUID ": stopping\n", DP_UUID(uuid)); pool_tgt_disconnect_all(pool); @@ -1127,9 +1123,19 @@ ds_pool_stop(uuid_t uuid) ds_rebuild_abort(pool->sp_uuid, -1, -1, -1); ds_migrate_stop(pool, -1, -1); + ds_pool_put(pool); /* held by ds_pool_start */ - pool_put_sync(pool); + + while (!daos_lru_is_last_user(&pool->sp_entry)) + dss_sleep(1000 /* ms */); + + D_INFO(DF_UUID ": completed reference wait\n", DP_UUID(uuid)); + + pool_child_delete_all(pool); + + ds_pool_put(pool); D_INFO(DF_UUID": pool stopped\n", DP_UUID(uuid)); + return 0; } /* ds_pool_hdl ****************************************************************/ @@ -1422,17 +1428,9 @@ pool_tgt_query(struct ds_pool *pool, struct daos_pool_space *ps) coll_args.ca_aggregator = &agg_arg; coll_args.ca_func_args = &coll_args.ca_stream_args; - rc = ds_pool_get_failed_tgt_idx(pool->sp_uuid, - &coll_args.ca_exclude_tgts, - &coll_args.ca_exclude_tgts_cnt); - if (rc) { - D_ERROR(DF_UUID": failed to get index : rc "DF_RC"\n", - DP_UUID(pool->sp_uuid), DP_RC(rc)); - return rc; - } - - rc = dss_thread_collective_reduce(&coll_ops, &coll_args, 0); - D_FREE(coll_args.ca_exclude_tgts); + rc = ds_pool_thread_collective_reduce(pool->sp_uuid, + PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW, + &coll_ops, &coll_args, 0); if (rc) { D_ERROR("Pool query on pool "DF_UUID" failed, "DF_RC"\n", DP_UUID(pool->sp_uuid), DP_RC(rc)); @@ -1707,7 +1705,9 @@ ds_pool_tgt_map_update(struct ds_pool *pool, struct pool_buf *buf, DP_UUID(pool->sp_uuid), map_version_before, map_version); pool->sp_map_version = map_version; - rc = dss_task_collective(update_child_map, pool, 0); + rc = ds_pool_task_collective(pool->sp_uuid, + PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW, + update_child_map, pool, 0); D_ASSERT(rc == 0); map_updated = true; @@ -1888,8 +1888,9 @@ ds_pool_tgt_prop_update(struct ds_pool *pool, struct pool_iv_prop *iv_prop) arg.uvp_checkpoint_props_changed = 1; } - ret = dss_thread_collective(update_vos_prop_on_targets, &arg, 0); - + ret = ds_pool_thread_collective(pool->sp_uuid, + PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW, + update_vos_prop_on_targets, &arg, 0); return ret; } @@ -2164,14 +2165,100 @@ pool_child_discard(void *data) return rc; } +static int +ds_pool_collective_reduce(uuid_t pool_uuid, uint32_t exclude_status, struct dss_coll_ops *coll_ops, + struct dss_coll_args *coll_args, uint32_t flags, bool thread) +{ + int *exclude_tgts = NULL; + uint32_t exclude_tgt_nr = 0; + int rc; + + if (exclude_status != 0) { + rc = ds_pool_get_tgt_idx_by_state(pool_uuid, exclude_status, &exclude_tgts, + &exclude_tgt_nr); + if (rc != 0) { + D_ERROR(DF_UUID "failed to get index : rc " DF_RC "\n", DP_UUID(pool_uuid), + DP_RC(rc)); + return rc; + } + + if (exclude_tgts != NULL) { + rc = dss_build_coll_bitmap(exclude_tgts, exclude_tgt_nr, + &coll_args->ca_tgt_bitmap, + &coll_args->ca_tgt_bitmap_sz); + if (rc != 0) + goto out; + } + } + + if (thread) + rc = dss_thread_collective_reduce(coll_ops, coll_args, flags); + else + rc = dss_task_collective_reduce(coll_ops, coll_args, flags); + + D_DEBUG(DB_MD, DF_UUID " collective: " DF_RC "", DP_UUID(pool_uuid), DP_RC(rc)); + +out: + if (coll_args->ca_tgt_bitmap) + D_FREE(coll_args->ca_tgt_bitmap); + if (exclude_tgts) + D_FREE(exclude_tgts); + + return rc; +} + +/* collective function over all target xstreams, exclude_status indicate which + * targets should be excluded during collective. + */ +static int +ds_pool_collective(uuid_t pool_uuid, uint32_t exclude_status, int (*coll_func)(void *), void *arg, + uint32_t flags, bool thread) +{ + struct dss_coll_ops coll_ops = {0}; + struct dss_coll_args coll_args = {0}; + + coll_ops.co_func = coll_func; + coll_args.ca_func_args = arg; + return ds_pool_collective_reduce(pool_uuid, exclude_status, &coll_ops, &coll_args, flags, + thread); +} + +int +ds_pool_thread_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, + struct dss_coll_ops *coll_ops, struct dss_coll_args *coll_args, + uint32_t flags) +{ + return ds_pool_collective_reduce(pool_uuid, ex_status, coll_ops, coll_args, flags, true); +} + +int +ds_pool_task_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, struct dss_coll_ops *coll_ops, + struct dss_coll_args *coll_args, uint32_t flags) +{ + return ds_pool_collective_reduce(pool_uuid, ex_status, coll_ops, coll_args, flags, false); +} + +int +ds_pool_thread_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg, + uint32_t flags) +{ + return ds_pool_collective(pool_uuid, ex_status, coll_func, arg, flags, true); +} + +int +ds_pool_task_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg, + uint32_t flags) +{ + return ds_pool_collective(pool_uuid, ex_status, coll_func, arg, flags, false); +} + /* Discard the objects by epoch in this pool */ static void ds_pool_tgt_discard_ult(void *data) { struct ds_pool *pool; - struct tgt_discard_arg *arg = data; - struct dss_coll_ops coll_ops = { 0 }; - struct dss_coll_args coll_args = { 0 }; + struct tgt_discard_arg *arg = data; + uint32_t ex_status; int rc; /* If discard failed, let's still go ahead, since reintegration might @@ -2184,33 +2271,9 @@ ds_pool_tgt_discard_ult(void *data) D_GOTO(free, rc = 0); } - /* collective operations */ - coll_ops.co_func = pool_child_discard; - coll_args.ca_func_args = arg; - if (pool->sp_map != NULL) { - unsigned int status; - - /* It should only discard the target in DOWNOUT state, and skip - * targets in other state. - */ - status = PO_COMP_ST_UP | PO_COMP_ST_UPIN | PO_COMP_ST_DRAIN | - PO_COMP_ST_DOWN | PO_COMP_ST_NEW; - rc = ds_pool_get_tgt_idx_by_state(arg->pool_uuid, status, - &coll_args.ca_exclude_tgts, - &coll_args.ca_exclude_tgts_cnt); - if (rc) { - D_ERROR(DF_UUID "failed to get index : rc "DF_RC"\n", - DP_UUID(arg->pool_uuid), DP_RC(rc)); - D_GOTO(put, rc); - } - } - - rc = dss_thread_collective_reduce(&coll_ops, &coll_args, DSS_ULT_DEEP_STACK); - if (coll_args.ca_exclude_tgts) - D_FREE(coll_args.ca_exclude_tgts); - D_CDEBUG(rc == 0, DB_MD, DLOG_ERR, DF_UUID" tgt discard:" DF_RC"\n", - DP_UUID(arg->pool_uuid), DP_RC(rc)); -put: + ex_status = + PO_COMP_ST_UP | PO_COMP_ST_UPIN | PO_COMP_ST_DRAIN | PO_COMP_ST_DOWN | PO_COMP_ST_NEW; + ds_pool_thread_collective(arg->pool_uuid, ex_status, pool_child_discard, arg, 0); pool->sp_need_discard = 0; pool->sp_discard_status = rc; diff --git a/src/rebuild/scan.c b/src/rebuild/scan.c index 61137d3c4bb..8778292e346 100644 --- a/src/rebuild/scan.c +++ b/src/rebuild/scan.c @@ -1042,7 +1042,9 @@ rebuild_scan_leader(void *data) D_DEBUG(DB_REBUILD, "rebuild scan collective "DF_UUID" begin.\n", DP_UUID(rpt->rt_pool_uuid)); - rc = dss_thread_collective(rebuild_scanner, rpt, DSS_ULT_DEEP_STACK); + rc = ds_pool_thread_collective(rpt->rt_pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + rebuild_scanner, rpt, DSS_ULT_DEEP_STACK); if (rc) D_GOTO(out, rc); @@ -1050,7 +1052,9 @@ rebuild_scan_leader(void *data) DP_UUID(rpt->rt_pool_uuid)); ABT_mutex_lock(rpt->rt_lock); - rc = dss_task_collective(rebuild_scan_done, rpt, 0); + rc = ds_pool_task_collective(rpt->rt_pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + rebuild_scan_done, rpt, 0); ABT_mutex_unlock(rpt->rt_lock); if (rc) { D_ERROR(DF_UUID" send rebuild object list failed:%d\n", diff --git a/src/tests/suite/daos_test_common.c b/src/tests/suite/daos_test_common.c index 3f854f7b580..46033056673 100644 --- a/src/tests/suite/daos_test_common.c +++ b/src/tests/suite/daos_test_common.c @@ -67,7 +67,7 @@ test_setup_pool_create(void **state, struct test_pool *ipool, if (arg->myrank == 0) { char *env; int size_gb; - daos_size_t nvme_size; + daos_size_t nvme_size = 0; d_rank_list_t *rank_list = NULL; d_agetenv_str(&env, "POOL_SCM_SIZE");