diff --git a/tests/rptest/tests/prefix_truncate_recovery_test.py b/tests/rptest/tests/prefix_truncate_recovery_test.py
index 7d9e559a68357..553988dcb56ad 100644
--- a/tests/rptest/tests/prefix_truncate_recovery_test.py
+++ b/tests/rptest/tests/prefix_truncate_recovery_test.py
@@ -13,6 +13,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner
 from rptest.clients.kafka_cli_tools import KafkaCliTools
 from rptest.clients.kafka_cat import KafkaCat
 from rptest.services.admin import Admin
@@ -24,7 +25,7 @@
 ]
 
 
-class PrefixTruncateRecoveryTest(RedpandaTest):
+class PrefixTruncateRecoveryTestBase(RedpandaTest):
     """
     The purpose of this test is to exercise recovery of partitions which have
     had data reclaimed based on retention policy. The testing strategy is:
@@ -48,7 +49,7 @@ def __init__(self, test_context):
             enable_leader_balancer=False,
         )
 
-        super(PrefixTruncateRecoveryTest,
+        super(PrefixTruncateRecoveryTestBase,
              self).__init__(test_context=test_context,
                             num_brokers=3,
                             extra_rp_conf=extra_rp_conf)
@@ -56,6 +57,9 @@ def __init__(self, test_context):
         self.kafka_tools = KafkaCliTools(self.redpanda)
         self.kafka_cat = KafkaCat(self.redpanda)
 
+    def setUp(self):
+        super(PrefixTruncateRecoveryTestBase, self).setUp()
+
     def fully_replicated(self, nodes):
         """
         Test that for each specified node there are no reported under
@@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes):
         assert len(metric.samples) == len(nodes)
         return [s.value for s in metric.samples]
 
-    def produce_until_reclaim(self, initial_deleted, acks):
+    def produce_until_reclaim(self, living_nodes, initial_deleted, acks):
         """
         Produce data until we observe that segments have been deleted. The
         initial_deleted parameter is the max number of segments deleted across
         nodes, and we wait for all partitions to report at least initial + 2
         deletions so that all nodes have experienced some deletion.
""" - deleted = self.get_segments_deleted(self.redpanda.nodes[1:]) + deleted = self.get_segments_deleted(living_nodes) if all(map(lambda d: d >= initial_deleted + 2, deleted)): return True self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks) return False - @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) - @matrix(acks=[-1, 1], start_empty=[True, False]) - def test_prefix_truncate_recovery(self, acks, start_empty): - # cover boundary conditions of partition being empty/non-empty - if not start_empty: - self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - # stop this unfortunate node - stopped_node = self.redpanda.nodes[0] - self.redpanda.stop_node(stopped_node) - - # produce data into the topic until segments are reclaimed - # by the configured retention policy - deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:])) - wait_until(lambda: self.produce_until_reclaim(deleted, acks), - timeout_sec=90, - backoff_sec=5) - - # we should now observe an under replicated state - wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]), - timeout_sec=90, - backoff_sec=5) - - # finally restart the node and wait until fully replicated - self.redpanda.start_node(stopped_node) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - self.verify_offsets() - def verify_offsets(self): """ Test that the ending offset for the partition as seen on each @@ -146,3 +116,77 @@ def verify_offsets(self): # appear to deal with this gracefully and we still pass. offsets.append(self.kafka_cat.list_offsets(self.topic, 0)) assert all(map(lambda o: o[1] == offsets[0][1], offsets)) + + def run_recovery(self, acks, dst_node): + """ + Runs the workload and recovery such that 'dst_node' is the receiver of + the recovery snapshot. + """ + # Stop this unfortunate node. + self.redpanda.stop_node(dst_node) + living_nodes = [n for n in self.redpanda.nodes if n != dst_node] + + # Produce until the topic reclaims some segments. + deleted = max(self.get_segments_deleted(living_nodes)) + wait_until( + lambda: self.produce_until_reclaim(living_nodes, deleted, acks), + timeout_sec=90, + backoff_sec=1) + + # We should now observe the under-replicated state. + wait_until(lambda: not self.fully_replicated(living_nodes), + timeout_sec=90, + backoff_sec=1) + + # Continue the node and wait until it's fully replicated. + self.redpanda.start_node(dst_node) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=1) + self.verify_offsets() + + +class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase): + @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) + @matrix(acks=[-1, 1], start_empty=[True, False]) + def test_prefix_truncate_recovery(self, acks, start_empty): + # cover boundary conditions of partition being empty/non-empty + if not start_empty: + self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=5) + self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0]) + + +class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase): + """ + Exercises the work required to install a snapshot works through the stages + of an upgrade. 
+ """ + def __init__(self, test_context): + super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context) + self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION + + def setUp(self): + self.redpanda._installer.install(self.redpanda.nodes, + self.initial_version) + super(PrefixTruncateRecoveryUpgradeTest, self).setUp() + + @cluster(num_nodes=3, + log_allow_list=LOG_ALLOW_LIST + + MixedVersionWorkloadRunner.ALLOWED_LOGS) + def test_recover_during_upgrade(self): + def _run_recovery(src_node, dst_node): + # Force leadership to 'src_node' so we can deterministically check + # mixed-version traffic. + self.redpanda._admin.transfer_leadership_to( + namespace="kafka", + topic=self.topic, + partition=0, + target_id=self.redpanda.idx(src_node)) + # Speed the test up a bit by using acks=1. + self.run_recovery(acks=1, dst_node=dst_node) + + MixedVersionWorkloadRunner.upgrade_with_workload( + self.redpanda, self.initial_version, _run_recovery)