Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of #12063 v23.1.x #12086

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/offset_for_leader_epoch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static ss::future<std::vector<epoch_end_offset>> fetch_offsets_from_shard(
ret.push_back(response_t::make_epoch_end_offset(
r.ntp.tp.partition,
get_epoch_end_offset(r.requested_epoch, *p),
p->leader_epoch()));
r.requested_epoch));
}
return ret;
});
Expand Down
5 changes: 5 additions & 0 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
const model::term_id term(epoch);
const auto first_local_offset = _partition->start_offset();
const auto first_local_term = _partition->get_term(first_local_offset);
const auto last_local_term = _partition->term();
if (term > last_local_term) {
// Request for term that is in the future
return std::nullopt;
}
// Look for the highest offset in the requested term, or the first offset
// in the next term. This mirrors behavior in Kafka, see
// https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
Expand Down
16 changes: 13 additions & 3 deletions tests/rptest/tests/offset_for_leader_epoch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ def test_offset_for_leader_epoch(self):

for o in leader_epoch_offsets:
# check if the offset epoch matches what is expected or it is not available
# (may be the case if leader wasn't elected in term 1 but other term in this case the offset for term 1 will not be presetn)
# (may be the case if leader wasn't elected in term 1 but other term in this case the offset for term 1 will not be present)
assert initial_offsets[(o.topic,
o.partition)] == o.epoch_end_offset or (
o.epoch_end_offset == -1
and o.leader_epoch > 1)
and o.leader_epoch == 1)

# restart all the nodes to force leader election,
# increase start timeout as partition count may get large
Expand All @@ -129,7 +129,7 @@ def test_offset_for_leader_epoch(self):
assert initial_offsets[(o.topic,
o.partition)] == o.epoch_end_offset or (
o.epoch_end_offset == -1
and o.leader_epoch > 2)
and o.leader_epoch == 1)

last_offsets = self.list_offsets(topics=topics,
total_partitions=total_partitions)
Expand All @@ -152,6 +152,16 @@ def test_offset_for_leader_epoch(self):
for o in leader_epoch_offsets:
assert o.error is not None and "UNKNOWN_LEADER_EPOCH" in o.error

# test case for requested_epoch larger then leader_epoch

leader_epoch_offsets = kcl.offset_for_leader_epoch(topics=topic_names,
leader_epoch=15000)

for o in leader_epoch_offsets:
# Ensure the leader_epoch returned is not the current leader_epoch
# but the requested
assert o.error == '' and o.leader_epoch == 15000 and o.epoch_end_offset == -1

@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_offset_for_leader_epoch_transfer(self):

Expand Down