From 766df287c45a90a697adec3b9542a069236b7595 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Tue, 10 Oct 2023 21:41:39 +0000 Subject: [PATCH 01/15] initial changes Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 130 ++++++++++++++++++++++++++++++++++ src/cart/crt_internal_types.h | 10 +++ src/cart/crt_rpc.c | 4 +- src/include/cart/api.h | 45 ++++++++++++ src/include/cart/types.h | 15 ++++ 5 files changed, 202 insertions(+), 2 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index c632f804964..c42f3642de0 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -141,6 +141,12 @@ crt_context_init(crt_context_t crt_ctx) if (rc != 0) D_GOTO(out, rc); + rc = D_MUTEX_INIT(&ctx->cc_quotas.mutex, NULL); + if (rc != 0) { + D_MUTEX_DESTROY(&ctx->cc_mutex); + D_GOTO(out, rc); + } + D_INIT_LIST_HEAD(&ctx->cc_link); /* create timeout binheap */ @@ -1910,3 +1916,127 @@ crt_req_force_completion(struct crt_rpc_priv *rpc_priv) crt_req_timeout_track(rpc_priv); D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); } + +int +crt_context_quotas_init(crt_context_t crt_ctx) +{ + struct crt_context *ctx = crt_ctx; + struct crt_quotas *quotas; + int rc = 0; + + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + quotas = &ctx->cc_quotas; + + if (quotas->enabled) { + D_ERROR("Quotas already enabled\n"); + D_GOTO(out, rc = -DER_ALREADY); + } + + quotas->limit[CRT_QUOTA_RPC_ALLOC_SOFT] = 64; + quotas->limit[CRT_QUOTA_RPC_ALLOC_HARD] = 512; + quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 256; + + quotas->current[CRT_QUOTA_RPC_ALLOC_SOFT] = 0; + quotas->current[CRT_QUOTA_RPC_ALLOC_HARD] = 0; + quotas->current[CRT_QUOTA_RPC_INFLIGHT] = 0; + + quotas->enabled = true; +out: + return rc; +} + +int +crt_context_quotas_finalize(crt context_t crt_ctx) +{ + struct crt_context *ctx = crt_ctx; + struct crt_quotas *quotas; + int rc = 0; + + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + quotas = &ctx->cc_quotas; + + if (!quotas->enabled) { + D_ERROR("Quotas were not enabled\n"); + D_GOTO(out, rc = -DER_ALREADY); + } + + quotas->enabled = false; + +out: + return rc; + +} + +int +crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value) +{ + struct crt_context *ctx = crt_ctx; + int rc = 0; + + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + if (quota < 0 || quota >= CRT_QUOTA_COUNT) { + D_ERROR("Invalid quota %d passed\n", quota); + D_GOTO(out, rc = -DER_INVAL); + } + + D_MUTEX_LOCK(&ctx->cc_quotas.mutex); + ctx->cc_quotas.limit[quota] = value; + D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); + +out: + return rc; +} + +int +crt_context_quota_get(crt_context_t crt_ctx, crt_quota_t quota, int *value) +{ + struct crt_context *ctx = crt_ctx; + int rc = 0; + + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + if (quota < 0 || quota >= CRT_QUOTA_COUNT) { + D_ERROR("Invalid quota %d passed\n", quota); + D_GOTO(out, rc = -DER_INVAL); + } + + if (value == NULL) { + D_ERROR("NULL value\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + *value = ctx->cc_quotas.limit[quota]; + +out: + return rc; +} + +int +crt_context_get_quota(crt_context_t crt_ctx, crt_quota_t quota) +{ + struct crt_context *ctx = crt_ctx; + int rc = 0; + + +} + +int +crt_context_put_quota(crt_context_t crt_ctx, crt_quota_t quota) +{ +} + diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 1e4eaa28cfa..db5a84a4af5 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -184,6 +184,13 @@ extern struct crt_plugin_gdata crt_plugin_gdata; #define CRT_DEFAULT_CREDITS_PER_EP_CTX (32) #define CRT_MAX_CREDITS_PER_EP_CTX (256) +struct crt_quotas { + int limit[CRT_QUOTA_COUNT]; + int current[CRT_QUOTA_COUNT]; + pthread_mutext_t mutex; + bool enabled; +}; + /* crt_context */ struct crt_context { d_list_t cc_link; /** link to gdata.cg_ctx_list */ @@ -222,6 +229,9 @@ struct crt_context { /** Stores self uri for the current context */ char cc_self_uri[CRT_ADDR_STR_MAX_LEN]; + + /** Stores quotas */ + struct crt_quotas cc_quotas; }; /* in-flight RPC req list, be tracked per endpoint for every crt_context */ diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index 60be954e282..f02989d6ef2 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -562,9 +562,9 @@ int crt_req_create(crt_context_t crt_ctx, crt_endpoint_t *tgt_ep, crt_opcode_t opc, crt_rpc_t **req) { - int rc = 0; - struct crt_grp_priv *grp_priv = NULL; + struct crt_grp_priv *grp_priv = NULL; struct crt_rpc_priv *rpc_priv; + int rc = 0; if (crt_ctx == CRT_CONTEXT_NULL || req == NULL) { D_ERROR("invalid parameter (NULL crt_ctx or req).\n"); diff --git a/src/include/cart/api.h b/src/include/cart/api.h index 45ef8c67529..75854b7361a 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -2235,6 +2235,51 @@ crt_quiet_error(int err) return err == -DER_GRPVER; } +/** + * Initialize and enable default quotas for the context + * + * \param[in] crt_ctx CaRT context + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quotas_init(crt_context_t crt_ctx); + +/** + * Finalize and disables quotas for the context + * + * \param[in] crt_ctx CaRT context + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quotas_finalize(crt_context_t crt_ctx); + +/** + * Change the quota limit. + * + * \param[in] crt_ctx CaRT context + * \param[in] quota Quota type + * \param[in] val Value + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value); + +/** + * Query the quota limit. + * + * \param[in] crt_ctx CaRT context + * \param[in] quota Quota type + * \param[out] val Returned value + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quota_get(crt_context_t crt_ctx, crt_quota_t quota, int *value); + + /** @} */ diff --git a/src/include/cart/types.h b/src/include/cart/types.h index 0ce7dd79815..c9ee3c7a22f 100644 --- a/src/include/cart/types.h +++ b/src/include/cart/types.h @@ -440,6 +440,21 @@ typedef enum { CRT_GROUP_MOD_OP_COUNT, } crt_group_mod_op_t; +/** + * Quotas supported by CaRT. + */ +typedef enum { + /** Soft limit on rpc allocations */ + CRT_QUOTA_RPC_ALLOC_SOFT, + /** Hard limit on rpc allocations */ + CRT_QUOTA_RPC_ALLOC_HARD, + /** Limit of number of inflight rpcs */ + CRT_QUOTA_RPC_INFLIGHT, + + /** Total count of supported quotas */ + CRT_QUOTA_COUNT, +} crt_quota_t; + /** @} */ #endif /* __CRT_TYPES_H__ */ From f2b7192f988cc8f6c484143fdf604d0041f5447d Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Thu, 19 Oct 2023 06:37:37 +0000 Subject: [PATCH 02/15] - Add per-context quotas - Implement RPC inflight quota Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 160 ++++++++++++++++++++++++++-------- src/cart/crt_hg.c | 2 +- src/cart/crt_internal_fns.h | 3 + src/cart/crt_internal_types.h | 3 +- src/cart/crt_rpc.h | 2 + src/include/daos_errno.h | 4 +- 6 files changed, 133 insertions(+), 41 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index c42f3642de0..2fcb3be5cf6 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -147,6 +147,7 @@ crt_context_init(crt_context_t crt_ctx) D_GOTO(out, rc); } + D_INIT_LIST_HEAD(&ctx->cc_quotas.rpc_waitq); D_INIT_LIST_HEAD(&ctx->cc_link); /* create timeout binheap */ @@ -168,6 +169,8 @@ crt_context_init(crt_context_t crt_ctx) D_GOTO(out_binheap_destroy, rc); } + rc = crt_context_quotas_init(crt_ctx); + D_GOTO(out, rc); out_binheap_destroy: @@ -1173,6 +1176,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) d_list_t *rlink; d_rank_t ep_rank; int rc = 0; + int quota_rc = 0; struct crt_grp_priv *grp_priv; D_ASSERT(crt_ctx != NULL); @@ -1183,6 +1187,10 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) D_GOTO(out, rc = CRT_REQ_TRACK_IN_INFLIGHQ); } + /* check inflight quota. if exceeded, queue this rpc */ + quota_rc = crt_context_get_quota_resource(rpc_priv->crp_pub.cr_ctx, + CRT_QUOTA_RPC_INFLIGHT); + grp_priv = crt_grp_pub2priv(rpc_priv->crp_pub.cr_ep.ep_grp); ep_rank = crt_grp_priv_get_primary_rank(grp_priv, rpc_priv->crp_pub.cr_ep.ep_rank); @@ -1227,6 +1235,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) } D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); + /* add the RPC req to crt_ep_inflight */ D_MUTEX_LOCK(&epi->epi_mutex); D_ASSERT(epi->epi_req_num >= epi->epi_reply_num); @@ -1234,15 +1243,16 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) rpc_priv->crp_epi = epi; RPC_ADDREF(rpc_priv); - if (crt_gdata.cg_credit_ep_ctx != 0 && + if (quota_rc == -DER_QUOTA_LIMIT) { + epi->epi_req_num++; + rpc_priv->crp_state = RPC_STATE_QUEUED; + rc = CRT_REQ_TRACK_IN_WAITQ; + } else if (crt_gdata.cg_credit_ep_ctx != 0 && (epi->epi_req_num - epi->epi_reply_num) >= crt_gdata.cg_credit_ep_ctx) { - if (rpc_priv->crp_opc_info->coi_queue_front) { - d_list_add(&rpc_priv->crp_epi_link, - &epi->epi_req_waitq); - } else { - d_list_add_tail(&rpc_priv->crp_epi_link, - &epi->epi_req_waitq); - } + if (rpc_priv->crp_opc_info->coi_queue_front) + d_list_add(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); + else + d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); epi->epi_req_wait_num++; rpc_priv->crp_state = RPC_STATE_QUEUED; @@ -1252,13 +1262,11 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) rc = crt_req_timeout_track(rpc_priv); D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); if (rc == 0) { - d_list_add_tail(&rpc_priv->crp_epi_link, - &epi->epi_req_q); + d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_q); epi->epi_req_num++; rc = CRT_REQ_TRACK_IN_INFLIGHQ; } else { - RPC_ERROR(rpc_priv, - "crt_req_timeout_track failed, rc: %d.\n", rc); + RPC_ERROR(rpc_priv, "crt_req_timeout_track failed, rc: %d.\n", rc); /* roll back the addref above */ RPC_DECREF(rpc_priv); } @@ -1270,6 +1278,10 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) /* reference taken by d_hash_rec_find or "epi->epi_ref = 1" above */ D_MUTEX_LOCK(&crt_ctx->cc_mutex); d_hash_rec_decref(&crt_ctx->cc_epi_table, &epi->epi_link); + + if (quota_rc == -DER_QUOTA_LIMIT) + d_list_add_tail(&rpc_priv->crp_waitq_link, &crt_ctx->cc_quotas.rpc_waitq); + D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); out: @@ -1330,6 +1342,7 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) } else {/* RPC_CANCELED or RPC_INITED or RPC_TIMEOUT */ epi->epi_req_num--; } + D_ASSERT(epi->epi_req_num >= epi->epi_reply_num); D_MUTEX_UNLOCK(&epi->epi_mutex); @@ -1346,6 +1359,33 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) RPC_DECREF(rpc_priv); } +static void +dispatch_rpc(struct crt_rpc_priv *rpc) { + int rc; + + if (rpc == NULL) + return; + + /* No need to worry about rc here, we just released quota earlier */ + rc = crt_context_get_quota_resource(rpc->crp_pub.cr_ctx, + CRT_QUOTA_RPC_INFLIGHT); + D_ASSERT(rc == 0); + + crt_rpc_lock(rpc); + + rc = crt_req_send_internal(rpc); + if (rc == 0) { + crt_rpc_unlock(rpc); + } else { + RPC_ADDREF(rpc); + RPC_ERROR(rpc, "crt_req_send_internal failed, rc: %d\n", rc); + rpc->crp_state = RPC_STATE_INITED; + crt_context_req_untrack_internal(rpc); + /* for error case here */ + crt_rpc_complete_and_unlock(rpc, rc); + } +} + void crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) { @@ -1357,14 +1397,21 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) D_ASSERT(crt_ctx != NULL); - if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP) { - RPC_TRACE(DB_NET, rpc_priv, "bypass untracking for URI_LOOKUP.\n"); + if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP) return; - } epi = rpc_priv->crp_epi; D_ASSERT(epi != NULL); + /* + * Return quota resource and dispatch 1 rpc if any on waitq. + * dispatch_rpc() will either acquire a quota resource or will requeue this rpc + */ + crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPC_INFLIGHT); + tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq, + struct crt_rpc_priv, crp_waitq_link); + dispatch_rpc(tmp_rpc); + crt_context_req_untrack_internal(rpc_priv); /* done if flow control disabled */ @@ -1414,20 +1461,8 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) D_MUTEX_UNLOCK(&epi->epi_mutex); /* re-submit the rpc req */ - while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) { - crt_rpc_lock(tmp_rpc); - rc = crt_req_send_internal(tmp_rpc); - if (rc == 0) { - crt_rpc_unlock(tmp_rpc); - } else { - RPC_ADDREF(tmp_rpc); - RPC_ERROR(tmp_rpc, "crt_req_send_internal failed, rc: %d\n", rc); - tmp_rpc->crp_state = RPC_STATE_INITED; - crt_context_req_untrack_internal(tmp_rpc); - /* for error case here */ - crt_rpc_complete_and_unlock(tmp_rpc, rc); - } - } + while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) + dispatch_rpc(tmp_rpc); } /* TODO: Need per-provider call */ @@ -1938,7 +1973,9 @@ crt_context_quotas_init(crt_context_t crt_ctx) quotas->limit[CRT_QUOTA_RPC_ALLOC_SOFT] = 64; quotas->limit[CRT_QUOTA_RPC_ALLOC_HARD] = 512; - quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 256; + + /* TODO: Set this to EP_CREDITS eventually; for now use 4 for testing */ + quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 4; quotas->current[CRT_QUOTA_RPC_ALLOC_SOFT] = 0; quotas->current[CRT_QUOTA_RPC_ALLOC_HARD] = 0; @@ -1950,7 +1987,7 @@ crt_context_quotas_init(crt_context_t crt_ctx) } int -crt_context_quotas_finalize(crt context_t crt_ctx) +crt_context_quotas_finalize(crt_context_t crt_ctx) { struct crt_context *ctx = crt_ctx; struct crt_quotas *quotas; @@ -1972,11 +2009,11 @@ crt_context_quotas_finalize(crt context_t crt_ctx) out: return rc; - + } int -crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value) +crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_t quota, int value) { struct crt_context *ctx = crt_ctx; int rc = 0; @@ -2000,7 +2037,7 @@ crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value) } int -crt_context_quota_get(crt_context_t crt_ctx, crt_quota_t quota, int *value) +crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_t quota, int *value) { struct crt_context *ctx = crt_ctx; int rc = 0; @@ -2027,16 +2064,63 @@ crt_context_quota_get(crt_context_t crt_ctx, crt_quota_t quota, int *value) } int -crt_context_get_quota(crt_context_t crt_ctx, crt_quota_t quota) +crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_t quota) { struct crt_context *ctx = crt_ctx; int rc = 0; - + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + if (quota < 0 || quota >= CRT_QUOTA_COUNT) { + D_ERROR("Invalid quota %d passed\n", quota); + D_GOTO(out, rc = -DER_INVAL); + } + + /* If quotas not enabled or unlimited quota */ + if (!ctx->cc_quotas.enabled || ctx->cc_quotas.limit[quota] == 0) + return 0; + + D_MUTEX_LOCK(&ctx->cc_quotas.mutex); + + if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) + ctx->cc_quotas.current[quota]++; + else + rc = -DER_QUOTA_LIMIT; + + D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); +out: + return rc; } int -crt_context_put_quota(crt_context_t crt_ctx, crt_quota_t quota) +crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_t quota) { -} + struct crt_context *ctx = crt_ctx; + int rc = 0; + if (ctx == NULL) { + D_ERROR("NULL context\n"); + D_GOTO(out, rc = -DER_INVAL); + } + + if (quota < 0 || quota >= CRT_QUOTA_COUNT) { + D_ERROR("Invalid quota %d passed\n", quota); + D_GOTO(out, rc = -DER_INVAL); + } + + /* If quotas not enabled or unlimited quota */ + if (!ctx->cc_quotas.enabled || ctx->cc_quotas.limit[quota] == 0) + return 0; + + D_MUTEX_LOCK(&ctx->cc_quotas.mutex); + D_ASSERTF(ctx->cc_quotas.current[quota] > 0, "Invalid current limit"); + ctx->cc_quotas.current[quota]--; + + D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); + +out: + return rc; +} diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 464afbb9554..370ebc5c315 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -1397,7 +1397,7 @@ crt_hg_req_send_cb(const struct hg_cb_info *hg_cbinfo) void crt_hg_req_send(struct crt_rpc_priv *rpc_priv) { - hg_return_t hg_ret; + hg_return_t hg_ret; D_ASSERT(rpc_priv != NULL); diff --git a/src/cart/crt_internal_fns.h b/src/cart/crt_internal_fns.h index c88c794852b..d36abc3981e 100644 --- a/src/cart/crt_internal_fns.h +++ b/src/cart/crt_internal_fns.h @@ -31,6 +31,9 @@ enum { CRT_REQ_TRACK_IN_WAITQ, }; +int crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_t quota); +int crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_t quota); + int crt_context_req_track(struct crt_rpc_priv *rpc_priv); bool crt_context_empty(int provider, int locked); void crt_context_req_untrack(struct crt_rpc_priv *rpc_priv); diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index db5a84a4af5..ac8e8d724ec 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -187,7 +187,8 @@ extern struct crt_plugin_gdata crt_plugin_gdata; struct crt_quotas { int limit[CRT_QUOTA_COUNT]; int current[CRT_QUOTA_COUNT]; - pthread_mutext_t mutex; + pthread_mutex_t mutex; + d_list_t rpc_waitq; bool enabled; }; diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index ebda720c995..7d0d5878239 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -130,6 +130,8 @@ struct crt_rpc_priv { d_list_t crp_epi_link; /* tmp_link used in crt_context_req_untrack */ d_list_t crp_tmp_link; + /* link for crt_context::cc_quotas.rpc_waitq */ + d_list_t crp_waitq_link; /* link to parent RPC crp_opc_info->co_child_rpcs/co_replied_rpcs */ d_list_t crp_parent_link; /* binheap node for timeout management, in crt_context::cc_bh_timeout */ diff --git a/src/include/daos_errno.h b/src/include/daos_errno.h index 86709a6bd94..474ef4d041a 100644 --- a/src/include/daos_errno.h +++ b/src/include/daos_errno.h @@ -117,7 +117,9 @@ extern "C" { /** Invalid user/group permissions.*/ \ ACTION(DER_SHMEM_PERMS, Unable to access shared memory segment due to incompatible user or group permissions) \ /** Fatal (non-retry-able) transport layer mercury error */ \ - ACTION(DER_HG_FATAL, Fatal transport layer mercury error) + ACTION(DER_HG_FATAL, Fatal transport layer mercury error) \ + /** Quota limit reached on the requested resource */ \ + ACTION(DER_QUOTA_LIMIT, Quota limit reached) /** TODO: add more error numbers */ /** Preprocessor macro defining DAOS errno values and internal definition of d_errstr */ From 39db4460752bfff5b2436e4e09c035bb2accd3e5 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Thu, 19 Oct 2023 07:21:47 +0000 Subject: [PATCH 03/15] - update test_gurt to include new errors Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/gurt/tests/test_gurt.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/gurt/tests/test_gurt.c b/src/gurt/tests/test_gurt.c index b81a3b5f1cd..a1d490382f9 100644 --- a/src/gurt/tests/test_gurt.c +++ b/src/gurt/tests/test_gurt.c @@ -120,11 +120,11 @@ test_d_errstr(void **state) /* Check the boundary at the end of the GURT error numbers, this will need updating if * additional error numbers are added. */ - value = d_errstr(-DER_HG_FATAL); - assert_string_equal(value, "DER_HG_FATAL"); - value = d_errstr(-1045); - assert_string_equal(value, "DER_HG_FATAL"); - value = d_errstr(-(DER_HG_FATAL + 1)); + value = d_errstr(-DER_QUOTA_LIMIT); + assert_string_equal(value, "DER_QUOTA_LIMIT"); + value = d_errstr(-1046); + assert_string_equal(value, "DER_QUOTA_LIMIT"); + value = d_errstr(-(DER_QUOTA_LIMIT + 1)); assert_string_equal(value, "DER_UNKNOWN"); /* Check the end of the DAOS error numbers. */ From 94087bdaf914661717ddb4941f2d7b4e70fa3287 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Thu, 19 Oct 2023 17:28:06 +0000 Subject: [PATCH 04/15] - Change the check of ep credits to account for rpcs queued in the waitq Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 2fcb3be5cf6..49f6f653560 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1298,9 +1298,10 @@ credits_available(struct crt_ep_inflight *epi) { int64_t inflight = epi->epi_req_num - epi->epi_reply_num; - D_ASSERTF(inflight >= 0 && inflight <= crt_gdata.cg_credit_ep_ctx, - "req_num=%ld reply_num=%ld credit_ep_ctx=%u\n", epi->epi_req_num, - epi->epi_reply_num, crt_gdata.cg_credit_ep_ctx); + /* TODO: inflight right now includes items queued in quota waitq, and can exceed credit limit */ + if (inflight > crt_gdata.cg_credit_ep_ctx) + return 0; + return crt_gdata.cg_credit_ep_ctx - inflight; } From 0914730d43a1764e33c6587d3346f2353a3971dd Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Sat, 21 Oct 2023 06:37:05 +0000 Subject: [PATCH 05/15] - rename crt_quota_t -> crt_quota_type_t - add comment to not implemented quotas - set default to 32 inflight Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 12 ++++++------ src/cart/crt_internal_fns.h | 4 ++-- src/include/cart/api.h | 4 ++-- src/include/cart/types.h | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 49f6f653560..507634a6942 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1975,8 +1975,8 @@ crt_context_quotas_init(crt_context_t crt_ctx) quotas->limit[CRT_QUOTA_RPC_ALLOC_SOFT] = 64; quotas->limit[CRT_QUOTA_RPC_ALLOC_HARD] = 512; - /* TODO: Set this to EP_CREDITS eventually; for now use 4 for testing */ - quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 4; + /* TODO: Set based on ep credits */ + quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 32; quotas->current[CRT_QUOTA_RPC_ALLOC_SOFT] = 0; quotas->current[CRT_QUOTA_RPC_ALLOC_HARD] = 0; @@ -2014,7 +2014,7 @@ crt_context_quotas_finalize(crt_context_t crt_ctx) } int -crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_t quota, int value) +crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value) { struct crt_context *ctx = crt_ctx; int rc = 0; @@ -2038,7 +2038,7 @@ crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_t quota, int value) } int -crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_t quota, int *value) +crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value) { struct crt_context *ctx = crt_ctx; int rc = 0; @@ -2065,7 +2065,7 @@ crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_t quota, int *value } int -crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_t quota) +crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) { struct crt_context *ctx = crt_ctx; int rc = 0; @@ -2097,7 +2097,7 @@ crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_t quota) } int -crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_t quota) +crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) { struct crt_context *ctx = crt_ctx; int rc = 0; diff --git a/src/cart/crt_internal_fns.h b/src/cart/crt_internal_fns.h index d36abc3981e..0d537ffcdef 100644 --- a/src/cart/crt_internal_fns.h +++ b/src/cart/crt_internal_fns.h @@ -31,8 +31,8 @@ enum { CRT_REQ_TRACK_IN_WAITQ, }; -int crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_t quota); -int crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_t quota); +int crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); +int crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); int crt_context_req_track(struct crt_rpc_priv *rpc_priv); bool crt_context_empty(int provider, int locked); diff --git a/src/include/cart/api.h b/src/include/cart/api.h index a22573be13a..b5bbfd9ef2d 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -2277,7 +2277,7 @@ int crt_context_quotas_finalize(crt_context_t crt_ctx); * \return DER_SUCCESS on success, negative value on * failure. */ -int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value); +int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value); /** * Query the quota limit. @@ -2289,7 +2289,7 @@ int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_t quota, int value); * \return DER_SUCCESS on success, negative value on * failure. */ -int crt_context_quota_get(crt_context_t crt_ctx, crt_quota_t quota, int *value); +int crt_context_quota_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value); /** @} diff --git a/src/include/cart/types.h b/src/include/cart/types.h index c9ee3c7a22f..0b72b23cb4b 100644 --- a/src/include/cart/types.h +++ b/src/include/cart/types.h @@ -445,15 +445,15 @@ typedef enum { */ typedef enum { /** Soft limit on rpc allocations */ - CRT_QUOTA_RPC_ALLOC_SOFT, + CRT_QUOTA_RPC_ALLOC_SOFT, /* TODO: Not implemented */ /** Hard limit on rpc allocations */ - CRT_QUOTA_RPC_ALLOC_HARD, + CRT_QUOTA_RPC_ALLOC_HARD, /* TODO: Not implemented */ /** Limit of number of inflight rpcs */ CRT_QUOTA_RPC_INFLIGHT, /** Total count of supported quotas */ CRT_QUOTA_COUNT, -} crt_quota_t; +} crt_quota_type_t; /** @} */ From 0f82132247764667608248e18c8fce58487cfe69 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Mon, 23 Oct 2023 17:22:56 +0000 Subject: [PATCH 06/15] - Instead of release/re-request quota upon one rpc completion, keep its quota reservation for any rpc in the list. Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 507634a6942..475781b3122 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1361,16 +1361,21 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) } static void -dispatch_rpc(struct crt_rpc_priv *rpc) { +dispatch_rpc(struct crt_rpc_priv *rpc, bool get_quota) { int rc; if (rpc == NULL) return; - /* No need to worry about rc here, we just released quota earlier */ - rc = crt_context_get_quota_resource(rpc->crp_pub.cr_ctx, - CRT_QUOTA_RPC_INFLIGHT); - D_ASSERT(rc == 0); + /* TODO: Decide if we need to handle true case */ + if (get_quota) { + /* No need to worry about rc here, we just released quota earlier */ + rc = crt_context_get_quota_resource(rpc->crp_pub.cr_ctx, + CRT_QUOTA_RPC_INFLIGHT); + /* TODO: Consider if we need to requeue */ + if (rc) + D_WARN("Quota query failed with rc=%d\n", rc); + } crt_rpc_lock(rpc); @@ -1408,10 +1413,13 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) * Return quota resource and dispatch 1 rpc if any on waitq. * dispatch_rpc() will either acquire a quota resource or will requeue this rpc */ - crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPC_INFLIGHT); tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq, struct crt_rpc_priv, crp_waitq_link); - dispatch_rpc(tmp_rpc); + if (tmp_rpc != NULL) { + dispatch_rpc(tmp_rpc, false); + } else { + crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPC_INFLIGHT); + } crt_context_req_untrack_internal(rpc_priv); @@ -1463,7 +1471,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) /* re-submit the rpc req */ while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) - dispatch_rpc(tmp_rpc); + dispatch_rpc(tmp_rpc, false); } /* TODO: Need per-provider call */ From 8d121b5073da3782d95f340a3d7bac001ac41798 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Tue, 24 Oct 2023 07:29:05 +0000 Subject: [PATCH 07/15] modify quota limit for a debug and add warning print when quota limit is reached. Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 475781b3122..fb85a152954 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1984,7 +1984,7 @@ crt_context_quotas_init(crt_context_t crt_ctx) quotas->limit[CRT_QUOTA_RPC_ALLOC_HARD] = 512; /* TODO: Set based on ep credits */ - quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 32; + quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 64; quotas->current[CRT_QUOTA_RPC_ALLOC_SOFT] = 0; quotas->current[CRT_QUOTA_RPC_ALLOC_HARD] = 0; @@ -2096,8 +2096,10 @@ crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) ctx->cc_quotas.current[quota]++; - else + else { + D_WARN("Quota limit reached for quota_type=%d\n", quota); rc = -DER_QUOTA_LIMIT; + } D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); out: From 3229a8224568314c998c55f7e64b1125085574ff Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Fri, 1 Dec 2023 19:32:11 +0000 Subject: [PATCH 08/15] Cleanup and refactoring a bit: - crt_context_quotas_init/finalize removed, changed to internal static calls - renamed crt_context_quota_set -> crt_context_quota_limit_set; similar for get - D_QUOTA_RPCS envariable added. Default set to 64. - Enable flag kept per quota type - Allocation quotas removed as they are not implemented for now Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/README.env | 6 ++++ src/cart/crt_context.c | 68 ++++++++++++++--------------------- src/cart/crt_init.c | 6 ++++ src/cart/crt_internal_types.h | 7 ++-- src/include/cart/api.h | 24 ++----------- src/include/cart/types.h | 6 +--- 6 files changed, 47 insertions(+), 70 deletions(-) diff --git a/src/cart/README.env b/src/cart/README.env index 58df8cf2baf..3d4a12963df 100644 --- a/src/cart/README.env +++ b/src/cart/README.env @@ -139,6 +139,12 @@ This file lists the environment variables used in CaRT. It its value exceed 256, then will use 256 for flow control. Set it to zero means disable the flow control in cart. + . D_QUOTA_RPCS + Set it as the max number of per-context inflight RPCs that a sender will send + onto a wire. Quota on each context is independent of each other. + If it is not set the default value of 64 is used. + Setting it to 0 disables quota + . CRT_CTX_SHARE_ADDR Set it to non-zero to make all the contexts share one network address, in this case CaRT will create one SEP and each context maps to one tx/rx diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 3f232484003..82662d155b2 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -11,6 +11,8 @@ #include "crt_internal.h" static void crt_epi_destroy(struct crt_ep_inflight *epi); +static int context_quotas_init(crt_context_t crt_ctx); +static int context_quotas_finalize(crt_context_t crt_ctx); static struct crt_ep_inflight * epi_link2ptr(d_list_t *rlink) @@ -169,7 +171,7 @@ crt_context_init(crt_context_t crt_ctx) D_GOTO(out_binheap_destroy, rc); } - rc = crt_context_quotas_init(crt_ctx); + rc = context_quotas_init(crt_ctx); D_GOTO(out, rc); @@ -693,10 +695,17 @@ crt_context_destroy(crt_context_t crt_ctx, int force) D_GOTO(out, rc = -DER_UNINIT); } + rc = context_quotas_finalize(crt_ctx); + if (rc) { + DL_ERROR(rc, "context_quotas_finalize() failed"); + if (!force) + D_GOTO(out, rc); + } + ctx = crt_ctx; rc = crt_grp_ctx_invalid(ctx, false /* locked */); if (rc) { - D_ERROR("crt_grp_ctx_invalid failed, rc: %d.\n", rc); + DL_ERROR(rc, "crt_grp_ctx_invalid() failed"); if (!force) D_GOTO(out, rc); } @@ -1189,7 +1198,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) /* check inflight quota. if exceeded, queue this rpc */ quota_rc = crt_context_get_quota_resource(rpc_priv->crp_pub.cr_ctx, - CRT_QUOTA_RPC_INFLIGHT); + CRT_QUOTA_RPCS); grp_priv = crt_grp_pub2priv(rpc_priv->crp_pub.cr_ep.ep_grp); ep_rank = crt_grp_priv_get_primary_rank(grp_priv, @@ -1371,7 +1380,7 @@ dispatch_rpc(struct crt_rpc_priv *rpc, bool get_quota) { if (get_quota) { /* No need to worry about rc here, we just released quota earlier */ rc = crt_context_get_quota_resource(rpc->crp_pub.cr_ctx, - CRT_QUOTA_RPC_INFLIGHT); + CRT_QUOTA_RPCS); /* TODO: Consider if we need to requeue */ if (rc) D_WARN("Quota query failed with rc=%d\n", rc); @@ -1418,7 +1427,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) if (tmp_rpc != NULL) { dispatch_rpc(tmp_rpc, false); } else { - crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPC_INFLIGHT); + crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); } crt_context_req_untrack_internal(rpc_priv); @@ -1961,8 +1970,8 @@ crt_req_force_completion(struct crt_rpc_priv *rpc_priv) D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); } -int -crt_context_quotas_init(crt_context_t crt_ctx) +static int +context_quotas_init(crt_context_t crt_ctx) { struct crt_context *ctx = crt_ctx; struct crt_quotas *quotas; @@ -1975,50 +1984,27 @@ crt_context_quotas_init(crt_context_t crt_ctx) quotas = &ctx->cc_quotas; - if (quotas->enabled) { - D_ERROR("Quotas already enabled\n"); - D_GOTO(out, rc = -DER_ALREADY); - } - - quotas->limit[CRT_QUOTA_RPC_ALLOC_SOFT] = 64; - quotas->limit[CRT_QUOTA_RPC_ALLOC_HARD] = 512; - - /* TODO: Set based on ep credits */ - quotas->limit[CRT_QUOTA_RPC_INFLIGHT] = 64; - - quotas->current[CRT_QUOTA_RPC_ALLOC_SOFT] = 0; - quotas->current[CRT_QUOTA_RPC_ALLOC_HARD] = 0; - quotas->current[CRT_QUOTA_RPC_INFLIGHT] = 0; - - quotas->enabled = true; + quotas->limit[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota; + quotas->current[CRT_QUOTA_RPCS] = 0; + quotas->enabled[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota > 0 ? true : false; out: return rc; } -int -crt_context_quotas_finalize(crt_context_t crt_ctx) +static int +context_quotas_finalize(crt_context_t crt_ctx) { struct crt_context *ctx = crt_ctx; - struct crt_quotas *quotas; - int rc = 0; if (ctx == NULL) { D_ERROR("NULL context\n"); - D_GOTO(out, rc = -DER_INVAL); - } - - quotas = &ctx->cc_quotas; - - if (!quotas->enabled) { - D_ERROR("Quotas were not enabled\n"); - D_GOTO(out, rc = -DER_ALREADY); + return -DER_INVAL; } - quotas->enabled = false; - -out: - return rc; + for (int i = 0; i < CRT_QUOTA_COUNT; i++) + ctx->cc_quotas.enabled[i] = false; + return DER_SUCCESS; } int @@ -2089,7 +2075,7 @@ crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) } /* If quotas not enabled or unlimited quota */ - if (!ctx->cc_quotas.enabled || ctx->cc_quotas.limit[quota] == 0) + if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) return 0; D_MUTEX_LOCK(&ctx->cc_quotas.mutex); @@ -2123,7 +2109,7 @@ crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) } /* If quotas not enabled or unlimited quota */ - if (!ctx->cc_quotas.enabled || ctx->cc_quotas.limit[quota] == 0) + if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) return 0; D_MUTEX_LOCK(&ctx->cc_quotas.mutex); diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c index 3f31c3cadf1..825d25ded04 100644 --- a/src/cart/crt_init.c +++ b/src/cart/crt_init.c @@ -96,6 +96,7 @@ dump_envariables(void) "D_PORT_AUTO_ADJUST", "D_POLL_TIMEOUT", "D_LOG_FILE_APPEND_RANK", + "D_QUOTA_RPCS", "DAOS_SIGNAL_REGISTER"}; D_INFO("-- ENVARS: --\n"); @@ -255,6 +256,8 @@ prov_data_init(struct crt_prov_gdata *prov_data, crt_provider_t provider, return DER_SUCCESS; } +#define CRT_QUOTA_RPCS_DEFAULT 64 + /* first step init - for initializing crt_gdata */ static int data_init(int server, crt_init_options_t *opt) { @@ -317,6 +320,9 @@ static int data_init(int server, crt_init_options_t *opt) d_getenv_int("CRT_CREDIT_EP_CTX", &credits); } + crt_gdata.cg_rpc_quota = CRT_QUOTA_RPCS_DEFAULT; + d_getenv_int("D_QUOTA_RPCS", &crt_gdata.cg_rpc_quota); + /* Must be set on the server when using UCX, will not affect OFI */ d_getenv_char("UCX_IB_FORK_INIT", &ucx_ib_fork_init); if (ucx_ib_fork_init) { diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 857f6cf6bc8..71b2ca1faf5 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -88,7 +88,7 @@ struct crt_gdata { /** Provider specific data */ struct crt_prov_gdata cg_prov_gdata_primary; - /** */ + /** Placeholder for secondary provider data */ struct crt_prov_gdata *cg_prov_gdata_secondary; /** global timeout value (second) for all RPCs */ @@ -106,6 +106,7 @@ struct crt_gdata { /** HG level global data */ struct crt_hg_gdata *cg_hg; + /** Points to default group */ struct crt_grp_gdata *cg_grp; /** refcount to protect crt_init/crt_finalize */ @@ -141,6 +142,8 @@ struct crt_gdata { struct d_tm_node_t *cg_uri_other; /** Number of cores on a system */ long cg_num_cores; + /** Inflight rpc quota limit */ + uint32_t cg_rpc_quota; }; extern struct crt_gdata crt_gdata; @@ -188,9 +191,9 @@ extern struct crt_plugin_gdata crt_plugin_gdata; struct crt_quotas { int limit[CRT_QUOTA_COUNT]; int current[CRT_QUOTA_COUNT]; + bool enabled[CRT_QUOTA_COUNT]; pthread_mutex_t mutex; d_list_t rpc_waitq; - bool enabled; }; /* crt_context */ diff --git a/src/include/cart/api.h b/src/include/cart/api.h index d69d8e48f55..312a9afc070 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -2247,26 +2247,6 @@ crt_quiet_error(int err) return err == -DER_GRPVER; } -/** - * Initialize and enable default quotas for the context - * - * \param[in] crt_ctx CaRT context - * - * \return DER_SUCCESS on success, negative value on - * failure. - */ -int crt_context_quotas_init(crt_context_t crt_ctx); - -/** - * Finalize and disables quotas for the context - * - * \param[in] crt_ctx CaRT context - * - * \return DER_SUCCESS on success, negative value on - * failure. - */ -int crt_context_quotas_finalize(crt_context_t crt_ctx); - /** * Change the quota limit. * @@ -2277,7 +2257,7 @@ int crt_context_quotas_finalize(crt_context_t crt_ctx); * \return DER_SUCCESS on success, negative value on * failure. */ -int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value); +int crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value); /** * Query the quota limit. @@ -2289,7 +2269,7 @@ int crt_context_quota_set(crt_context_t crt_ctx, crt_quota_type_t quota, int val * \return DER_SUCCESS on success, negative value on * failure. */ -int crt_context_quota_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value); +int crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value); /** * Get the proto version of an RPC request. diff --git a/src/include/cart/types.h b/src/include/cart/types.h index 0b72b23cb4b..d02e2881047 100644 --- a/src/include/cart/types.h +++ b/src/include/cart/types.h @@ -444,12 +444,8 @@ typedef enum { * Quotas supported by CaRT. */ typedef enum { - /** Soft limit on rpc allocations */ - CRT_QUOTA_RPC_ALLOC_SOFT, /* TODO: Not implemented */ - /** Hard limit on rpc allocations */ - CRT_QUOTA_RPC_ALLOC_HARD, /* TODO: Not implemented */ /** Limit of number of inflight rpcs */ - CRT_QUOTA_RPC_INFLIGHT, + CRT_QUOTA_RPCS, /** Total count of supported quotas */ CRT_QUOTA_COUNT, From 59f45dc8c17d64e18904109f2186d56347d60094 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Tue, 5 Dec 2023 17:46:34 +0000 Subject: [PATCH 09/15] Add lock around dequeue Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 82662d155b2..d470e018e5a 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1422,8 +1422,10 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) * Return quota resource and dispatch 1 rpc if any on waitq. * dispatch_rpc() will either acquire a quota resource or will requeue this rpc */ + D_MUTEX_LOCK(&crt_ctx->cc_mutex); tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq, struct crt_rpc_priv, crp_waitq_link); + D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); if (tmp_rpc != NULL) { dispatch_rpc(tmp_rpc, false); } else { From c6820ce30ba5300bfb56cdc3306053b022c9e0fd Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Mon, 11 Dec 2023 07:57:18 +0000 Subject: [PATCH 10/15] - cleanup Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 26 +++++--------------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index d470e018e5a..c1fc68b7a6a 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1244,7 +1244,6 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) } D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); - /* add the RPC req to crt_ep_inflight */ D_MUTEX_LOCK(&epi->epi_mutex); D_ASSERT(epi->epi_req_num >= epi->epi_reply_num); @@ -1370,22 +1369,12 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) } static void -dispatch_rpc(struct crt_rpc_priv *rpc, bool get_quota) { +dispatch_rpc(struct crt_rpc_priv *rpc) { int rc; if (rpc == NULL) return; - /* TODO: Decide if we need to handle true case */ - if (get_quota) { - /* No need to worry about rc here, we just released quota earlier */ - rc = crt_context_get_quota_resource(rpc->crp_pub.cr_ctx, - CRT_QUOTA_RPCS); - /* TODO: Consider if we need to requeue */ - if (rc) - D_WARN("Quota query failed with rc=%d\n", rc); - } - crt_rpc_lock(rpc); rc = crt_req_send_internal(rpc); @@ -1418,23 +1407,21 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) epi = rpc_priv->crp_epi; D_ASSERT(epi != NULL); - /* - * Return quota resource and dispatch 1 rpc if any on waitq. - * dispatch_rpc() will either acquire a quota resource or will requeue this rpc - */ + /* Dispatch one rpc from wait_q if any or return resource back */ D_MUTEX_LOCK(&crt_ctx->cc_mutex); tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq, struct crt_rpc_priv, crp_waitq_link); D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); + if (tmp_rpc != NULL) { - dispatch_rpc(tmp_rpc, false); + dispatch_rpc(tmp_rpc); } else { crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); } crt_context_req_untrack_internal(rpc_priv); - /* done if flow control disabled */ + /* done if ep credit flow control is disabled */ if (crt_gdata.cg_credit_ep_ctx == 0) return; @@ -2081,14 +2068,12 @@ crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) return 0; D_MUTEX_LOCK(&ctx->cc_quotas.mutex); - if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) ctx->cc_quotas.current[quota]++; else { D_WARN("Quota limit reached for quota_type=%d\n", quota); rc = -DER_QUOTA_LIMIT; } - D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); out: return rc; @@ -2117,7 +2102,6 @@ crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) D_MUTEX_LOCK(&ctx->cc_quotas.mutex); D_ASSERTF(ctx->cc_quotas.current[quota] > 0, "Invalid current limit"); ctx->cc_quotas.current[quota]--; - D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); out: From d8fc6c909f7669ef2766d8b2206bdf17c5192fab Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Mon, 11 Dec 2023 16:19:22 +0000 Subject: [PATCH 11/15] fix Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index c1fc68b7a6a..8a44fdd5bd7 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1413,11 +1413,10 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) struct crt_rpc_priv, crp_waitq_link); D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); - if (tmp_rpc != NULL) { + if (tmp_rpc != NULL) dispatch_rpc(tmp_rpc); - } else { + else crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); - } crt_context_req_untrack_internal(rpc_priv); @@ -1469,7 +1468,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) /* re-submit the rpc req */ while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) - dispatch_rpc(tmp_rpc, false); + dispatch_rpc(tmp_rpc); } /* TODO: Need per-provider call */ From f56164808380ee8e7ee6e2548b2dbb60b5db9269 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Tue, 12 Dec 2023 16:16:02 +0000 Subject: [PATCH 12/15] - disable quotas by default on engines - move default to a diff file Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_init.c | 5 +++-- src/cart/crt_rpc.h | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c index f6df772ba1b..1ca20794c64 100644 --- a/src/cart/crt_init.c +++ b/src/cart/crt_init.c @@ -258,7 +258,6 @@ prov_data_init(struct crt_prov_gdata *prov_data, crt_provider_t provider, return DER_SUCCESS; } -#define CRT_QUOTA_RPCS_DEFAULT 64 /* first step init - for initializing crt_gdata */ static int data_init(int server, crt_init_options_t *opt) @@ -325,7 +324,9 @@ static int data_init(int server, crt_init_options_t *opt) d_getenv_int("CRT_CREDIT_EP_CTX", &credits); } - crt_gdata.cg_rpc_quota = CRT_QUOTA_RPCS_DEFAULT; + /* Enable quotas by default only on clients */ + crt_gdata.cg_rpc_quota = crt_is_service() ? 0 : CRT_QUOTA_RPCS_DEFAULT; + d_getenv_int("D_QUOTA_RPCS", &crt_gdata.cg_rpc_quota); /* Must be set on the server when using UCX, will not affect OFI */ diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index 02cb3ed04f7..75a3f92ea33 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -18,6 +18,8 @@ #define CRT_DEFAULT_TIMEOUT_S (60) /* second */ #define CRT_DEFAULT_TIMEOUT_US (CRT_DEFAULT_TIMEOUT_S * 1e6) /* micro-second */ +#define CRT_QUOTA_RPCS_DEFAULT 64 + /* uri lookup max retry times */ #define CRT_URI_LOOKUP_RETRY_MAX (8) From 45463dff0ad264d34102e2278aa96210a6bfd649 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Wed, 13 Dec 2023 17:57:57 +0000 Subject: [PATCH 13/15] - fix incorrect check Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_init.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c index 1ca20794c64..2182735293e 100644 --- a/src/cart/crt_init.c +++ b/src/cart/crt_init.c @@ -325,7 +325,7 @@ static int data_init(int server, crt_init_options_t *opt) } /* Enable quotas by default only on clients */ - crt_gdata.cg_rpc_quota = crt_is_service() ? 0 : CRT_QUOTA_RPCS_DEFAULT; + crt_gdata.cg_rpc_quota = server ? 0 : CRT_QUOTA_RPCS_DEFAULT; d_getenv_int("D_QUOTA_RPCS", &crt_gdata.cg_rpc_quota); From 9010e17d585a065c3c6b724f90d229da734507a9 Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Fri, 29 Dec 2023 07:36:51 +0000 Subject: [PATCH 14/15] - 'current' quota changed to atomic. locks removed - crt_req_set/get quota resource shortened and static inline now - changed warning to debug message when exceeding quotas Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 62 +++++++++++++---------------------- src/cart/crt_internal_fns.h | 3 -- src/cart/crt_internal_types.h | 2 +- 3 files changed, 24 insertions(+), 43 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index 8a44fdd5bd7..e3ad9d92c63 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -14,6 +14,9 @@ static void crt_epi_destroy(struct crt_ep_inflight *epi); static int context_quotas_init(crt_context_t crt_ctx); static int context_quotas_finalize(crt_context_t crt_ctx); +static inline int get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); +static inline void put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); + static struct crt_ep_inflight * epi_link2ptr(d_list_t *rlink) { @@ -1197,8 +1200,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) } /* check inflight quota. if exceeded, queue this rpc */ - quota_rc = crt_context_get_quota_resource(rpc_priv->crp_pub.cr_ctx, - CRT_QUOTA_RPCS); + quota_rc = get_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); grp_priv = crt_grp_pub2priv(rpc_priv->crp_pub.cr_ep.ep_grp); ep_rank = crt_grp_priv_get_primary_rank(grp_priv, @@ -1416,7 +1418,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) if (tmp_rpc != NULL) dispatch_rpc(tmp_rpc); else - crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); + put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); crt_context_req_untrack_internal(rpc_priv); @@ -2046,63 +2048,45 @@ crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int * return rc; } -int -crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) +static inline int +get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) { struct crt_context *ctx = crt_ctx; int rc = 0; - if (ctx == NULL) { - D_ERROR("NULL context\n"); - D_GOTO(out, rc = -DER_INVAL); - } - - if (quota < 0 || quota >= CRT_QUOTA_COUNT) { - D_ERROR("Invalid quota %d passed\n", quota); - D_GOTO(out, rc = -DER_INVAL); - } + D_ASSERTF(ctx != NULL, "NULL context\n"); + D_ASSERTF(quota >= 0 && quota < CRT_QUOTA_COUNT, "Invalid quota\n"); /* If quotas not enabled or unlimited quota */ if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) return 0; - D_MUTEX_LOCK(&ctx->cc_quotas.mutex); - if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) - ctx->cc_quotas.current[quota]++; - else { - D_WARN("Quota limit reached for quota_type=%d\n", quota); + /* It's ok if we go slightly above quota in a corner case, but avoid locks */ + if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) { + atomic_fetch_add(&ctx->cc_quotas.current[quota], 1); + } else { + D_DEBUG(DB_TRACE, "Quota limit (%d) reached for quota_type=%d\n", + ctx->cc_quotas.limit[quota], quota); rc = -DER_QUOTA_LIMIT; } - D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); -out: + return rc; } -int -crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) +static inline void +put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) { struct crt_context *ctx = crt_ctx; - int rc = 0; - - if (ctx == NULL) { - D_ERROR("NULL context\n"); - D_GOTO(out, rc = -DER_INVAL); - } - if (quota < 0 || quota >= CRT_QUOTA_COUNT) { - D_ERROR("Invalid quota %d passed\n", quota); - D_GOTO(out, rc = -DER_INVAL); - } + D_ASSERTF(ctx != NULL, "NULL context\n"); + D_ASSERTF(quota >= 0 && quota < CRT_QUOTA_COUNT, "Invalid quota\n"); /* If quotas not enabled or unlimited quota */ if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) - return 0; + return; - D_MUTEX_LOCK(&ctx->cc_quotas.mutex); D_ASSERTF(ctx->cc_quotas.current[quota] > 0, "Invalid current limit"); - ctx->cc_quotas.current[quota]--; - D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); + atomic_fetch_sub(&ctx->cc_quotas.current[quota], 1); -out: - return rc; + return; } diff --git a/src/cart/crt_internal_fns.h b/src/cart/crt_internal_fns.h index 0d537ffcdef..c88c794852b 100644 --- a/src/cart/crt_internal_fns.h +++ b/src/cart/crt_internal_fns.h @@ -31,9 +31,6 @@ enum { CRT_REQ_TRACK_IN_WAITQ, }; -int crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); -int crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); - int crt_context_req_track(struct crt_rpc_priv *rpc_priv); bool crt_context_empty(int provider, int locked); void crt_context_req_untrack(struct crt_rpc_priv *rpc_priv); diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index c35595b2e99..2e71e28b693 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -194,7 +194,7 @@ extern struct crt_plugin_gdata crt_plugin_gdata; struct crt_quotas { int limit[CRT_QUOTA_COUNT]; - int current[CRT_QUOTA_COUNT]; + ATOMIC uint32_t current[CRT_QUOTA_COUNT]; bool enabled[CRT_QUOTA_COUNT]; pthread_mutex_t mutex; d_list_t rpc_waitq; From 584aaf767c0c4c38bfc65fe89d33c411d0061aef Mon Sep 17 00:00:00 2001 From: Alexander A Oganezov Date: Fri, 29 Dec 2023 07:42:04 +0000 Subject: [PATCH 15/15] - change check to assert for null rpc Required-githooks: true Signed-off-by: Alexander A Oganezov --- src/cart/crt_context.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index e3ad9d92c63..78702e25b38 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -1374,8 +1374,7 @@ static void dispatch_rpc(struct crt_rpc_priv *rpc) { int rc; - if (rpc == NULL) - return; + D_ASSERTF(rpc != NULL, "rpc is NULL\n"); crt_rpc_lock(rpc);