diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 159960dfe526f..ed3e05d059715 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -413,86 +413,97 @@ class simple_fetch_planner final : public fetch_planner::impl { /** * group fetch requests by shard */ - octx.for_each_fetch_partition( - [&resp_it, &octx, &plan, &bytes_left_in_plan]( - const fetch_session_partition& fp) { - // if this is not an initial fetch we are allowed to skip - // partions that aleready have an error or we have enough data - if (!octx.initial_fetch) { - bool has_enough_data - = !resp_it->partition_response->records->empty() - && octx.over_min_bytes(); - - if ( - resp_it->partition_response->error_code != error_code::none - || has_enough_data) { - ++resp_it; - return; - } - } - /** - * if not authorized do not include into a plan - */ - if (!octx.rctx.authorized( - security::acl_operation::read, fp.topic)) { - (resp_it).set(make_partition_response_error( - fp.partition, error_code::topic_authorization_failed)); - ++resp_it; - return; - } - - auto ntp = model::ntp( - model::kafka_namespace, fp.topic, fp.partition); - - // there is given partition in topic metadata, return - // unknown_topic_or_partition error - if (unlikely(!octx.rctx.metadata_cache().contains(ntp))) { - (resp_it).set(make_partition_response_error( - fp.partition, error_code::unknown_topic_or_partition)); - ++resp_it; - return; - } + octx.for_each_fetch_partition([&resp_it, + &octx, + &plan, + &bytes_left_in_plan]( + const fetch_session_partition& fp) { + // if this is not an initial fetch we are allowed to skip + // partions that aleready have an error or we have enough data + if (!octx.initial_fetch) { + bool has_enough_data + = !resp_it->partition_response->records->empty() + && octx.over_min_bytes(); + + if ( + resp_it->partition_response->error_code != error_code::none + || has_enough_data) { + ++resp_it; + return; + } + } + /** + * if not authorized do not include into a plan + */ + if (!octx.rctx.authorized( + security::acl_operation::read, fp.topic)) { + (resp_it).set(make_partition_response_error( + fp.partition, error_code::topic_authorization_failed)); + ++resp_it; + return; + } - auto shard = octx.rctx.shards().shard_for(ntp); - if (!shard) { - /** - * no shard is found on current node, but topic exists in - * cluster metadata, this mean that the partition was moved - * but consumer has not updated its metadata yet. we return - * not_leader_for_partition error to force metadata update. - */ - (resp_it).set(make_partition_response_error( - fp.partition, error_code::not_leader_for_partition)); - ++resp_it; - return; - } + auto ntp = model::ntp( + model::kafka_namespace, fp.topic, fp.partition); + + auto shard = octx.rctx.shards().shard_for(ntp); + if (unlikely(!shard)) { + /* + * If the shard associated with the ntp isn't found on the + * current node then either (1) the ntp doesn't exist at all + * (e.g., it has been deleted) or (2) it has been moved and + * the client's metadata is stale. + */ + + error_code ec; + if (!octx.rctx.metadata_cache().contains(ntp)) { + /* + * Case (1) the given topic/partition does not exist in + * topic metadata, return unknown_topic_or_partition error + */ + ec = error_code::unknown_topic_or_partition; + } else { + /* + * Case (2): no shard is found on current node, but topic + * exists in cluster metadata, this mean that the + * partition was moved but consumer has not updated its + * metadata yet. we return not_leader_for_partition error + * to force metadata update. + */ + ec = error_code::not_leader_for_partition; + } + + resp_it.set(make_partition_response_error(fp.partition, ec)); + ++resp_it; + + return; + } - auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp); - auto max_bytes = std::min( - bytes_left_in_plan, size_t(fp.max_bytes)); - /** - * If offset is greater, assume that fetch will read max_bytes - */ - if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { - bytes_left_in_plan -= max_bytes; - } + auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp); + auto max_bytes = std::min(bytes_left_in_plan, size_t(fp.max_bytes)); + /** + * If offset is greater, assume that fetch will read max_bytes + */ + if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { + bytes_left_in_plan -= max_bytes; + } - fetch_config config{ - .start_offset = fp.fetch_offset, - .max_offset = model::model_limits::max(), - .isolation_level = octx.request.data.isolation_level, - .max_bytes = max_bytes, - .timeout = octx.deadline.value_or(model::no_timeout), - .strict_max_bytes = octx.response_size > 0, - .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, - .current_leader_epoch = fp.current_leader_epoch, - }; - - plan.fetches_per_shard[*shard].push_back( - make_ntp_fetch_config(ntp, config), - resp_it++, - octx.rctx.probe().auto_fetch_measurement()); - }); + fetch_config config{ + .start_offset = fp.fetch_offset, + .max_offset = model::model_limits::max(), + .isolation_level = octx.request.data.isolation_level, + .max_bytes = max_bytes, + .timeout = octx.deadline.value_or(model::no_timeout), + .strict_max_bytes = octx.response_size > 0, + .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, + .current_leader_epoch = fp.current_leader_epoch, + }; + + plan.fetches_per_shard[*shard].push_back( + make_ntp_fetch_config(ntp, config), + resp_it++, + octx.rctx.probe().auto_fetch_measurement()); + }); return plan; }