Skip to content

Commit

Permalink
Backports to address b/336317519
Browse files Browse the repository at this point in the history
DAOS-14679 pool: Report on stopping sp_stopping (#14374)
DAOS-15145 pool: add pool collective function (#13764)
*DAOS-14105 object: collectively punch object (#13493)

* partial backport, just the bitmap function

Required-githooks: true
Change-Id: I2b21b8121cbdecc79ae49a464a42b1d47fb9be10
Signed-off-by: Jeff Olivier <jeffolivier@google.com>
  • Loading branch information
jolivier23 committed Jun 12, 2024
1 parent c4b95eb commit c355740
Show file tree
Hide file tree
Showing 18 changed files with 353 additions and 186 deletions.
2 changes: 1 addition & 1 deletion src/container/container_iv.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ cont_iv_ent_update(struct ds_iv_entry *entry, struct ds_iv_key *key,
}
if (entry->iv_class->iv_class_id == IV_CONT_CAPA &&
!uuid_is_null(civ_key->cont_uuid)) {
rc = ds_cont_tgt_close(civ_key->cont_uuid);
rc = ds_cont_tgt_close(entry->ns->iv_pool_uuid, civ_key->cont_uuid);
if (rc)
D_GOTO(out, rc);
}
Expand Down
5 changes: 3 additions & 2 deletions src/container/srv_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,9 @@ ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,
uuid_copy(arg.cont_uuid, cont_uuid);
arg.min_eph = eph;

rc = dss_task_collective(cont_refresh_vos_agg_eph_one, &arg,
DSS_ULT_FL_PERIODIC);
rc = ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_refresh_vos_agg_eph_one, &arg, DSS_ULT_FL_PERIODIC);
return rc;
}

Expand Down
3 changes: 2 additions & 1 deletion src/container/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
int ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid,
uint64_t *snapshots, int snap_count);
int ds_cont_tgt_snapshots_refresh(uuid_t pool_uuid, uuid_t cont_uuid);
int ds_cont_tgt_close(uuid_t cont_hdl_uuid);
int
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t cont_hdl_uuid);
int ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,
daos_epoch_t eph);
int ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop);
Expand Down
62 changes: 35 additions & 27 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,9 @@ ds_cont_tgt_destroy(uuid_t pool_uuid, uuid_t cont_uuid)
cont_iv_entry_delete(pool->sp_iv_ns, pool_uuid, cont_uuid);
ds_pool_put(pool);

rc = dss_thread_collective(cont_child_destroy_one, &in, 0);
rc = ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_child_destroy_one, &in, 0);
if (rc)
D_ERROR(DF_UUID"/"DF_UUID" container child destroy failed: %d\n",
DP_UUID(pool_uuid), DP_UUID(cont_uuid), rc);
Expand Down Expand Up @@ -1631,9 +1633,7 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas,
uint32_t status_pm_ver)
{
struct cont_tgt_open_arg arg = { 0 };
struct dss_coll_ops coll_ops = { 0 };
struct dss_coll_args coll_args = { 0 };
struct cont_tgt_open_arg arg = {0};
int rc;

uuid_copy(arg.pool_uuid, pool_uuid);
Expand All @@ -1647,22 +1647,9 @@ ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
D_DEBUG(DB_TRACE, "open pool/cont/hdl "DF_UUID"/"DF_UUID"/"DF_UUID"\n",
DP_UUID(pool_uuid), DP_UUID(cont_uuid), DP_UUID(cont_hdl_uuid));

/* collective operations */
coll_ops.co_func = cont_open_one;
coll_args.ca_func_args = &arg;

/* setting aggregator args */
rc = ds_pool_get_failed_tgt_idx(pool_uuid, &coll_args.ca_exclude_tgts,
&coll_args.ca_exclude_tgts_cnt);
if (rc) {
D_ERROR(DF_UUID "failed to get index : rc "DF_RC"\n",
DP_UUID(pool_uuid), DP_RC(rc));
return rc;
}

rc = dss_thread_collective_reduce(&coll_ops, &coll_args, 0);
D_FREE(coll_args.ca_exclude_tgts);

rc = ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_open_one, &arg, 0);
if (rc != 0) {
/* Once it exclude the target from the pool, since the target
* might still in the cart group, so IV cont open might still
Expand Down Expand Up @@ -1732,12 +1719,14 @@ cont_close_one_hdl(void *vin)
}

int
ds_cont_tgt_close(uuid_t hdl_uuid)
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid)
{
struct coll_close_arg arg;

uuid_copy(arg.uuid, hdl_uuid);
return dss_thread_collective(cont_close_one_hdl, &arg, 0);
return ds_pool_thread_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_close_one_hdl, &arg, 0);
}

struct xstream_cont_query {
Expand Down Expand Up @@ -1840,6 +1829,7 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc)
struct dss_coll_ops coll_ops;
struct dss_coll_args coll_args = { 0 };
struct xstream_cont_query pack_args;
struct ds_pool_hdl *pool_hdl;

out->tqo_hae = DAOS_EPOCH_MAX;

Expand All @@ -1858,9 +1848,17 @@ ds_cont_tgt_query_handler(crt_rpc_t *rpc)
coll_args.ca_aggregator = &pack_args;
coll_args.ca_func_args = &coll_args.ca_stream_args;

rc = dss_task_collective_reduce(&coll_ops, &coll_args, 0);
pool_hdl = ds_pool_hdl_lookup(in->tqi_pool_uuid);
if (pool_hdl == NULL)
D_GOTO(out, rc = -DER_NO_HDL);

rc = ds_pool_task_collective_reduce(pool_hdl->sph_pool->sp_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
&coll_ops, &coll_args, 0);
D_ASSERTF(rc == 0, ""DF_RC"\n", DP_RC(rc));

ds_pool_hdl_put(pool_hdl);
out:
out->tqo_hae = MIN(out->tqo_hae, pack_args.xcq_hae);
out->tqo_rc = (rc == 0 ? 0 : 1);

Expand Down Expand Up @@ -1943,9 +1941,13 @@ ds_cont_tgt_snapshots_update(uuid_t pool_uuid, uuid_t cont_uuid,
uuid_copy(args.cont_uuid, cont_uuid);
args.snap_count = snap_count;
args.snapshots = snapshots;

D_DEBUG(DB_EPC, DF_UUID": refreshing snapshots %d\n",
DP_UUID(cont_uuid), snap_count);
return dss_task_collective(cont_snap_update_one, &args, 0);

return ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_snap_update_one, &args, false);
}

void
Expand Down Expand Up @@ -2032,7 +2034,9 @@ ds_cont_tgt_snapshot_notify_handler(crt_rpc_t *rpc)
args.snap_opts = in->tsi_opts;
args.oit_oid = in->tsi_oit_oid;

out->tso_rc = dss_thread_collective(cont_snap_notify_one, &args, 0);
out->tso_rc = ds_pool_thread_collective(
in->tsi_pool_uuid, PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_snap_notify_one, &args, 0);
if (out->tso_rc != 0)
D_ERROR(DF_CONT": Snapshot notify failed: "DF_RC"\n",
DP_CONT(in->tsi_pool_uuid, in->tsi_cont_uuid),
Expand Down Expand Up @@ -2077,7 +2081,9 @@ ds_cont_tgt_epoch_aggregate_handler(crt_rpc_t *rpc)
if (out->tao_rc != 0)
return;

rc = dss_task_collective(cont_epoch_aggregate_one, NULL, 0);
rc = ds_pool_task_collective(in->tai_pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_epoch_aggregate_one, NULL, 0);
if (rc != 0)
D_ERROR(DF_CONT": Aggregation failed: "DF_RC"\n",
DP_CONT(in->tai_pool_uuid, in->tai_cont_uuid),
Expand Down Expand Up @@ -2525,7 +2531,9 @@ ds_cont_tgt_prop_update(uuid_t pool_uuid, uuid_t cont_uuid, daos_prop_t *prop)
uuid_copy(arg.cpa_cont_uuid, cont_uuid);
uuid_copy(arg.cpa_pool_uuid, pool_uuid);
arg.cpa_prop = prop;
rc = dss_task_collective(cont_child_prop_update, &arg, 0);
rc = ds_pool_task_collective(pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
cont_child_prop_update, &arg, 0);
if (rc)
D_ERROR("collective cont_write_data_turn_off failed, "DF_RC"\n",
DP_RC(rc));
Expand Down
4 changes: 3 additions & 1 deletion src/dtx/dtx_resync.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,9 @@ dtx_resync_ult(void *data)
if (DAOS_FAIL_CHECK(DAOS_DTX_RESYNC_DELAY))
dss_sleep(5 * 1000);

rc = dss_thread_collective(dtx_resync_one, arg, DSS_ULT_DEEP_STACK);
rc = ds_pool_thread_collective(arg->pool_uuid,
PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT | PO_COMP_ST_NEW,
dtx_resync_one, arg, DSS_ULT_DEEP_STACK);
if (rc) {
/* If dtx resync fails, then let's still update
* sp_dtx_resync_version, so the rebuild can go ahead,
Expand Down
64 changes: 56 additions & 8 deletions src/engine/ult.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
int xs_nr;
int rc;
int tid;
int tgt_id = dss_get_module_info()->dmi_tgt_id;
uint32_t bm_len;
bool self = false;

if (ops == NULL || args == NULL || ops->co_func == NULL) {
D_DEBUG(DB_MD, "mandatory args missing dss_collective_reduce");
Expand All @@ -115,6 +118,7 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
return -DER_CANCELED;
}

bm_len = args->ca_tgt_bitmap_sz << 3;
xs_nr = dss_tgt_nr;
stream_args = &args->ca_stream_args;
D_ALLOC_ARRAY(stream_args->csa_streams, xs_nr);
Expand Down Expand Up @@ -156,19 +160,18 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
stream = &stream_args->csa_streams[tid];
stream->st_coll_args = &carg;

if (args->ca_exclude_tgts_cnt) {
int i;

for (i = 0; i < args->ca_exclude_tgts_cnt; i++)
if (args->ca_exclude_tgts[i] == tid)
break;

if (i < args->ca_exclude_tgts_cnt) {
if (args->ca_tgt_bitmap != NULL) {
if (tid >= bm_len || isclr(args->ca_tgt_bitmap, tid)) {
D_DEBUG(DB_TRACE, "Skip tgt %d\n", tid);
rc = ABT_future_set(future, (void *)stream);
D_ASSERTF(rc == ABT_SUCCESS, "%d\n", rc);
continue;
}

if (tgt_id == tid && flags & DSS_USE_CURRENT_ULT) {
self = true;
continue;
}
}

dx = dss_get_xstream(DSS_MAIN_XS_ID(tid));
Expand Down Expand Up @@ -209,6 +212,12 @@ dss_collective_reduce_internal(struct dss_coll_ops *ops,
}
}

if (self) {
stream = &stream_args->csa_streams[tgt_id];
stream->st_coll_args = &carg;
collective_func(stream);
}

ABT_future_wait(future);

rc = aggregator.at_rc;
Expand Down Expand Up @@ -350,6 +359,45 @@ sched_ult2xs_multisocket(int xs_type, int tgt_id)
return target;
}

int
dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap,
uint32_t *bitmap_sz)
{
uint8_t *bitmap = NULL;
uint32_t size = ((dss_tgt_nr - 1) >> 3) + 1;
uint32_t bits = size << 3;
int rc = 0;
int i;

D_ALLOC(bitmap, size);
if (bitmap == NULL)
D_GOTO(out, rc = -DER_NOMEM);

for (i = 0; i < size; i++)
bitmap[i] = 0xff;

for (i = dss_tgt_nr; i < bits; i++)
clrbit(bitmap, i);

if (exclude_tgts == NULL)
goto out;

for (i = 0; i < exclude_cnt; i++) {
D_ASSERT(exclude_tgts[i] < dss_tgt_nr);
clrbit(bitmap, exclude_tgts[i]);
}

out:
if (rc == 0) {
*p_bitmap = bitmap;
*bitmap_sz = size;
} else {
D_ERROR("Failed to build bitmap for collective task: " DF_RC "\n", DP_RC(rc));
}

return rc;
}

/* ============== ULT create functions =================================== */

static inline int
Expand Down
3 changes: 2 additions & 1 deletion src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ int ds_cont_list(uuid_t pool_uuid, struct daos_pool_cont_info **conts, uint64_t
int ds_cont_filter(uuid_t pool_uuid, daos_pool_cont_filter_t *filt,
struct daos_pool_cont_info2 **conts, uint64_t *ncont);
int ds_cont_upgrade(uuid_t pool_uuid, struct cont_svc *svc);
int ds_cont_tgt_close(uuid_t hdl_uuid);
int
ds_cont_tgt_close(uuid_t pool_uuid, uuid_t hdl_uuid);
int ds_cont_tgt_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid,
uuid_t cont_uuid, uint64_t flags, uint64_t sec_capas,
uint32_t status_pm_ver);
Expand Down
19 changes: 15 additions & 4 deletions src/include/daos_srv/daos_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,11 @@ int dss_parameters_set(unsigned int key_id, uint64_t value);

enum dss_ult_flags {
/* Periodically created ULTs */
DSS_ULT_FL_PERIODIC = (1 << 0),
DSS_ULT_FL_PERIODIC = (1 << 0),
/* Use DSS_DEEP_STACK_SZ as the stack size */
DSS_ULT_DEEP_STACK = (1 << 1),
DSS_ULT_DEEP_STACK = (1 << 1),
/* Use current ULT (instead of creating new one) for the task. */
DSS_USE_CURRENT_ULT = (1 << 2),
};

int dss_ult_create(void (*func)(void *), void *arg, int xs_type, int tgt_id,
Expand Down Expand Up @@ -491,8 +493,14 @@ struct dss_coll_args {
/** Arguments for dss_collective func (Mandatory) */
void *ca_func_args;
void *ca_aggregator;
int *ca_exclude_tgts;
unsigned int ca_exclude_tgts_cnt;
/* Specify on which targets to execute the task. */
uint8_t *ca_tgt_bitmap;
/*
* The size (in byte) of ca_tgt_bitmap. It may be smaller than dss_tgt_nr if only some
* VOS targets are involved. It also may be larger than dss_tgt_nr if dss_tgt_nr is not
* 2 ^ n aligned.
*/
uint32_t ca_tgt_bitmap_sz;
/** Stream arguments for all streams */
struct dss_coll_stream_args ca_stream_args;
};
Expand All @@ -514,6 +522,9 @@ dss_thread_collective_reduce(struct dss_coll_ops *ops,
unsigned int flags);
int dss_task_collective(int (*func)(void *), void *arg, unsigned int flags);
int dss_thread_collective(int (*func)(void *), void *arg, unsigned int flags);
int
dss_build_coll_bitmap(int *exclude_tgts, uint32_t exclude_cnt, uint8_t **p_bitmap,
uint32_t *bitmap_sz);

/**
* Loaded module management metholds
Expand Down
20 changes: 19 additions & 1 deletion src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ int ds_pool_tgt_map_update(struct ds_pool *pool, struct pool_buf *buf,
unsigned int map_version);

int ds_pool_start(uuid_t uuid);
void ds_pool_stop(uuid_t uuid);
int
ds_pool_stop(uuid_t uuid);
int ds_pool_extend(uuid_t pool_uuid, int ntargets, const d_rank_list_t *rank_list, int ndomains,
const uint32_t *domains, d_rank_list_t *svc_ranks);
int ds_pool_target_update_state(uuid_t pool_uuid, d_rank_list_t *ranks,
Expand Down Expand Up @@ -320,6 +321,23 @@ int ds_pool_tgt_discard(uuid_t pool_uuid, uint64_t epoch);
int
ds_pool_mark_upgrade_completed(uuid_t pool_uuid, int ret);

struct dss_coll_args;
struct dss_coll_ops;

int
ds_pool_thread_collective_reduce(uuid_t pool_uuid, uint32_t ex_status,
struct dss_coll_ops *coll_ops, struct dss_coll_args *coll_args,
uint32_t flags);
int
ds_pool_task_collective_reduce(uuid_t pool_uuid, uint32_t ex_status, struct dss_coll_ops *coll_ops,
struct dss_coll_args *coll_args, uint32_t flags);
int
ds_pool_thread_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg,
uint32_t flags);
int
ds_pool_task_collective(uuid_t pool_uuid, uint32_t ex_status, int (*coll_func)(void *), void *arg,
uint32_t flags);

/**
* Verify if pool status satisfy Redundancy Factor requirement, by checking
* pool map device status.
Expand Down
Loading

0 comments on commit c355740

Please sign in to comment.