From 0523e2e07f71d476ada90cb2a799f67baabefe87 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 25 Apr 2022 14:36:39 -0700 Subject: [PATCH] Move stale metadata check inside shard_for metadata_cache.contains() is a slow call with many partitions becase it scans the entire parition list looking for the specified one. On the fetch planning path we can avoid this cost simply by moving the check inside a shard_for check we do immediately after the metadata check: in a stable system the shard_for check will fail any time the metadata call will fail. Issue #4410 --- src/v/kafka/server/handlers/fetch.cc | 165 ++++++++++++++------------- 1 file changed, 88 insertions(+), 77 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 159960dfe526f..ed3e05d059715 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -413,86 +413,97 @@ class simple_fetch_planner final : public fetch_planner::impl { /** * group fetch requests by shard */ - octx.for_each_fetch_partition( - [&resp_it, &octx, &plan, &bytes_left_in_plan]( - const fetch_session_partition& fp) { - // if this is not an initial fetch we are allowed to skip - // partions that aleready have an error or we have enough data - if (!octx.initial_fetch) { - bool has_enough_data - = !resp_it->partition_response->records->empty() - && octx.over_min_bytes(); - - if ( - resp_it->partition_response->error_code != error_code::none - || has_enough_data) { - ++resp_it; - return; - } - } - /** - * if not authorized do not include into a plan - */ - if (!octx.rctx.authorized( - security::acl_operation::read, fp.topic)) { - (resp_it).set(make_partition_response_error( - fp.partition, error_code::topic_authorization_failed)); - ++resp_it; - return; - } - - auto ntp = model::ntp( - model::kafka_namespace, fp.topic, fp.partition); - - // there is given partition in topic metadata, return - // unknown_topic_or_partition error - if (unlikely(!octx.rctx.metadata_cache().contains(ntp))) { - (resp_it).set(make_partition_response_error( - fp.partition, error_code::unknown_topic_or_partition)); - ++resp_it; - return; - } + octx.for_each_fetch_partition([&resp_it, + &octx, + &plan, + &bytes_left_in_plan]( + const fetch_session_partition& fp) { + // if this is not an initial fetch we are allowed to skip + // partions that aleready have an error or we have enough data + if (!octx.initial_fetch) { + bool has_enough_data + = !resp_it->partition_response->records->empty() + && octx.over_min_bytes(); + + if ( + resp_it->partition_response->error_code != error_code::none + || has_enough_data) { + ++resp_it; + return; + } + } + /** + * if not authorized do not include into a plan + */ + if (!octx.rctx.authorized( + security::acl_operation::read, fp.topic)) { + (resp_it).set(make_partition_response_error( + fp.partition, error_code::topic_authorization_failed)); + ++resp_it; + return; + } - auto shard = octx.rctx.shards().shard_for(ntp); - if (!shard) { - /** - * no shard is found on current node, but topic exists in - * cluster metadata, this mean that the partition was moved - * but consumer has not updated its metadata yet. we return - * not_leader_for_partition error to force metadata update. - */ - (resp_it).set(make_partition_response_error( - fp.partition, error_code::not_leader_for_partition)); - ++resp_it; - return; - } + auto ntp = model::ntp( + model::kafka_namespace, fp.topic, fp.partition); + + auto shard = octx.rctx.shards().shard_for(ntp); + if (unlikely(!shard)) { + /* + * If the shard associated with the ntp isn't found on the + * current node then either (1) the ntp doesn't exist at all + * (e.g., it has been deleted) or (2) it has been moved and + * the client's metadata is stale. + */ + + error_code ec; + if (!octx.rctx.metadata_cache().contains(ntp)) { + /* + * Case (1) the given topic/partition does not exist in + * topic metadata, return unknown_topic_or_partition error + */ + ec = error_code::unknown_topic_or_partition; + } else { + /* + * Case (2): no shard is found on current node, but topic + * exists in cluster metadata, this mean that the + * partition was moved but consumer has not updated its + * metadata yet. we return not_leader_for_partition error + * to force metadata update. + */ + ec = error_code::not_leader_for_partition; + } + + resp_it.set(make_partition_response_error(fp.partition, ec)); + ++resp_it; + + return; + } - auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp); - auto max_bytes = std::min( - bytes_left_in_plan, size_t(fp.max_bytes)); - /** - * If offset is greater, assume that fetch will read max_bytes - */ - if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { - bytes_left_in_plan -= max_bytes; - } + auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp); + auto max_bytes = std::min(bytes_left_in_plan, size_t(fp.max_bytes)); + /** + * If offset is greater, assume that fetch will read max_bytes + */ + if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { + bytes_left_in_plan -= max_bytes; + } - fetch_config config{ - .start_offset = fp.fetch_offset, - .max_offset = model::model_limits::max(), - .isolation_level = octx.request.data.isolation_level, - .max_bytes = max_bytes, - .timeout = octx.deadline.value_or(model::no_timeout), - .strict_max_bytes = octx.response_size > 0, - .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, - .current_leader_epoch = fp.current_leader_epoch, - }; - - plan.fetches_per_shard[*shard].push_back( - make_ntp_fetch_config(ntp, config), - resp_it++, - octx.rctx.probe().auto_fetch_measurement()); - }); + fetch_config config{ + .start_offset = fp.fetch_offset, + .max_offset = model::model_limits::max(), + .isolation_level = octx.request.data.isolation_level, + .max_bytes = max_bytes, + .timeout = octx.deadline.value_or(model::no_timeout), + .strict_max_bytes = octx.response_size > 0, + .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, + .current_leader_epoch = fp.current_leader_epoch, + }; + + plan.fetches_per_shard[*shard].push_back( + make_ntp_fetch_config(ntp, config), + resp_it++, + octx.rctx.probe().auto_fetch_measurement()); + }); return plan; }