From c34af359147f476da09b32127218900ac7288fcd Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 14 Jul 2022 15:55:43 -0700 Subject: [PATCH 1/2] tests: add harness for multi-version testing This commit adds a test harness that runs workloads on a partially upgraded cluster. The goal here is to ensure we exercise RPC serialization methods when the versions between the sender and receiver on mismatched. To that end, the harness expects to be provided a workload that spurs RPCs in a deterministic direction: - We first perform a partial upgrade, - then run the workload between an upgraded node and non-upgraded node, - then run the workload in the opposite direction, - then do a partial rollback and repeat, - then proceed with a full upgrade and repeat. --- tests/rptest/tests/upgrade_with_workload.py | 65 +++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 tests/rptest/tests/upgrade_with_workload.py diff --git a/tests/rptest/tests/upgrade_with_workload.py b/tests/rptest/tests/upgrade_with_workload.py new file mode 100644 index 000000000000..4b28d4c314d5 --- /dev/null +++ b/tests/rptest/tests/upgrade_with_workload.py @@ -0,0 +1,65 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.redpanda import RedpandaService, RESTART_LOG_ALLOW_LIST +from rptest.services.redpanda_installer import RedpandaInstaller + + +class MixedVersionWorkloadRunner(): + ALLOWED_LOGS = RESTART_LOG_ALLOW_LIST + + # For testing RPC compatibility, pick a version that doesn't have serde + # enabled. + PRE_SERDE_VERSION = (22, 1, 4) + + @staticmethod + def upgrade_with_workload(redpanda: RedpandaService, initial_version, + workload_fn): + """ + Runs through an upgrade while running the given workload during the + intermediate mixed-cluster stages. + + It's expected that before starting, the version from which Redpanda is + being upgraded has already been installed and deployed (nodes started + with new bits) on the service. + + 'workload_fn' is a function that takes two nodes and deterministically + performs work between nodes, for instance, by having RPCs go from one + node to the other. + """ + num_nodes = len(redpanda.nodes) + assert num_nodes >= 2, f"Expected at least two nodes, got {num_nodes}" + installer: RedpandaInstaller = redpanda._installer + nodes = redpanda.nodes + node0 = nodes[0] + node1 = nodes[1] + + # Upgrade one node and send RPCs in both directions. + installer.install([node0], RedpandaInstaller.HEAD) + redpanda.restart_nodes([node0]) + workload_fn(node0, node1) + workload_fn(node1, node0) + + # Continue on with the upgrade. The versions are identical at this + # point so just run the workload in one direction. + installer.install([node1], RedpandaInstaller.HEAD) + redpanda.restart_nodes([node1]) + workload_fn(node0, node1) + + # Partial roll back and make sure we can still run the workload. + installer.install([node1], initial_version) + redpanda.restart_nodes([node1]) + workload_fn(node0, node1) + workload_fn(node1, node0) + + # Complete the upgrade. The versions are identical again so just run + # through the workload in one direction. + installer.install(nodes, RedpandaInstaller.HEAD) + redpanda.restart_nodes(nodes) + workload_fn(node0, node1) From 41bbb93a818c36cd5fa5c272b1f984509fcc0a54 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 14 Jul 2022 18:03:41 -0700 Subject: [PATCH 2/2] tests: mixed-version test for installing snapshots This adds a test that installs snapshots across mixed versions, testing compatibility of the respective RPCs when there is a version mismatch. I reused an existing test, pulling out the guts into a test base class, though I tweaked some retry intervals to speed the test up. --- .../tests/prefix_truncate_recovery_test.py | 120 ++++++++++++------ 1 file changed, 82 insertions(+), 38 deletions(-) diff --git a/tests/rptest/tests/prefix_truncate_recovery_test.py b/tests/rptest/tests/prefix_truncate_recovery_test.py index 7d9e559a6835..553988dcb56a 100644 --- a/tests/rptest/tests/prefix_truncate_recovery_test.py +++ b/tests/rptest/tests/prefix_truncate_recovery_test.py @@ -13,6 +13,7 @@ from rptest.clients.types import TopicSpec from rptest.tests.redpanda_test import RedpandaTest +from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.clients.kafka_cat import KafkaCat from rptest.services.admin import Admin @@ -24,7 +25,7 @@ ] -class PrefixTruncateRecoveryTest(RedpandaTest): +class PrefixTruncateRecoveryTestBase(RedpandaTest): """ The purpose of this test is to exercise recovery of partitions which have had data reclaimed based on retention policy. The testing strategy is: @@ -48,7 +49,7 @@ def __init__(self, test_context): enable_leader_balancer=False, ) - super(PrefixTruncateRecoveryTest, + super(PrefixTruncateRecoveryTestBase, self).__init__(test_context=test_context, num_brokers=3, extra_rp_conf=extra_rp_conf) @@ -56,6 +57,9 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) self.kafka_cat = KafkaCat(self.redpanda) + def setUp(self): + super(PrefixTruncateRecoveryTestBase, self).setUp() + def fully_replicated(self, nodes): """ Test that for each specified node that there are no reported under @@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes): assert len(metric.samples) == len(nodes) return [s.value for s in metric.samples] - def produce_until_reclaim(self, initial_deleted, acks): + def produce_until_reclaim(self, living_nodes, initial_deleted, acks): """ Produce data until we observe that segments have been deleted. The initial_deleted parameter is the max number of segments deleted across nodes, and we wait for all partitions to report at least initial + 3 deletions so that all nodes have experienced some deletion. """ - deleted = self.get_segments_deleted(self.redpanda.nodes[1:]) + deleted = self.get_segments_deleted(living_nodes) if all(map(lambda d: d >= initial_deleted + 2, deleted)): return True self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks) return False - @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) - @matrix(acks=[-1, 1], start_empty=[True, False]) - def test_prefix_truncate_recovery(self, acks, start_empty): - # cover boundary conditions of partition being empty/non-empty - if not start_empty: - self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - # stop this unfortunate node - stopped_node = self.redpanda.nodes[0] - self.redpanda.stop_node(stopped_node) - - # produce data into the topic until segments are reclaimed - # by the configured retention policy - deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:])) - wait_until(lambda: self.produce_until_reclaim(deleted, acks), - timeout_sec=90, - backoff_sec=5) - - # we should now observe an under replicated state - wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]), - timeout_sec=90, - backoff_sec=5) - - # finally restart the node and wait until fully replicated - self.redpanda.start_node(stopped_node) - wait_until(lambda: self.fully_replicated(self.redpanda.nodes), - timeout_sec=90, - backoff_sec=5) - - self.verify_offsets() - def verify_offsets(self): """ Test that the ending offset for the partition as seen on each @@ -146,3 +116,77 @@ def verify_offsets(self): # appear to deal with this gracefully and we still pass. offsets.append(self.kafka_cat.list_offsets(self.topic, 0)) assert all(map(lambda o: o[1] == offsets[0][1], offsets)) + + def run_recovery(self, acks, dst_node): + """ + Runs the workload and recovery such that 'dst_node' is the receiver of + the recovery snapshot. + """ + # Stop this unfortunate node. + self.redpanda.stop_node(dst_node) + living_nodes = [n for n in self.redpanda.nodes if n != dst_node] + + # Produce until the topic reclaims some segments. + deleted = max(self.get_segments_deleted(living_nodes)) + wait_until( + lambda: self.produce_until_reclaim(living_nodes, deleted, acks), + timeout_sec=90, + backoff_sec=1) + + # We should now observe the under-replicated state. + wait_until(lambda: not self.fully_replicated(living_nodes), + timeout_sec=90, + backoff_sec=1) + + # Continue the node and wait until it's fully replicated. + self.redpanda.start_node(dst_node) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=1) + self.verify_offsets() + + +class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase): + @cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST) + @matrix(acks=[-1, 1], start_empty=[True, False]) + def test_prefix_truncate_recovery(self, acks, start_empty): + # cover boundary conditions of partition being empty/non-empty + if not start_empty: + self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks) + wait_until(lambda: self.fully_replicated(self.redpanda.nodes), + timeout_sec=90, + backoff_sec=5) + self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0]) + + +class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase): + """ + Exercises the work required to install a snapshot works through the stages + of an upgrade. + """ + def __init__(self, test_context): + super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context) + self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION + + def setUp(self): + self.redpanda._installer.install(self.redpanda.nodes, + self.initial_version) + super(PrefixTruncateRecoveryUpgradeTest, self).setUp() + + @cluster(num_nodes=3, + log_allow_list=LOG_ALLOW_LIST + + MixedVersionWorkloadRunner.ALLOWED_LOGS) + def test_recover_during_upgrade(self): + def _run_recovery(src_node, dst_node): + # Force leadership to 'src_node' so we can deterministically check + # mixed-version traffic. + self.redpanda._admin.transfer_leadership_to( + namespace="kafka", + topic=self.topic, + partition=0, + target_id=self.redpanda.idx(src_node)) + # Speed the test up a bit by using acks=1. + self.run_recovery(acks=1, dst_node=dst_node) + + MixedVersionWorkloadRunner.upgrade_with_workload( + self.redpanda, self.initial_version, _run_recovery)