From 55df9ffcd738c8c8dbd5efa91ca1c2f015f71061 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 25 Apr 2022 15:01:53 -0700 Subject: [PATCH] Move stale metadata check inside shard_for metadata_cache.contains() is a slow call with many partitions becase it scans the entire parition list looking for the specified one. On the fetch planning path we can avoid this cost simply by moving the check inside a shard_for check we do immediately after the metadata check: in a stable system the shard_for check will fail any time the metadata call will fail. Issue #4410 --- src/v/kafka/server/handlers/fetch.cc | 44 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index eac1b7a2f95a2..cb87e14caff94 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -439,26 +439,36 @@ class simple_fetch_planner final : public fetch_planner::impl { auto ntp = model::ntp( model::kafka_namespace, fp.topic, fp.partition); - // there is given partition in topic metadata, return - // unknown_topic_or_partition error - if (unlikely(!octx.rctx.metadata_cache().contains(ntp))) { - resp_it->set(make_partition_response_error( - fp.partition, error_code::unknown_topic_or_partition)); - ++resp_it; - return; - } - auto shard = octx.rctx.shards().shard_for(ntp); - if (!shard) { - /** - * no shard is found on current node, but topic exists in - * cluster metadata, this mean that the partition was moved - * but consumer has not updated its metadata yet. we return - * not_leader_for_partition error to force metadata update. + if (unlikely(!shard)) { + /* + * If the shard associated with the ntp isn't found on the + * current node then either (1) the ntp doesn't exist at all + * (e.g., it has been deleted) or (2) it has been moved and + * the client's metadata is stale. */ - resp_it->set(make_partition_response_error( - fp.partition, error_code::not_leader_for_partition)); + + error_code ec; + if (!octx.rctx.metadata_cache().contains(ntp)) { + /* + * Case (1) the given topic/partition does not exist in + * topic metadata, return unknown_topic_or_partition error + */ + ec = error_code::unknown_topic_or_partition; + } else { + /* + * Case (2): no shard is found on current node, but topic + * exists in cluster metadata, this mean that the + * partition was moved but consumer has not updated its + * metadata yet. we return not_leader_for_partition error + * to force metadata update. + */ + ec = error_code::not_leader_for_partition; + } + + resp_it.set(make_partition_response_error(fp.partition, ec)); ++resp_it; + return; }