diff --git a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc index 4e31af0f3733..57f8a34befa0 100644 --- a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc +++ b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc @@ -86,7 +86,7 @@ static ss::future> fetch_offsets_from_shard( ret.push_back(response_t::make_epoch_end_offset( r.ntp.tp.partition, get_epoch_end_offset(r.requested_epoch, *p), - p->leader_epoch())); + r.requested_epoch)); } return ret; }); diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index e3ba953a87a6..09eba7082f9d 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -284,6 +284,11 @@ std::optional replicated_partition::get_leader_epoch_last_offset( const model::term_id term(epoch); const auto first_local_offset = _partition->start_offset(); const auto first_local_term = _partition->get_term(first_local_offset); + const auto last_local_term = _partition->term(); + if (term > last_local_term) { + // Request for term that is in the future + return std::nullopt; + } // Look for the highest offset in the requested term, or the first offset // in the next term. 
This mirrors behavior in Kafka, see // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281 diff --git a/tests/rptest/tests/offset_for_leader_epoch_test.py b/tests/rptest/tests/offset_for_leader_epoch_test.py index 0b9cc095b990..3ce3beddb71e 100644 --- a/tests/rptest/tests/offset_for_leader_epoch_test.py +++ b/tests/rptest/tests/offset_for_leader_epoch_test.py @@ -108,11 +108,11 @@ def test_offset_for_leader_epoch(self): for o in leader_epoch_offsets: # check if the offset epoch matches what is expected or it is not available - # (may be the case if leader wasn't elected in term 1 but other term in this case the offset for term 1 will not be presetn) + # (may be the case if the leader wasn't elected in term 1 but in another term; in this case the offset for term 1 will not be present) assert initial_offsets[(o.topic, o.partition)] == o.epoch_end_offset or ( o.epoch_end_offset == -1 - and o.leader_epoch > 1) + and o.leader_epoch == 1) # restart all the nodes to force leader election, # increase start timeout as partition count may get large @@ -129,7 +129,7 @@ def test_offset_for_leader_epoch(self): assert initial_offsets[(o.topic, o.partition)] == o.epoch_end_offset or ( o.epoch_end_offset == -1 - and o.leader_epoch > 2) + and o.leader_epoch == 1) last_offsets = self.list_offsets(topics=topics, total_partitions=total_partitions) @@ -152,6 +152,16 @@ def test_offset_for_leader_epoch(self): for o in leader_epoch_offsets: assert o.error is not None and "UNKNOWN_LEADER_EPOCH" in o.error + # test case for requested_epoch larger than leader_epoch + + leader_epoch_offsets = kcl.offset_for_leader_epoch(topics=topic_names, + leader_epoch=15000) + + for o in leader_epoch_offsets: + # Ensure the leader_epoch returned is not the current leader_epoch + # but the requested one + assert o.error == '' and o.leader_epoch == 15000 and o.epoch_end_offset == -1 + 
@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST) def test_offset_for_leader_epoch_transfer(self):