diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index eac1b7a2f95a..a70bb579e98f 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -439,26 +439,36 @@ class simple_fetch_planner final : public fetch_planner::impl { auto ntp = model::ntp( model::kafka_namespace, fp.topic, fp.partition); - // there is given partition in topic metadata, return - // unknown_topic_or_partition error - if (unlikely(!octx.rctx.metadata_cache().contains(ntp))) { - resp_it->set(make_partition_response_error( - fp.partition, error_code::unknown_topic_or_partition)); - ++resp_it; - return; - } - auto shard = octx.rctx.shards().shard_for(ntp); - if (!shard) { - /** - * no shard is found on current node, but topic exists in - * cluster metadata, this mean that the partition was moved - * but consumer has not updated its metadata yet. we return - * not_leader_for_partition error to force metadata update. + if (unlikely(!shard)) { + /* + * If the shard associated with the ntp isn't found on the + * current node then either (1) the ntp doesn't exist at all + * (e.g., it has been deleted) or (2) it has been moved and + * the client's metadata is stale. */ - resp_it->set(make_partition_response_error( - fp.partition, error_code::not_leader_for_partition)); + + error_code ec; + if (!octx.rctx.metadata_cache().contains(ntp)) { + /* + * Case (1) the given topic/partition does not exist in + * topic metadata, return unknown_topic_or_partition error + */ + ec = error_code::unknown_topic_or_partition; + } else { + /* + * Case (2): no shard is found on current node, but topic + * exists in cluster metadata, this mean that the + * partition was moved but consumer has not updated its + * metadata yet. we return not_leader_for_partition error + * to force metadata update. + */ + ec = error_code::not_leader_for_partition; + } + + resp_it->set(make_partition_response_error(fp.partition, ec)); ++resp_it; + return; }