From 8bae69c142ba56b9b84ad2a1b0dd431ba0c23aed Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 15 May 2023 20:16:20 +0100 Subject: [PATCH] cloud_storage: fix empty segment check in reader The empty-segment check was incorrectly subtracting 1 from the segment's next kafka offset before comparing it to the base offset. A segment with kafka data in it has a next offset != its base offset, while a segment _without_ kafka data has a next offset == its base offset. This went unnoticed in unit tests because the tests were using segment name format v1, which ignores delta_offset_end. It went unnoticed in integration tests because the client retries if our reader path drops out, and because the problem only occurs if a segment has exactly one data batch in it. --- src/v/cloud_storage/remote_partition.cc | 5 ++++- src/v/cloud_storage/tests/remote_partition_test.cc | 8 +++++++- src/v/cloud_storage/tests/util.h | 6 +++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 6adeb76ced6d..654cecbd60b1 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -127,8 +127,11 @@ remote_partition::borrow_result_t remote_partition::borrow_next_reader( if (mit->delta_offset_end == model::offset_delta{}) { break; } + + // If a segment contains kafka data batches, its next offset will + // be greater than its base offset. 
auto b = mit->base_kafka_offset(); - auto end = mit->next_kafka_offset() - kafka::offset(1); + auto end = mit->next_kafka_offset(); if (b != end) { break; } diff --git a/src/v/cloud_storage/tests/remote_partition_test.cc b/src/v/cloud_storage/tests/remote_partition_test.cc index 25b42d5b1992..b29fa3fe4488 100644 --- a/src/v/cloud_storage/tests/remote_partition_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_test.cc @@ -240,7 +240,13 @@ FIXTURE_TEST(test_overlapping_segments, cloud_storage_fixture) { cloud_storage::partition_manifest manifest(manifest_ntp, manifest_revision); auto expectations = make_imposter_expectations( - manifest, segments, false, model::offset_delta(0)); + manifest, + segments, + false, + model::offset_delta(0), + // Use v1 format because it only includes the base offset, not the + // committed offset. We will modify the committed offset. + segment_name_format::v1); std::stringstream sstr; manifest.serialize(sstr); diff --git a/src/v/cloud_storage/tests/util.h b/src/v/cloud_storage/tests/util.h index 46a27eb5e6f5..33db06cba4b5 100644 --- a/src/v/cloud_storage/tests/util.h +++ b/src/v/cloud_storage/tests/util.h @@ -496,7 +496,8 @@ std::vector make_imposter_expectations( cloud_storage::partition_manifest& m, const std::vector& segments, bool truncate_segments = false, - model::offset_delta delta = model::offset_delta(0)) { + model::offset_delta delta = model::offset_delta(0), + segment_name_format sname_format = segment_name_format::v2) { std::vector results; for (const auto& s : segments) { @@ -524,8 +525,7 @@ std::vector make_imposter_expectations( .delta_offset = segment_delta, .ntp_revision = m.get_revision_id(), .delta_offset_end = model::offset_delta(delta) - + model::offset_delta(s.num_config_records), - }; + + model::offset_delta(s.num_config_records)}; m.add(s.sname, meta); delta = delta