Move stale metadata check inside shard_for
metadata_cache.contains() is a slow call with many partitions because
it scans the entire partition list looking for the specified one.

On the fetch planning path we can avoid this cost simply by moving the
check inside the shard_for check we do immediately after it: in a
stable system the shard_for check will fail whenever the metadata
check would fail.

Issue redpanda-data#4410
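
In outline, the commit swaps the order of the two lookups so the expensive metadata scan only runs on the (rare) path where shard_for() has already failed, where it is needed only to choose between the two error codes. Below is a minimal, self-contained sketch of that reordering; the types are hypothetical stand-ins, not the actual redpanda classes that appear in the diff further down.

// Hypothetical stand-in types to illustrate the reordering; not redpanda's real API.
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

enum class error_code { none, unknown_topic_or_partition, not_leader_for_partition };

struct lookup_tables {
    std::vector<std::string> all_partitions;                 // cluster-wide metadata
    std::unordered_map<std::string, unsigned> local_shards;  // partitions hosted on this node

    // stand-in for metadata_cache().contains(): a linear scan, slow with many partitions
    bool metadata_contains(const std::string& ntp) const {
        for (const auto& p : all_partitions) {
            if (p == ntp) {
                return true;
            }
        }
        return false;
    }

    // stand-in for shards().shard_for(): a cheap hash lookup
    std::optional<unsigned> shard_for(const std::string& ntp) const {
        if (auto it = local_shards.find(ntp); it != local_shards.end()) {
            return it->second;
        }
        return std::nullopt;
    }
};

// Before the commit: every partition in the fetch plan pays for the metadata scan.
error_code classify_before(const lookup_tables& t, const std::string& ntp) {
    if (!t.metadata_contains(ntp)) {
        return error_code::unknown_topic_or_partition;
    }
    if (!t.shard_for(ntp)) {
        return error_code::not_leader_for_partition;
    }
    return error_code::none;
}

// After the commit: the scan runs only once shard_for() has already failed,
// and is used solely to pick between the two error codes.
error_code classify_after(const lookup_tables& t, const std::string& ntp) {
    if (auto shard = t.shard_for(ntp); !shard) {
        return t.metadata_contains(ntp) ? error_code::not_leader_for_partition
                                        : error_code::unknown_topic_or_partition;
    }
    return error_code::none;
}

On the common path (an existing, locally hosted partition) the scan is never touched at all, which is what makes the fetch planning loop cheaper.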
travisdowns committed Apr 25, 2022
1 parent fb0529d commit 0523e2e
Showing 1 changed file with 88 additions and 77 deletions: src/v/kafka/server/handlers/fetch.cc
@@ -413,86 +413,97 @@ class simple_fetch_planner final : public fetch_planner::impl {
/**
* group fetch requests by shard
*/
octx.for_each_fetch_partition(
[&resp_it, &octx, &plan, &bytes_left_in_plan](
const fetch_session_partition& fp) {
// if this is not an initial fetch we are allowed to skip
// partitions that already have an error or for which we have enough data
if (!octx.initial_fetch) {
bool has_enough_data
= !resp_it->partition_response->records->empty()
&& octx.over_min_bytes();

if (
resp_it->partition_response->error_code != error_code::none
|| has_enough_data) {
++resp_it;
return;
}
}
/**
* if not authorized do not include into a plan
*/
if (!octx.rctx.authorized(
security::acl_operation::read, fp.topic)) {
(resp_it).set(make_partition_response_error(
fp.partition, error_code::topic_authorization_failed));
++resp_it;
return;
}

auto ntp = model::ntp(
model::kafka_namespace, fp.topic, fp.partition);

// if there is no such partition in topic metadata, return an
// unknown_topic_or_partition error
if (unlikely(!octx.rctx.metadata_cache().contains(ntp))) {
(resp_it).set(make_partition_response_error(
fp.partition, error_code::unknown_topic_or_partition));
++resp_it;
return;
}
octx.for_each_fetch_partition([&resp_it,
&octx,
&plan,
&bytes_left_in_plan](
const fetch_session_partition& fp) {
// if this is not an initial fetch we are allowed to skip
// partitions that already have an error or for which we have enough data
if (!octx.initial_fetch) {
bool has_enough_data
= !resp_it->partition_response->records->empty()
&& octx.over_min_bytes();

if (
resp_it->partition_response->error_code != error_code::none
|| has_enough_data) {
++resp_it;
return;
}
}
/**
* if not authorized do not include into a plan
*/
if (!octx.rctx.authorized(
security::acl_operation::read, fp.topic)) {
(resp_it).set(make_partition_response_error(
fp.partition, error_code::topic_authorization_failed));
++resp_it;
return;
}

auto shard = octx.rctx.shards().shard_for(ntp);
if (!shard) {
/**
* no shard is found on the current node, but the topic exists in
* cluster metadata; this means that the partition was moved but
* the consumer has not updated its metadata yet. we return a
* not_leader_for_partition error to force a metadata update.
*/
(resp_it).set(make_partition_response_error(
fp.partition, error_code::not_leader_for_partition));
++resp_it;
return;
}
auto ntp = model::ntp(
model::kafka_namespace, fp.topic, fp.partition);

auto shard = octx.rctx.shards().shard_for(ntp);
if (unlikely(!shard)) {
/*
* If the shard associated with the ntp isn't found on the
* current node then either (1) the ntp doesn't exist at all
* (e.g., it has been deleted) or (2) it has been moved and
* the client's metadata is stale.
*/

error_code ec;
if (!octx.rctx.metadata_cache().contains(ntp)) {
/*
* Case (1): the given topic/partition does not exist in
* topic metadata, so return an unknown_topic_or_partition error
*/
ec = error_code::unknown_topic_or_partition;
} else {
/*
* Case (2): no shard is found on the current node, but the
* topic exists in cluster metadata; this means that the
* partition was moved but the consumer has not updated its
* metadata yet. we return a not_leader_for_partition error
* to force a metadata update.
*/
ec = error_code::not_leader_for_partition;
}

resp_it.set(make_partition_response_error(fp.partition, ec));
++resp_it;

return;
}

auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp);
auto max_bytes = std::min(
bytes_left_in_plan, size_t(fp.max_bytes));
/**
* If the high watermark is greater than the fetch offset, assume
* that the fetch will read max_bytes
*/
if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) {
bytes_left_in_plan -= max_bytes;
}
auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(ntp);
auto max_bytes = std::min(bytes_left_in_plan, size_t(fp.max_bytes));
/**
* If the high watermark is greater than the fetch offset, assume
* that the fetch will read max_bytes
*/
if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) {
bytes_left_in_plan -= max_bytes;
}

fetch_config config{
.start_offset = fp.fetch_offset,
.max_offset = model::model_limits<model::offset>::max(),
.isolation_level = octx.request.data.isolation_level,
.max_bytes = max_bytes,
.timeout = octx.deadline.value_or(model::no_timeout),
.strict_max_bytes = octx.response_size > 0,
.skip_read = bytes_left_in_plan == 0 && max_bytes == 0,
.current_leader_epoch = fp.current_leader_epoch,
};

plan.fetches_per_shard[*shard].push_back(
make_ntp_fetch_config(ntp, config),
resp_it++,
octx.rctx.probe().auto_fetch_measurement());
});
fetch_config config{
.start_offset = fp.fetch_offset,
.max_offset = model::model_limits<model::offset>::max(),
.isolation_level = octx.request.data.isolation_level,
.max_bytes = max_bytes,
.timeout = octx.deadline.value_or(model::no_timeout),
.strict_max_bytes = octx.response_size > 0,
.skip_read = bytes_left_in_plan == 0 && max_bytes == 0,
.current_leader_epoch = fp.current_leader_epoch,
};

plan.fetches_per_shard[*shard].push_back(
make_ntp_fetch_config(ntp, config),
resp_it++,
octx.rctx.probe().auto_fetch_measurement());
});

return plan;
}
