Skip to content

Commit

Permalink
k/fetch: respect the max_bytes from the fetch request
Browse files Browse the repository at this point in the history
While limiting the number of partitions in fetch response by
`kafka_max_bytes_per_fetch`, also consider the fetch plan's `bytes_left`
which is based on on fetch request's max_bytes and on `fetch_max_bytes`
property.
  • Loading branch information
dlex committed Jun 1, 2023
1 parent ee5e069 commit eb8a915
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,17 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
const replica_selector& replica_selector,
std::vector<ntp_fetch_config> ntp_fetch_configs,
bool foreign_read,
std::optional<model::timeout_clock::time_point> deadline) {
std::optional<model::timeout_clock::time_point> deadline,
const size_t bytes_left) {
size_t total_max_bytes = 0;
for (const auto& c : ntp_fetch_configs) {
total_max_bytes += c.cfg.max_bytes;
}

auto max_bytes_per_fetch
= config::shard_local_cfg().kafka_max_bytes_per_fetch();
// bytes_left comes from the fetch plan and also accounts for the max_bytes
// field in the fetch request
const size_t max_bytes_per_fetch = std::min<size_t>(
config::shard_local_cfg().kafka_max_bytes_per_fetch(), bytes_left);
if (total_max_bytes > max_bytes_per_fetch) {
auto per_partition = max_bytes_per_fetch / ntp_fetch_configs.size();
vlog(
Expand Down Expand Up @@ -429,7 +432,7 @@ handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
return ss::now();
}

bool foreign_read = shard != ss::this_shard_id();
const bool foreign_read = shard != ss::this_shard_id();

// dispatch to remote core
return octx.rctx.partition_manager()
Expand All @@ -440,12 +443,16 @@ handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
deadline = octx.deadline,
configs = std::move(fetch.requests),
&octx](cluster::partition_manager& mgr) mutable {
// &octx is captured only to immediately use its accessors here so
// that there is a list of all objects accessed next to `invoke_on`.
// This is meant to help avoiding unintended cross shard access
return fetch_ntps_in_parallel(
mgr,
octx.rctx.replica_selector(),
std::move(configs),
foreign_read,
deadline);
deadline,
octx.bytes_left);
})
.then([responses = std::move(fetch.responses),
start_time = fetch.start_time,
Expand Down

0 comments on commit eb8a915

Please sign in to comment.