diff --git a/src/v/storage/segment.cc b/src/v/storage/segment.cc
index 973fdf753256..46e28a9ad395 100644
--- a/src/v/storage/segment.cc
+++ b/src/v/storage/segment.cc
@@ -353,7 +353,7 @@ ss::future<> segment::do_truncate(
 
 ss::future<bool> segment::materialize_index() {
     vassert(
-      _tracker.base_offset == _tracker.dirty_offset,
+      _tracker.base_offset == model::next_offset(_tracker.dirty_offset),
       "Materializing the index must happen before tracking any data. {}",
       *this);
     return _idx.materialize_index().then([this](bool yn) {
diff --git a/src/v/storage/segment.h b/src/v/storage/segment.h
index 3e18ab09e956..660efdb53c8a 100644
--- a/src/v/storage/segment.h
+++ b/src/v/storage/segment.h
@@ -44,9 +44,9 @@ class segment {
     offset_tracker(model::term_id t, model::offset base)
       : term(t)
       , base_offset(base)
-      , committed_offset(base)
-      , dirty_offset(base)
-      , stable_offset(base) {}
+      , committed_offset(model::prev_offset(base))
+      , dirty_offset(model::prev_offset(base))
+      , stable_offset(model::prev_offset(base)) {}
 
     model::term_id term;
     model::offset base_offset;
diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc
index acd704b64534..661a9885748c 100644
--- a/src/v/storage/tests/storage_e2e_test.cc
+++ b/src/v/storage/tests/storage_e2e_test.cc
@@ -3048,6 +3048,7 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) {
     append_batch(log, model::term_id(0)); // write single message for final
                                           // segment after last roll
 
+    log.flush().get();
     auto pre_gaps = analyze(*disk_log);
     auto pre_stats = log.offsets();
     BOOST_REQUIRE_EQUAL(
diff --git a/tests/rptest/tests/log_segment_ms_test.py b/tests/rptest/tests/log_segment_ms_test.py
index bf8743636d3a..abbbff0c114d 100644
--- a/tests/rptest/tests/log_segment_ms_test.py
+++ b/tests/rptest/tests/log_segment_ms_test.py
@@ -197,3 +197,74 @@ def test_segment_timely_rolling_after_change(self):
         wait_until(lambda: self._total_segments_count(topic) > middle_count,
                    timeout_sec=SERVER_HOUSEKEEPING_LOOP * 2,
                    err_msg=f"failed to roll a segment in a timely manner")
+
+    @cluster(num_nodes=3)
+    def test_segment_rolling_with_retention(self):
+        self.redpanda.set_cluster_config({
+            "log_segment_ms": None,
+            "log_segment_ms_min": 10000
+        })
+        topic = TopicSpec(segment_bytes=(1024 * 1024),
+                          replication_factor=1,
+                          partition_count=1)
+        self.client().create_topic(topic)
+
+        producer = VerifiableProducer(context=self.test_context,
+                                      num_nodes=1,
+                                      redpanda=self.redpanda,
+                                      topic=topic.name,
+                                      throughput=10000)
+
+        producer.start()
+        wait_until(
+            lambda: self._total_segments_count(topic) >= 5,
+            timeout_sec=120,
+            err_msg=
+            "producer failed to produce enough messages to create 5 segments")
+        # stop producer
+        producer.stop()
+        producer.clean()
+        producer.free()
+        del producer
+        start_count = self._total_segments_count(topic)
+        self.client().alter_topic_config(topic.name, "segment.ms", "15000")
+
+        # wait for the segment.ms policy to roll the segment
+        wait_until(lambda: self._total_segments_count(topic) > start_count,
+                   timeout_sec=60,
+                   err_msg=f"failed waiting for the segment to roll")
+
+        self.client().alter_topic_config(topic.name,
+                                         "retention.local.target.ms", "10000")
+        self.client().alter_topic_config(topic.name, "retention.ms", "10000")
+
+        # wait for retention policy to trigger
+        wait_until(lambda: self._total_segments_count(topic) <= 1,
+                   timeout_sec=60,
+                   err_msg=f"failed waiting for retention policy")
+
+        producer = VerifiableProducer(context=self.test_context,
+                                      num_nodes=1,
+                                      redpanda=self.redpanda,
+                                      topic=topic.name,
+                                      throughput=10000)
+        producer.start()
+        wait_until(
+            lambda: self._total_segments_count(topic) >= 2,
+            timeout_sec=120,
+            err_msg=
+            "producer failed to produce enough messages to create 2 segments")
+        producer.stop()
+
+        consumer = VerifiableConsumer(context=self.test_context,
+                                      num_nodes=1,
+                                      redpanda=self.redpanda,
+                                      topic=topic.name,
+                                      group_id="test-group")
+        consumer.start()
+        # Wait for messages to be consumed; if there is an issue in the
+        # offsets handling, the consumer will get stuck.
+        wait_until(lambda: consumer.total_consumed() >= 1000,
+                   timeout_sec=120,
+                   err_msg=f"Failed to consume messages")
+        consumer.stop()
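
For context, below is a minimal standalone C++ sketch of the offset convention this patch adopts. It is not Redpanda code: `offset`, `next_offset`, and `prev_offset` are simplified stand-ins for `model::offset`, `model::next_offset`, and `model::prev_offset` (sentinel handling is ignored), and `offset_tracker` here only mirrors the shape of the struct in segment.h. The point it illustrates is that an empty segment now reports its "last written" offsets as one position before the base, so the `materialize_index` precondition compares the base against `next_offset(dirty_offset)` rather than against `dirty_offset` itself.

// Standalone illustration only; assumptions noted above.
#include <cassert>
#include <cstdint>

using offset = int64_t;

constexpr offset next_offset(offset o) { return o + 1; } // simplified
constexpr offset prev_offset(offset o) { return o - 1; } // simplified

struct offset_tracker {
    offset base_offset;
    offset committed_offset;
    offset dirty_offset;
    offset stable_offset;

    explicit offset_tracker(offset base)
      : base_offset(base)
      // An empty segment has written nothing yet, so the last-written
      // offsets sit one position before the base instead of aliasing it.
      , committed_offset(prev_offset(base))
      , dirty_offset(prev_offset(base))
      , stable_offset(prev_offset(base)) {}
};

int main() {
    offset_tracker t{100};

    // The materialize_index() precondition from the patch: the segment is
    // still empty, i.e. the next offset to be written is the base offset.
    assert(t.base_offset == next_offset(t.dirty_offset));

    // After the first batch (ending at offset 104) is appended, dirty_offset
    // advances and the precondition correctly stops holding.
    t.dirty_offset = 104;
    assert(t.base_offset != next_offset(t.dirty_offset));
    return 0;
}

Under the previous convention, an empty segment reported `dirty_offset == base_offset`, which made an untouched segment indistinguishable from one containing exactly one record at the base; initializing the tracker to `prev_offset(base)` removes that ambiguity, which is what the flush in the compaction test and the new rolling-with-retention ducktape test exercise.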