diff --git a/src/cart/README.env b/src/cart/README.env index 58df8cf2baf..3d4a12963df 100644 --- a/src/cart/README.env +++ b/src/cart/README.env @@ -139,6 +139,12 @@ This file lists the environment variables used in CaRT. It its value exceed 256, then will use 256 for flow control. Set it to zero means disable the flow control in cart. + . D_QUOTA_RPCS + Set it to the maximum number of in-flight RPCs that a sender will put on the + wire per context. The quota of each context is independent of the others. + If it is not set, the default is 64 on clients and 0 (disabled) on servers. + Setting it to 0 disables the quota. + . CRT_CTX_SHARE_ADDR Set it to non-zero to make all the contexts share one network address, in this case CaRT will create one SEP and each context maps to one tx/rx diff --git a/src/cart/crt_context.c b/src/cart/crt_context.c index fdcd73c8ec4..78702e25b38 100644 --- a/src/cart/crt_context.c +++ b/src/cart/crt_context.c @@ -11,6 +11,11 @@ #include "crt_internal.h" static void crt_epi_destroy(struct crt_ep_inflight *epi); +static int context_quotas_init(crt_context_t crt_ctx); +static int context_quotas_finalize(crt_context_t crt_ctx); + +static inline int get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); +static inline void put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota); static struct crt_ep_inflight * epi_link2ptr(d_list_t *rlink) @@ -141,6 +146,13 @@ crt_context_init(crt_context_t crt_ctx) if (rc != 0) D_GOTO(out, rc); + rc = D_MUTEX_INIT(&ctx->cc_quotas.mutex, NULL); + if (rc != 0) { + D_MUTEX_DESTROY(&ctx->cc_mutex); + D_GOTO(out, rc); + } + + D_INIT_LIST_HEAD(&ctx->cc_quotas.rpc_waitq); D_INIT_LIST_HEAD(&ctx->cc_link); /* create timeout binheap */ @@ -162,6 +174,8 @@ crt_context_init(crt_context_t crt_ctx) D_GOTO(out_binheap_destroy, rc); } + rc = context_quotas_init(crt_ctx); + D_GOTO(out, rc); out_binheap_destroy: @@ -684,10 +698,17 @@ crt_context_destroy(crt_context_t crt_ctx, int force) D_GOTO(out, rc = -DER_UNINIT); } + rc = 
context_quotas_finalize(crt_ctx); + if (rc) { + DL_ERROR(rc, "context_quotas_finalize() failed"); + if (!force) + D_GOTO(out, rc); + } + ctx = crt_ctx; rc = crt_grp_ctx_invalid(ctx, false /* locked */); if (rc) { - D_ERROR("crt_grp_ctx_invalid failed, rc: %d.\n", rc); + DL_ERROR(rc, "crt_grp_ctx_invalid() failed"); if (!force) D_GOTO(out, rc); } @@ -1167,6 +1188,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) d_list_t *rlink; d_rank_t ep_rank; int rc = 0; + int quota_rc = 0; struct crt_grp_priv *grp_priv; D_ASSERT(crt_ctx != NULL); @@ -1177,6 +1199,9 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) D_GOTO(out, rc = CRT_REQ_TRACK_IN_INFLIGHQ); } + /* check inflight quota. if exceeded, queue this rpc */ + quota_rc = get_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); + grp_priv = crt_grp_pub2priv(rpc_priv->crp_pub.cr_ep.ep_grp); ep_rank = crt_grp_priv_get_primary_rank(grp_priv, rpc_priv->crp_pub.cr_ep.ep_rank); @@ -1228,15 +1253,16 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) rpc_priv->crp_epi = epi; RPC_ADDREF(rpc_priv); - if (crt_gdata.cg_credit_ep_ctx != 0 && + if (quota_rc == -DER_QUOTA_LIMIT) { + epi->epi_req_num++; + rpc_priv->crp_state = RPC_STATE_QUEUED; + rc = CRT_REQ_TRACK_IN_WAITQ; + } else if (crt_gdata.cg_credit_ep_ctx != 0 && (epi->epi_req_num - epi->epi_reply_num) >= crt_gdata.cg_credit_ep_ctx) { - if (rpc_priv->crp_opc_info->coi_queue_front) { - d_list_add(&rpc_priv->crp_epi_link, - &epi->epi_req_waitq); - } else { - d_list_add_tail(&rpc_priv->crp_epi_link, - &epi->epi_req_waitq); - } + if (rpc_priv->crp_opc_info->coi_queue_front) + d_list_add(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); + else + d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); epi->epi_req_wait_num++; rpc_priv->crp_state = RPC_STATE_QUEUED; @@ -1246,13 +1272,11 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) rc = crt_req_timeout_track(rpc_priv); D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); if (rc == 0) { - 
d_list_add_tail(&rpc_priv->crp_epi_link, - &epi->epi_req_q); + d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_q); epi->epi_req_num++; rc = CRT_REQ_TRACK_IN_INFLIGHQ; } else { - RPC_ERROR(rpc_priv, - "crt_req_timeout_track failed, rc: %d.\n", rc); + RPC_ERROR(rpc_priv, "crt_req_timeout_track failed, rc: %d.\n", rc); /* roll back the addref above */ RPC_DECREF(rpc_priv); } @@ -1264,6 +1288,10 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) /* reference taken by d_hash_rec_find or "epi->epi_ref = 1" above */ D_MUTEX_LOCK(&crt_ctx->cc_mutex); d_hash_rec_decref(&crt_ctx->cc_epi_table, &epi->epi_link); + + if (quota_rc == -DER_QUOTA_LIMIT) + d_list_add_tail(&rpc_priv->crp_waitq_link, &crt_ctx->cc_quotas.rpc_waitq); + D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); out: @@ -1280,9 +1308,10 @@ credits_available(struct crt_ep_inflight *epi) { int64_t inflight = epi->epi_req_num - epi->epi_reply_num; - D_ASSERTF(inflight >= 0 && inflight <= crt_gdata.cg_credit_ep_ctx, - "req_num=%ld reply_num=%ld credit_ep_ctx=%u\n", epi->epi_req_num, - epi->epi_reply_num, crt_gdata.cg_credit_ep_ctx); + /* TODO: inflight right now includes items queued in quota waitq, and can exceed credit limit */ + if (inflight > crt_gdata.cg_credit_ep_ctx) + return 0; + return crt_gdata.cg_credit_ep_ctx - inflight; } @@ -1324,6 +1353,7 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) } else {/* RPC_CANCELED or RPC_INITED or RPC_TIMEOUT */ epi->epi_req_num--; } + D_ASSERT(epi->epi_req_num >= epi->epi_reply_num); D_MUTEX_UNLOCK(&epi->epi_mutex); @@ -1340,6 +1370,27 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) RPC_DECREF(rpc_priv); } +static void +dispatch_rpc(struct crt_rpc_priv *rpc) { + int rc; + + D_ASSERTF(rpc != NULL, "rpc is NULL\n"); + + crt_rpc_lock(rpc); + + rc = crt_req_send_internal(rpc); + if (rc == 0) { + crt_rpc_unlock(rpc); + } else { + RPC_ADDREF(rpc); + RPC_ERROR(rpc, "crt_req_send_internal failed, rc: %d\n", rc); + rpc->crp_state = 
RPC_STATE_INITED;
+		crt_context_req_untrack_internal(rpc);
+		/* for error case here */
+		crt_rpc_complete_and_unlock(rpc, rc);
+	}
+}
+
 void
 crt_context_req_untrack(struct crt_rpc_priv *rpc_priv)
 {
@@ -1351,17 +1402,26 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv)
 
 	D_ASSERT(crt_ctx != NULL);
 
-	if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP) {
-		RPC_TRACE(DB_NET, rpc_priv, "bypass untracking for URI_LOOKUP.\n");
+	if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP)
 		return;
-	}
 
 	epi = rpc_priv->crp_epi;
 	D_ASSERT(epi != NULL);
 
+	/* Dispatch one rpc from wait_q if any or return resource back */
+	D_MUTEX_LOCK(&crt_ctx->cc_mutex);
+	tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq,
+				   struct crt_rpc_priv, crp_waitq_link);
+	D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);
+
+	if (tmp_rpc != NULL)
+		dispatch_rpc(tmp_rpc);
+	else
+		put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS);
+
 	crt_context_req_untrack_internal(rpc_priv);
 
-	/* done if flow control disabled */
+	/* done if ep credit flow control is disabled */
 	if (crt_gdata.cg_credit_ep_ctx == 0)
 		return;
 
@@ -1408,20 +1468,8 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv)
 	D_MUTEX_UNLOCK(&epi->epi_mutex);
 
 	/* re-submit the rpc req */
-	while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) {
-		crt_rpc_lock(tmp_rpc);
-		rc = crt_req_send_internal(tmp_rpc);
-		if (rc == 0) {
-			crt_rpc_unlock(tmp_rpc);
-		} else {
-			RPC_ADDREF(tmp_rpc);
-			RPC_ERROR(tmp_rpc, "crt_req_send_internal failed, rc: %d\n", rc);
-			tmp_rpc->crp_state = RPC_STATE_INITED;
-			crt_context_req_untrack_internal(tmp_rpc);
-			/* for error case here */
-			crt_rpc_complete_and_unlock(tmp_rpc, rc);
-		}
-	}
+	while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link)))
+		dispatch_rpc(tmp_rpc);
 }
 
 /* TODO: Need per-provider call */
@@ -1910,3 +1958,186 @@ crt_req_force_completion(struct crt_rpc_priv *rpc_priv)
 	crt_req_timeout_track(rpc_priv);
 	D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);
 }
+
+/**
+ * Initialize the quota state of a context from the global configuration.
+ *
+ * The RPC limit is copied from crt_gdata.cg_rpc_quota, the in-use counter is reset
+ * and the quota is enabled only when that limit is non-zero.
+ *
+ * \param[in] crt_ctx	CaRT context
+ *
+ * \return		0 on success, -DER_INVAL if \a crt_ctx is NULL.
+ */
+static int
+context_quotas_init(crt_context_t crt_ctx)
+{
+	struct crt_context *ctx = crt_ctx;
+	struct crt_quotas *quotas;
+	int rc = 0;
+
+	if (ctx == NULL) {
+		D_ERROR("NULL context\n");
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	quotas = &ctx->cc_quotas;
+
+	quotas->limit[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota;
+	quotas->current[CRT_QUOTA_RPCS] = 0;
+	quotas->enabled[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota > 0;
+out:
+	return rc;
+}
+
+/**
+ * Disable every quota of a context.
+ *
+ * \param[in] crt_ctx	CaRT context
+ *
+ * \return		DER_SUCCESS on success, -DER_INVAL if \a crt_ctx is NULL.
+ */
+static int
+context_quotas_finalize(crt_context_t crt_ctx)
+{
+	struct crt_context *ctx = crt_ctx;
+
+	if (ctx == NULL) {
+		D_ERROR("NULL context\n");
+		return -DER_INVAL;
+	}
+
+	for (int i = 0; i < CRT_QUOTA_COUNT; i++)
+		ctx->cc_quotas.enabled[i] = false;
+
+	return DER_SUCCESS;
+}
+
+/**
+ * Set the limit of a quota on a context; declared in cart/api.h.
+ *
+ * Only the limit is updated: a quota that context_quotas_init() left disabled
+ * stays disabled. Negative limits are rejected with -DER_INVAL.
+ */
+int
+crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value)
+{
+	struct crt_context *ctx = crt_ctx;
+	int rc = 0;
+
+	if (ctx == NULL) {
+		D_ERROR("NULL context\n");
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	if (quota < 0 || quota >= CRT_QUOTA_COUNT) {
+		D_ERROR("Invalid quota %d passed\n", quota);
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	if (value < 0) {
+		D_ERROR("Invalid quota limit %d passed\n", value);
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	D_MUTEX_LOCK(&ctx->cc_quotas.mutex);
+	ctx->cc_quotas.limit[quota] = value;
+	D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex);
+
+out:
+	return rc;
+}
+
+/**
+ * Query the limit of a quota on a context; declared in cart/api.h.
+ */
+int
+crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value)
+{
+	struct crt_context *ctx = crt_ctx;
+	int rc = 0;
+
+	if (ctx == NULL) {
+		D_ERROR("NULL context\n");
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	if (quota < 0 || quota >= CRT_QUOTA_COUNT) {
+		D_ERROR("Invalid quota %d passed\n", quota);
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	if (value == NULL) {
+		D_ERROR("NULL value\n");
+		D_GOTO(out, rc = -DER_INVAL);
+	}
+
+	/* Pair with the locked store in crt_context_quota_limit_set() */
+	D_MUTEX_LOCK(&ctx->cc_quotas.mutex);
+	*value = ctx->cc_quotas.limit[quota];
+	D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex);
+
+out:
+	return rc;
+}
+
+/**
+ * Try to take one unit of a quota.
+ *
+ * While the quota is enabled every successful call is counted, even with a limit
+ * of 0 (unlimited), so the counter stays balanced with put_quota_resource() when
+ * the limit is changed while RPCs are in flight.
+ *
+ * \param[in] crt_ctx	CaRT context
+ * \param[in] quota	Quota type
+ *
+ * \return		0 if a unit was taken or the quota is disabled,
+ *			-DER_QUOTA_LIMIT if the limit has been reached.
+ */
+static inline int
+get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota)
+{
+	struct crt_context *ctx = crt_ctx;
+	int limit;
+
+	D_ASSERTF(ctx != NULL, "NULL context\n");
+	D_ASSERTF(quota >= 0 && quota < CRT_QUOTA_COUNT, "Invalid quota\n");
+
+	if (!ctx->cc_quotas.enabled[quota])
+		return 0;
+
+	/* Read the limit once, crt_context_quota_limit_set() may change it at any time */
+	limit = ctx->cc_quotas.limit[quota];
+
+	/* It's ok if we go slightly above quota in a corner case, but avoid locks */
+	if (limit > 0 && ctx->cc_quotas.current[quota] >= (uint32_t)limit) {
+		D_DEBUG(DB_TRACE, "Quota limit (%d) reached for quota_type=%d\n", limit, quota);
+		return -DER_QUOTA_LIMIT;
+	}
+
+	atomic_fetch_add(&ctx->cc_quotas.current[quota], 1);
+
+	return 0;
+}
+
+/**
+ * Give back one unit of a quota taken with get_quota_resource().
+ *
+ * \param[in] crt_ctx	CaRT context
+ * \param[in] quota	Quota type
+ */
+static inline void
+put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota)
+{
+	struct crt_context *ctx = crt_ctx;
+
+	D_ASSERTF(ctx != NULL, "NULL context\n");
+	D_ASSERTF(quota >= 0 && quota < CRT_QUOTA_COUNT, "Invalid quota\n");
+
+	/* Mirror get_quota_resource(): units are counted whenever the quota is enabled */
+	if (!ctx->cc_quotas.enabled[quota])
+		return;
+
+	D_ASSERTF(ctx->cc_quotas.current[quota] > 0, "Invalid current limit\n");
+	atomic_fetch_sub(&ctx->cc_quotas.current[quota], 1);
+}
diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c
index 18573f95e16..6c3805a78c9 100644
--- a/src/cart/crt_hg.c
+++ b/src/cart/crt_hg.c
@@ -1400,7 +1400,7 @@ crt_hg_req_send_cb(const struct hg_cb_info *hg_cbinfo)
 void
 crt_hg_req_send(struct crt_rpc_priv *rpc_priv)
 {
-	hg_return_t	 hg_ret;
+	hg_return_t hg_ret;
 
 	D_ASSERT(rpc_priv != NULL);
 
diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c
index ce580b1fe68..2182735293e 100644
--- a/src/cart/crt_init.c
+++ b/src/cart/crt_init.c
@@ -96,6 +96,7 @@ dump_envariables(void)
 	    "D_PORT_AUTO_ADJUST",
 	    "D_POLL_TIMEOUT",
 	    "D_LOG_FILE_APPEND_RANK",
+	    "D_QUOTA_RPCS",
 	    "D_POST_INIT",
 	    "D_POST_INCR",
 	    "DAOS_SIGNAL_REGISTER"};
@@ -257,6 +258,7 @@ prov_data_init(struct crt_prov_gdata *prov_data, crt_provider_t provider,
 	return DER_SUCCESS;
 }
 
+
 /* first step init - for initializing crt_gdata */
 static int data_init(int server, crt_init_options_t *opt)
 {
@@ -322,6 +324,11 @@
static int data_init(int server, crt_init_options_t *opt) d_getenv_int("CRT_CREDIT_EP_CTX", &credits); } + /* Enable quotas by default only on clients */ + crt_gdata.cg_rpc_quota = server ? 0 : CRT_QUOTA_RPCS_DEFAULT; + + d_getenv_int("D_QUOTA_RPCS", &crt_gdata.cg_rpc_quota); + /* Must be set on the server when using UCX, will not affect OFI */ d_getenv_char("UCX_IB_FORK_INIT", &ucx_ib_fork_init); if (ucx_ib_fork_init) { diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index e9faa7607cd..2e71e28b693 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -88,7 +88,7 @@ struct crt_gdata { /** Provider specific data */ struct crt_prov_gdata cg_prov_gdata_primary; - /** */ + /** Placeholder for secondary provider data */ struct crt_prov_gdata *cg_prov_gdata_secondary; /** Hints to mercury for request post init (ignored for clients) */ @@ -110,6 +110,7 @@ struct crt_gdata { /** HG level global data */ struct crt_hg_gdata *cg_hg; + /** Points to default group */ struct crt_grp_gdata *cg_grp; /** refcount to protect crt_init/crt_finalize */ @@ -145,6 +146,8 @@ struct crt_gdata { struct d_tm_node_t *cg_uri_other; /** Number of cores on a system */ long cg_num_cores; + /** Inflight rpc quota limit */ + uint32_t cg_rpc_quota; }; extern struct crt_gdata crt_gdata; @@ -189,6 +192,14 @@ extern struct crt_plugin_gdata crt_plugin_gdata; #define CRT_DEFAULT_CREDITS_PER_EP_CTX (32) #define CRT_MAX_CREDITS_PER_EP_CTX (256) +struct crt_quotas { + int limit[CRT_QUOTA_COUNT]; + ATOMIC uint32_t current[CRT_QUOTA_COUNT]; + bool enabled[CRT_QUOTA_COUNT]; + pthread_mutex_t mutex; + d_list_t rpc_waitq; +}; + /* crt_context */ struct crt_context { d_list_t cc_link; /** link to gdata.cg_ctx_list */ @@ -227,6 +238,9 @@ struct crt_context { /** Stores self uri for the current context */ char cc_self_uri[CRT_ADDR_STR_MAX_LEN]; + + /** Stores quotas */ + struct crt_quotas cc_quotas; }; /* in-flight RPC req list, be tracked per endpoint for 
every crt_context */ diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index 5b680c4b6e8..e6fa9dd4705 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -656,9 +656,9 @@ int crt_req_create(crt_context_t crt_ctx, crt_endpoint_t *tgt_ep, crt_opcode_t opc, crt_rpc_t **req) { - int rc = 0; - struct crt_grp_priv *grp_priv = NULL; + struct crt_grp_priv *grp_priv = NULL; struct crt_rpc_priv *rpc_priv; + int rc = 0; if (crt_ctx == CRT_CONTEXT_NULL || req == NULL) { D_ERROR("invalid parameter (NULL crt_ctx or req).\n"); diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index a1d466c6c67..75a3f92ea33 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -18,6 +18,8 @@ #define CRT_DEFAULT_TIMEOUT_S (60) /* second */ #define CRT_DEFAULT_TIMEOUT_US (CRT_DEFAULT_TIMEOUT_S * 1e6) /* micro-second */ +#define CRT_QUOTA_RPCS_DEFAULT 64 + /* uri lookup max retry times */ #define CRT_URI_LOOKUP_RETRY_MAX (8) @@ -130,6 +132,8 @@ struct crt_rpc_priv { d_list_t crp_epi_link; /* tmp_link used in crt_context_req_untrack */ d_list_t crp_tmp_link; + /* link for crt_context::cc_quotas.rpc_waitq */ + d_list_t crp_waitq_link; /* link to parent RPC crp_opc_info->co_child_rpcs/co_replied_rpcs */ d_list_t crp_parent_link; /* binheap node for timeout management, in crt_context::cc_bh_timeout */ diff --git a/src/gurt/tests/test_gurt.c b/src/gurt/tests/test_gurt.c index 2bfddd37012..9d2dc8ea17c 100644 --- a/src/gurt/tests/test_gurt.c +++ b/src/gurt/tests/test_gurt.c @@ -120,11 +120,11 @@ test_d_errstr(void **state) /* Check the boundary at the end of the GURT error numbers, this will need updating if * additional error numbers are added. 
*/ - value = d_errstr(-DER_HG_FATAL); - assert_string_equal(value, "DER_HG_FATAL"); - value = d_errstr(-1045); - assert_string_equal(value, "DER_HG_FATAL"); - value = d_errstr(-(DER_HG_FATAL + 1)); + value = d_errstr(-DER_QUOTA_LIMIT); + assert_string_equal(value, "DER_QUOTA_LIMIT"); + value = d_errstr(-1046); + assert_string_equal(value, "DER_QUOTA_LIMIT"); + value = d_errstr(-(DER_QUOTA_LIMIT + 1)); assert_string_equal(value, "DER_UNKNOWN"); /* Check the end of the DAOS error numbers. */ diff --git a/src/include/cart/api.h b/src/include/cart/api.h index 3aa87fccdaa..e76fb3433e3 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -2247,6 +2247,30 @@ crt_quiet_error(int err) return err == -DER_GRPVER; } +/** + * Change the limit of a quota on a context. + * + * \param[in] crt_ctx CaRT context + * \param[in] quota Quota type + * \param[in] value New limit; 0 disables the quota + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value); + +/** + * Query the limit of a quota on a context. + * + * \param[in] crt_ctx CaRT context + * \param[in] quota Quota type + * \param[out] value Returned limit + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value); + /** * Get the proto version of an RPC request. * diff --git a/src/include/cart/types.h b/src/include/cart/types.h index 0ce7dd79815..d02e2881047 100644 --- a/src/include/cart/types.h +++ b/src/include/cart/types.h @@ -440,6 +440,17 @@ typedef enum { CRT_GROUP_MOD_OP_COUNT, } crt_group_mod_op_t; +/** + * Quotas supported by CaRT. 
+ */ +typedef enum { + /** Limit of number of inflight rpcs */ + CRT_QUOTA_RPCS, + + /** Total count of supported quotas */ + CRT_QUOTA_COUNT, +} crt_quota_type_t; + /** @} */ #endif /* __CRT_TYPES_H__ */ diff --git a/src/include/daos_errno.h b/src/include/daos_errno.h index 2eca16d371e..f3a3cdddcdf 100644 --- a/src/include/daos_errno.h +++ b/src/include/daos_errno.h @@ -117,7 +117,9 @@ extern "C" { /** Invalid user/group permissions.*/ \ ACTION(DER_SHMEM_PERMS, Unable to access shared memory segment due to incompatible user or group permissions) \ /** Fatal (non-retry-able) transport layer mercury error */ \ - ACTION(DER_HG_FATAL, Fatal transport layer mercury error) + ACTION(DER_HG_FATAL, Fatal transport layer mercury error) \ + /** Quota limit reached on the requested resource */ \ + ACTION(DER_QUOTA_LIMIT, Quota limit reached) /** TODO: add more error numbers */ /** Preprocessor macro defining DAOS errno values and internal definition of d_errstr */