diff --git a/tests/rptest/tests/prefix_truncate_recovery_test.py b/tests/rptest/tests/prefix_truncate_recovery_test.py
index 7d9e559a6835..553988dcb56a 100644
--- a/tests/rptest/tests/prefix_truncate_recovery_test.py
+++ b/tests/rptest/tests/prefix_truncate_recovery_test.py
@@ -13,6 +13,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner
 from rptest.clients.kafka_cli_tools import KafkaCliTools
 from rptest.clients.kafka_cat import KafkaCat
 from rptest.services.admin import Admin
@@ -24,7 +25,7 @@
 ]
 
 
-class PrefixTruncateRecoveryTest(RedpandaTest):
+class PrefixTruncateRecoveryTestBase(RedpandaTest):
     """
     The purpose of this test is to exercise recovery of partitions which have
     had data reclaimed based on retention policy. The testing strategy is:
@@ -48,7 +49,7 @@ def __init__(self, test_context):
             enable_leader_balancer=False,
         )
 
-        super(PrefixTruncateRecoveryTest,
+        super(PrefixTruncateRecoveryTestBase,
               self).__init__(test_context=test_context,
                              num_brokers=3,
                              extra_rp_conf=extra_rp_conf)
@@ -56,6 +57,9 @@ def __init__(self, test_context):
         self.kafka_tools = KafkaCliTools(self.redpanda)
         self.kafka_cat = KafkaCat(self.redpanda)
 
+    def setUp(self):
+        super(PrefixTruncateRecoveryTestBase, self).setUp()
+
     def fully_replicated(self, nodes):
         """
         Test that for each specified node that there are no reported under
@@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes):
         assert len(metric.samples) == len(nodes)
         return [s.value for s in metric.samples]
 
-    def produce_until_reclaim(self, initial_deleted, acks):
+    def produce_until_reclaim(self, living_nodes, initial_deleted, acks):
         """
         Produce data until we observe that segments have been deleted. The
         initial_deleted parameter is the max number of segments deleted across
         nodes, and we wait for all partitions to report at least initial + 2
         deletions so that all nodes have experienced some deletion.
""" - deleted = self.get_segments_deleted(self.redpanda.nodes[1:]) + deleted = self.get_segments_deleted(living_nodes) if all(map(lambda d: d >= initial_deleted + 2, deleted)): return True self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks) return False - @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) - @matrix(acks=[-1, 1], start_empty=[True, False]) - def test_prefix_truncate_recovery(self, acks, start_empty): - # cover boundary conditions of partition being empty/non-empty - if not start_empty: - self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - # stop this unfortunate node - stopped_node = self.redpanda.nodes[0] - self.redpanda.stop_node(stopped_node) - - # produce data into the topic until segments are reclaimed - # by the configured retention policy - deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:])) - wait_until(lambda: self.produce_until_reclaim(deleted, acks), - timeout_sec=90, - backoff_sec=5) - - # we should now observe an under replicated state - wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]), - timeout_sec=90, - backoff_sec=5) - - # finally restart the node and wait until fully replicated - self.redpanda.start_node(stopped_node) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - self.verify_offsets() - def verify_offsets(self): """ Test that the ending offset for the partition as seen on each @@ -146,3 +116,77 @@ def verify_offsets(self): # appear to deal with this gracefully and we still pass. offsets.append(self.kafka_cat.list_offsets(self.topic, 0)) assert all(map(lambda o: o[1] == offsets[0][1], offsets)) + + def run_recovery(self, acks, dst_node): + """ + Runs the workload and recovery such that 'dst_node' is the receiver of + the recovery snapshot. + """ + # Stop this unfortunate node. + self.redpanda.stop_node(dst_node) + living_nodes = [n for n in self.redpanda.nodes if n != dst_node] + + # Produce until the topic reclaims some segments. + deleted = max(self.get_segments_deleted(living_nodes)) + wait_until( + lambda: self.produce_until_reclaim(living_nodes, deleted, acks), + timeout_sec=90, + backoff_sec=1) + + # We should now observe the under-replicated state. + wait_until(lambda: not self.fully_replicated(living_nodes), + timeout_sec=90, + backoff_sec=1) + + # Continue the node and wait until it's fully replicated. + self.redpanda.start_node(dst_node) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=1) + self.verify_offsets() + + +class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase): + @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) + @matrix(acks=[-1, 1], start_empty=[True, False]) + def test_prefix_truncate_recovery(self, acks, start_empty): + # cover boundary conditions of partition being empty/non-empty + if not start_empty: + self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=5) + self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0]) + + +class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase): + """ + Exercises the work required to install a snapshot works through the stages + of an upgrade. 
+    """
+    def __init__(self, test_context):
+        super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context)
+        self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION
+
+    def setUp(self):
+        self.redpanda._installer.install(self.redpanda.nodes,
+                                         self.initial_version)
+        super(PrefixTruncateRecoveryUpgradeTest, self).setUp()
+
+    @cluster(num_nodes=3,
+             log_allow_list=LOG_ALLOW_LIST +
+             MixedVersionWorkloadRunner.ALLOWED_LOGS)
+    def test_recover_during_upgrade(self):
+        def _run_recovery(src_node, dst_node):
+            # Force leadership to 'src_node' so we can deterministically check
+            # mixed-version traffic.
+            self.redpanda._admin.transfer_leadership_to(
+                namespace="kafka",
+                topic=self.topic,
+                partition=0,
+                target_id=self.redpanda.idx(src_node))
+            # Speed the test up a bit by using acks=1.
+            self.run_recovery(acks=1, dst_node=dst_node)
+
+        MixedVersionWorkloadRunner.upgrade_with_workload(
+            self.redpanda, self.initial_version, _run_recovery)
diff --git a/tests/rptest/tests/upgrade_with_workload.py b/tests/rptest/tests/upgrade_with_workload.py
new file mode 100644
index 000000000000..4b28d4c314d5
--- /dev/null
+++ b/tests/rptest/tests/upgrade_with_workload.py
@@ -0,0 +1,65 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.redpanda import RedpandaService, RESTART_LOG_ALLOW_LIST
+from rptest.services.redpanda_installer import RedpandaInstaller
+
+
+class MixedVersionWorkloadRunner():
+    ALLOWED_LOGS = RESTART_LOG_ALLOW_LIST
+
+    # For testing RPC compatibility, pick a version that doesn't have serde
+    # enabled.
+    PRE_SERDE_VERSION = (22, 1, 4)
+
+    @staticmethod
+    def upgrade_with_workload(redpanda: RedpandaService, initial_version,
+                              workload_fn):
+        """
+        Runs through an upgrade while running the given workload during the
+        intermediate mixed-cluster stages.
+
+        It's expected that before starting, the version from which Redpanda is
+        being upgraded has already been installed and deployed (nodes started
+        with those bits) on the service.
+
+        'workload_fn' is a function that takes two nodes and deterministically
+        performs work between them, for instance by having RPCs go from one
+        node to the other.
+        """
+        num_nodes = len(redpanda.nodes)
+        assert num_nodes >= 2, f"Expected at least two nodes, got {num_nodes}"
+        installer: RedpandaInstaller = redpanda._installer
+        nodes = redpanda.nodes
+        node0 = nodes[0]
+        node1 = nodes[1]
+
+        # Upgrade one node and send RPCs in both directions.
+        installer.install([node0], RedpandaInstaller.HEAD)
+        redpanda.restart_nodes([node0])
+        workload_fn(node0, node1)
+        workload_fn(node1, node0)
+
+        # Continue on with the upgrade. The versions are identical at this
+        # point, so just run the workload in one direction.
+        installer.install([node1], RedpandaInstaller.HEAD)
+        redpanda.restart_nodes([node1])
+        workload_fn(node0, node1)
+
+        # Partially roll back and make sure we can still run the workload.
+        installer.install([node1], initial_version)
+        redpanda.restart_nodes([node1])
+        workload_fn(node0, node1)
+        workload_fn(node1, node0)
+
+        # Complete the upgrade. The versions are identical again, so just run
+        # through the workload in one direction.
+        installer.install(nodes, RedpandaInstaller.HEAD)
+        redpanda.restart_nodes(nodes)
+        workload_fn(node0, node1)
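Usage note (not part of the patch): MixedVersionWorkloadRunner is deliberately workload-agnostic, so other tests can reuse it by supplying their own two-node workload. Below is a minimal sketch of such reuse, following the conventions visible in this patch; the class name ExampleWorkloadUpgradeTest, the test method name, and the empty workload body are hypothetical placeholders, not code from this change.

# Hypothetical sketch of reusing MixedVersionWorkloadRunner in another test.
from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner


class ExampleWorkloadUpgradeTest(RedpandaTest):  # hypothetical test class
    def setUp(self):
        # Deploy the pre-serde version before the cluster first starts,
        # mirroring PrefixTruncateRecoveryUpgradeTest.setUp above.
        self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION
        self.redpanda._installer.install(self.redpanda.nodes,
                                         self.initial_version)
        super().setUp()

    @cluster(num_nodes=3,
             log_allow_list=MixedVersionWorkloadRunner.ALLOWED_LOGS)
    def test_example_during_upgrade(self):
        def _workload(src_node, dst_node):
            # Any deterministic two-node interaction goes here, e.g. pin
            # leadership to src_node and drive traffic toward dst_node.
            pass

        # Runs _workload at each stage: after upgrading one node (in both
        # directions), after upgrading both, after a partial rollback (in
        # both directions), and after completing the upgrade.
        MixedVersionWorkloadRunner.upgrade_with_workload(
            self.redpanda, self.initial_version, _workload)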