-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DAOS-14484 cart: Implement per-context inflight queue #13202
Changes from 17 commits
766df28
f2b7192
7e46893
39db446
94087bd
0914730
0f82132
559fcb3
8d121b5
343c2e5
8b15d48
3229a82
1f56841
59f45dc
3912044
c6820ce
d8fc6c9
f561648
d37c9a5
45463df
fe2543e
9010e17
584aaf7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ | |
#include "crt_internal.h" | ||
|
||
static void crt_epi_destroy(struct crt_ep_inflight *epi); | ||
static int context_quotas_init(crt_context_t crt_ctx); | ||
static int context_quotas_finalize(crt_context_t crt_ctx); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment here, unless we have more, maybe that's not necessary to have quotas init and finalize (is finalize really needed also?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have quota mutex to destroy in finalize. Possibly for other quotas we might want to clean any lists, but not needed for rpc list as untrack logic of each rpc will take care of it. But we might need for allocation queues if we ever add those. |
||
static struct crt_ep_inflight * | ||
epi_link2ptr(d_list_t *rlink) | ||
|
@@ -141,6 +143,13 @@ crt_context_init(crt_context_t crt_ctx) | |
if (rc != 0) | ||
D_GOTO(out, rc); | ||
|
||
rc = D_MUTEX_INIT(&ctx->cc_quotas.mutex, NULL); | ||
if (rc != 0) { | ||
D_MUTEX_DESTROY(&ctx->cc_mutex); | ||
D_GOTO(out, rc); | ||
} | ||
|
||
D_INIT_LIST_HEAD(&ctx->cc_quotas.rpc_waitq); | ||
D_INIT_LIST_HEAD(&ctx->cc_link); | ||
|
||
/* create timeout binheap */ | ||
|
@@ -162,6 +171,8 @@ crt_context_init(crt_context_t crt_ctx) | |
D_GOTO(out_binheap_destroy, rc); | ||
} | ||
|
||
rc = context_quotas_init(crt_ctx); | ||
|
||
D_GOTO(out, rc); | ||
|
||
out_binheap_destroy: | ||
|
@@ -684,10 +695,17 @@ crt_context_destroy(crt_context_t crt_ctx, int force) | |
D_GOTO(out, rc = -DER_UNINIT); | ||
} | ||
|
||
rc = context_quotas_finalize(crt_ctx); | ||
if (rc) { | ||
DL_ERROR(rc, "context_quotas_finalize() failed"); | ||
if (!force) | ||
D_GOTO(out, rc); | ||
} | ||
|
||
ctx = crt_ctx; | ||
rc = crt_grp_ctx_invalid(ctx, false /* locked */); | ||
if (rc) { | ||
D_ERROR("crt_grp_ctx_invalid failed, rc: %d.\n", rc); | ||
DL_ERROR(rc, "crt_grp_ctx_invalid() failed"); | ||
if (!force) | ||
D_GOTO(out, rc); | ||
} | ||
|
@@ -1167,6 +1185,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) | |
d_list_t *rlink; | ||
d_rank_t ep_rank; | ||
int rc = 0; | ||
int quota_rc = 0; | ||
struct crt_grp_priv *grp_priv; | ||
|
||
D_ASSERT(crt_ctx != NULL); | ||
|
@@ -1177,6 +1196,10 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) | |
D_GOTO(out, rc = CRT_REQ_TRACK_IN_INFLIGHQ); | ||
} | ||
|
||
/* check inflight quota. if exceeded, queue this rpc */ | ||
quota_rc = crt_context_get_quota_resource(rpc_priv->crp_pub.cr_ctx, | ||
CRT_QUOTA_RPCS); | ||
|
||
grp_priv = crt_grp_pub2priv(rpc_priv->crp_pub.cr_ep.ep_grp); | ||
ep_rank = crt_grp_priv_get_primary_rank(grp_priv, | ||
rpc_priv->crp_pub.cr_ep.ep_rank); | ||
|
@@ -1228,15 +1251,16 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) | |
rpc_priv->crp_epi = epi; | ||
RPC_ADDREF(rpc_priv); | ||
|
||
if (crt_gdata.cg_credit_ep_ctx != 0 && | ||
if (quota_rc == -DER_QUOTA_LIMIT) { | ||
epi->epi_req_num++; | ||
rpc_priv->crp_state = RPC_STATE_QUEUED; | ||
rc = CRT_REQ_TRACK_IN_WAITQ; | ||
} else if (crt_gdata.cg_credit_ep_ctx != 0 && | ||
(epi->epi_req_num - epi->epi_reply_num) >= crt_gdata.cg_credit_ep_ctx) { | ||
if (rpc_priv->crp_opc_info->coi_queue_front) { | ||
d_list_add(&rpc_priv->crp_epi_link, | ||
&epi->epi_req_waitq); | ||
} else { | ||
d_list_add_tail(&rpc_priv->crp_epi_link, | ||
&epi->epi_req_waitq); | ||
} | ||
if (rpc_priv->crp_opc_info->coi_queue_front) | ||
d_list_add(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); | ||
else | ||
d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_waitq); | ||
|
||
epi->epi_req_wait_num++; | ||
rpc_priv->crp_state = RPC_STATE_QUEUED; | ||
|
@@ -1246,13 +1270,11 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) | |
rc = crt_req_timeout_track(rpc_priv); | ||
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); | ||
if (rc == 0) { | ||
d_list_add_tail(&rpc_priv->crp_epi_link, | ||
&epi->epi_req_q); | ||
d_list_add_tail(&rpc_priv->crp_epi_link, &epi->epi_req_q); | ||
epi->epi_req_num++; | ||
rc = CRT_REQ_TRACK_IN_INFLIGHQ; | ||
} else { | ||
RPC_ERROR(rpc_priv, | ||
"crt_req_timeout_track failed, rc: %d.\n", rc); | ||
RPC_ERROR(rpc_priv, "crt_req_timeout_track failed, rc: %d.\n", rc); | ||
/* roll back the addref above */ | ||
RPC_DECREF(rpc_priv); | ||
} | ||
|
@@ -1264,6 +1286,10 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv) | |
/* reference taken by d_hash_rec_find or "epi->epi_ref = 1" above */ | ||
D_MUTEX_LOCK(&crt_ctx->cc_mutex); | ||
d_hash_rec_decref(&crt_ctx->cc_epi_table, &epi->epi_link); | ||
|
||
if (quota_rc == -DER_QUOTA_LIMIT) | ||
d_list_add_tail(&rpc_priv->crp_waitq_link, &crt_ctx->cc_quotas.rpc_waitq); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this not within the block at line 1254 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
So that it would not be done with epi->epi_mutex lock, but instead needs context lock. We can reorganize things nicer once we can get rid of EP credits-related code, which should simplify this and few other calls greatly. |
||
|
||
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); | ||
|
||
out: | ||
|
@@ -1280,9 +1306,10 @@ credits_available(struct crt_ep_inflight *epi) | |
{ | ||
int64_t inflight = epi->epi_req_num - epi->epi_reply_num; | ||
|
||
D_ASSERTF(inflight >= 0 && inflight <= crt_gdata.cg_credit_ep_ctx, | ||
"req_num=%ld reply_num=%ld credit_ep_ctx=%u\n", epi->epi_req_num, | ||
epi->epi_reply_num, crt_gdata.cg_credit_ep_ctx); | ||
/* TODO: inflight right now includes items queued in quota waitq, and can exceed credit limit */ | ||
if (inflight > crt_gdata.cg_credit_ep_ctx) | ||
return 0; | ||
|
||
return crt_gdata.cg_credit_ep_ctx - inflight; | ||
} | ||
|
||
|
@@ -1324,6 +1351,7 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) | |
} else {/* RPC_CANCELED or RPC_INITED or RPC_TIMEOUT */ | ||
epi->epi_req_num--; | ||
} | ||
|
||
D_ASSERT(epi->epi_req_num >= epi->epi_reply_num); | ||
|
||
D_MUTEX_UNLOCK(&epi->epi_mutex); | ||
|
@@ -1340,6 +1368,28 @@ crt_context_req_untrack_internal(struct crt_rpc_priv *rpc_priv) | |
RPC_DECREF(rpc_priv); | ||
} | ||
|
||
static void | ||
dispatch_rpc(struct crt_rpc_priv *rpc) { | ||
int rc; | ||
|
||
if (rpc == NULL) | ||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't that be an assert ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can just remove it now or change to assert yes. its a left-over from a previous behavior |
||
|
||
crt_rpc_lock(rpc); | ||
|
||
rc = crt_req_send_internal(rpc); | ||
if (rc == 0) { | ||
crt_rpc_unlock(rpc); | ||
} else { | ||
RPC_ADDREF(rpc); | ||
RPC_ERROR(rpc, "crt_req_send_internal failed, rc: %d\n", rc); | ||
rpc->crp_state = RPC_STATE_INITED; | ||
crt_context_req_untrack_internal(rpc); | ||
/* for error case here */ | ||
crt_rpc_complete_and_unlock(rpc, rc); | ||
} | ||
} | ||
|
||
void | ||
crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) | ||
{ | ||
|
@@ -1351,17 +1401,26 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) | |
|
||
D_ASSERT(crt_ctx != NULL); | ||
|
||
if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP) { | ||
RPC_TRACE(DB_NET, rpc_priv, "bypass untracking for URI_LOOKUP.\n"); | ||
if (rpc_priv->crp_pub.cr_opc == CRT_OPC_URI_LOOKUP) | ||
return; | ||
} | ||
|
||
epi = rpc_priv->crp_epi; | ||
D_ASSERT(epi != NULL); | ||
|
||
/* Dispatch one rpc from wait_q if any or return resource back */ | ||
D_MUTEX_LOCK(&crt_ctx->cc_mutex); | ||
tmp_rpc = d_list_pop_entry(&crt_ctx->cc_quotas.rpc_waitq, | ||
struct crt_rpc_priv, crp_waitq_link); | ||
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); | ||
|
||
if (tmp_rpc != NULL) | ||
dispatch_rpc(tmp_rpc); | ||
else | ||
crt_context_put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. once the rpc is done, you either process the next rpc (reusing the existing quota) or you put the quota back if there is nothing else queued |
||
|
||
crt_context_req_untrack_internal(rpc_priv); | ||
|
||
/* done if flow control disabled */ | ||
/* done if ep credit flow control is disabled */ | ||
if (crt_gdata.cg_credit_ep_ctx == 0) | ||
return; | ||
|
||
|
@@ -1408,20 +1467,8 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv) | |
D_MUTEX_UNLOCK(&epi->epi_mutex); | ||
|
||
/* re-submit the rpc req */ | ||
while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) { | ||
crt_rpc_lock(tmp_rpc); | ||
rc = crt_req_send_internal(tmp_rpc); | ||
if (rc == 0) { | ||
crt_rpc_unlock(tmp_rpc); | ||
} else { | ||
RPC_ADDREF(tmp_rpc); | ||
RPC_ERROR(tmp_rpc, "crt_req_send_internal failed, rc: %d\n", rc); | ||
tmp_rpc->crp_state = RPC_STATE_INITED; | ||
crt_context_req_untrack_internal(tmp_rpc); | ||
/* for error case here */ | ||
crt_rpc_complete_and_unlock(tmp_rpc, rc); | ||
} | ||
} | ||
while ((tmp_rpc = d_list_pop_entry(&submit_list, struct crt_rpc_priv, crp_tmp_link))) | ||
dispatch_rpc(tmp_rpc); | ||
} | ||
|
||
/* TODO: Need per-provider call */ | ||
|
@@ -1910,3 +1957,152 @@ crt_req_force_completion(struct crt_rpc_priv *rpc_priv) | |
crt_req_timeout_track(rpc_priv); | ||
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex); | ||
} | ||
|
||
static int | ||
context_quotas_init(crt_context_t crt_ctx) | ||
{ | ||
frostedcmos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
struct crt_context *ctx = crt_ctx; | ||
struct crt_quotas *quotas; | ||
int rc = 0; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
quotas = &ctx->cc_quotas; | ||
|
||
quotas->limit[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota; | ||
quotas->current[CRT_QUOTA_RPCS] = 0; | ||
quotas->enabled[CRT_QUOTA_RPCS] = crt_gdata.cg_rpc_quota > 0 ? true : false; | ||
out: | ||
return rc; | ||
} | ||
|
||
static int | ||
context_quotas_finalize(crt_context_t crt_ctx) | ||
{ | ||
struct crt_context *ctx = crt_ctx; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
return -DER_INVAL; | ||
} | ||
|
||
for (int i = 0; i < CRT_QUOTA_COUNT; i++) | ||
ctx->cc_quotas.enabled[i] = false; | ||
|
||
return DER_SUCCESS; | ||
} | ||
|
||
int | ||
crt_context_quota_limit_set(crt_context_t crt_ctx, crt_quota_type_t quota, int value) | ||
{ | ||
struct crt_context *ctx = crt_ctx; | ||
int rc = 0; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
if (quota < 0 || quota >= CRT_QUOTA_COUNT) { | ||
D_ERROR("Invalid quota %d passed\n", quota); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
D_MUTEX_LOCK(&ctx->cc_quotas.mutex); | ||
ctx->cc_quotas.limit[quota] = value; | ||
D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); | ||
|
||
out: | ||
return rc; | ||
} | ||
|
||
int | ||
crt_context_quota_limit_get(crt_context_t crt_ctx, crt_quota_type_t quota, int *value) | ||
{ | ||
struct crt_context *ctx = crt_ctx; | ||
int rc = 0; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
if (quota < 0 || quota >= CRT_QUOTA_COUNT) { | ||
D_ERROR("Invalid quota %d passed\n", quota); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
if (value == NULL) { | ||
D_ERROR("NULL value\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
*value = ctx->cc_quotas.limit[quota]; | ||
|
||
out: | ||
return rc; | ||
} | ||
|
||
int | ||
crt_context_get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) | ||
{ | ||
struct crt_context *ctx = crt_ctx; | ||
int rc = 0; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
if (quota < 0 || quota >= CRT_QUOTA_COUNT) { | ||
D_ERROR("Invalid quota %d passed\n", quota); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
/* If quotas not enabled or unlimited quota */ | ||
if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) | ||
return 0; | ||
|
||
D_MUTEX_LOCK(&ctx->cc_quotas.mutex); | ||
if (ctx->cc_quotas.current[quota] < ctx->cc_quotas.limit[quota]) | ||
ctx->cc_quotas.current[quota]++; | ||
else { | ||
D_WARN("Quota limit reached for quota_type=%d\n", quota); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is overly chatty and should be a debug There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NB: "overly chatty" == gigabytes of client logs under load. ;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
rc = -DER_QUOTA_LIMIT; | ||
} | ||
D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); | ||
out: | ||
return rc; | ||
} | ||
|
||
int | ||
crt_context_put_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota) | ||
{ | ||
struct crt_context *ctx = crt_ctx; | ||
int rc = 0; | ||
|
||
if (ctx == NULL) { | ||
D_ERROR("NULL context\n"); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
if (quota < 0 || quota >= CRT_QUOTA_COUNT) { | ||
D_ERROR("Invalid quota %d passed\n", quota); | ||
D_GOTO(out, rc = -DER_INVAL); | ||
} | ||
|
||
/* If quotas not enabled or unlimited quota */ | ||
if (!ctx->cc_quotas.enabled[quota] || ctx->cc_quotas.limit[quota] == 0) | ||
return 0; | ||
|
||
D_MUTEX_LOCK(&ctx->cc_quotas.mutex); | ||
D_ASSERTF(ctx->cc_quotas.current[quota] > 0, "Invalid current limit"); | ||
ctx->cc_quotas.current[quota]--; | ||
D_MUTEX_UNLOCK(&ctx->cc_quotas.mutex); | ||
|
||
out: | ||
return rc; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should just call it
D_RPC_MAX_IN_FLIGHT
even if it's a little longer ? :) as I feelD_QUOTA_RPCS
might be too generic and introduce a different nomenclature withD_QUOTA
(although I know that's what you're trying to do here).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I am open to naming, but i dont like D_RPC_MAX_IN_FLIGHT as its long and harder to remember.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am okay with D_QUOTA_RPCS.