tests: mixed-version test for installing snapshots
This adds a test that installs snapshots across mixed versions, testing
compatibility of the respective RPCs when there is a version mismatch.

I reused an existing test, pulling its guts out into a test base class, and
tweaked some retry intervals along the way to speed the test up.
andrwng committed Jul 15, 2022
1 parent 22e6436 commit 2a2115e
Showing 1 changed file with 82 additions and 38 deletions.
120 changes: 82 additions & 38 deletions tests/rptest/tests/prefix_truncate_recovery_test.py
@@ -13,6 +13,7 @@

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.admin import Admin
@@ -24,7 +25,7 @@
]


class PrefixTruncateRecoveryTest(RedpandaTest):
class PrefixTruncateRecoveryTestBase(RedpandaTest):
"""
The purpose of this test is to exercise recovery of partitions which have
had data reclaimed based on retention policy. The testing strategy is:
@@ -48,14 +49,17 @@ def __init__(self, test_context):
enable_leader_balancer=False,
)

super(PrefixTruncateRecoveryTest,
super(PrefixTruncateRecoveryTestBase,
self).__init__(test_context=test_context,
num_brokers=3,
extra_rp_conf=extra_rp_conf)

self.kafka_tools = KafkaCliTools(self.redpanda)
self.kafka_cat = KafkaCat(self.redpanda)

def setUp(self):
super(PrefixTruncateRecoveryTestBase, self).setUp()

def fully_replicated(self, nodes):
"""
Test that for each specified node there are no reported under
@@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes):
assert len(metric.samples) == len(nodes)
return [s.value for s in metric.samples]

def produce_until_reclaim(self, initial_deleted, acks):
def produce_until_reclaim(self, living_nodes, initial_deleted, acks):
"""
Produce data until we observe that segments have been deleted. The
initial_deleted parameter is the max number of segments deleted across
nodes, and we wait for every node to report at least initial_deleted + 2
deletions so that all nodes have experienced some deletion.
"""
deleted = self.get_segments_deleted(self.redpanda.nodes[1:])
deleted = self.get_segments_deleted(living_nodes)
if all(map(lambda d: d >= initial_deleted + 2, deleted)):
return True
self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks)
return False

@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

# stop this unfortunate node
stopped_node = self.redpanda.nodes[0]
self.redpanda.stop_node(stopped_node)

# produce data into the topic until segments are reclaimed
# by the configured retention policy
deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:]))
wait_until(lambda: self.produce_until_reclaim(deleted, acks),
timeout_sec=90,
backoff_sec=5)

# we should now observe an under replicated state
wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]),
timeout_sec=90,
backoff_sec=5)

# finally restart the node and wait until fully replicated
self.redpanda.start_node(stopped_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

self.verify_offsets()

def verify_offsets(self):
"""
Test that the ending offset for the partition as seen on each
@@ -146,3 +116,77 @@ def verify_offsets(self):
# appear to deal with this gracefully and we still pass.
offsets.append(self.kafka_cat.list_offsets(self.topic, 0))
assert all(map(lambda o: o[1] == offsets[0][1], offsets))

def run_recovery(self, acks, dst_node):
"""
Runs the workload and recovery such that 'dst_node' is the receiver of
the recovery snapshot.
"""
# Stop this unfortunate node.
self.redpanda.stop_node(dst_node)
living_nodes = [n for n in self.redpanda.nodes if n != dst_node]

# Produce until the topic reclaims some segments.
deleted = max(self.get_segments_deleted(living_nodes))
wait_until(
lambda: self.produce_until_reclaim(living_nodes, deleted, acks),
timeout_sec=90,
backoff_sec=1)

# We should now observe the under-replicated state.
wait_until(lambda: not self.fully_replicated(living_nodes),
timeout_sec=90,
backoff_sec=1)

# Restart the node and wait until it's fully replicated.
self.redpanda.start_node(dst_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=1)
self.verify_offsets()


class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase):
@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)
self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0])


class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase):
"""
Exercises the work required to install a snapshot through the stages
of an upgrade.
"""
def __init__(self, test_context):
super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context)
self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION

def setUp(self):
self.redpanda._installer.install(self.redpanda.nodes,
self.initial_version)
super(PrefixTruncateRecoveryUpgradeTest, self).setUp()

@cluster(num_nodes=3,
log_allow_list=LOG_ALLOW_LIST +
MixedVersionWorkloadRunner.ALLOWED_LOGS)
def test_recover_during_upgrade(self):
def _run_recovery(src_node, dst_node):
# Force leadership to 'src_node' so we can deterministically check
# mixed-version traffic.
self.redpanda._admin.transfer_leadership_to(
namespace="kafka",
topic=self.topic,
partition=0,
target_id=self.redpanda.idx(src_node))
# Speed the test up a bit by using acks=1.
self.run_recovery(acks=1, dst_node=dst_node)

MixedVersionWorkloadRunner.upgrade_with_workload(
self.redpanda, self.initial_version, _run_recovery)
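
For context, the helper driving the new upgrade test is not part of this diff. Below is a minimal sketch of how MixedVersionWorkloadRunner.upgrade_with_workload is presumably shaped, inferred only from its usage above (the PRE_SERDE_VERSION and ALLOWED_LOGS attributes and the (src_node, dst_node) callback signature); it is an assumption, not the actual implementation in rptest/tests/upgrade_with_workload.py.

# Illustrative sketch only: this body is an assumption inferred from how the
# runner is invoked above, not the real rptest/tests/upgrade_with_workload.py.
class MixedVersionWorkloadRunner:
    # Placeholder value: the last feature version before the serde-based RPC
    # encoding; the real constant lives in the helper module.
    PRE_SERDE_VERSION = (22, 1, 5)
    # Placeholder: log patterns tolerated while nodes speak mismatched versions.
    ALLOWED_LOGS = []

    @staticmethod
    def upgrade_with_workload(redpanda, initial_version, workload_fn):
        # Partially upgrade the cluster (install/restart details elided) so
        # 'new_node' runs the new binaries while 'old_node' stays on
        # 'initial_version', then drive the workload in both directions
        # across the version boundary.
        old_node, new_node = redpanda.nodes[0], redpanda.nodes[1]
        # ... install the new binaries on new_node and restart it ...
        workload_fn(new_node, old_node)  # upgraded sender -> old receiver
        workload_fn(old_node, new_node)  # old sender -> upgraded receiver
        # ... repeat as the remaining nodes are upgraded ...

The point of the pairing is that _run_recovery pins leadership on src_node before run_recovery stops dst_node, so the install-snapshot RPC is exercised both from an upgraded leader to an old follower and from an old leader to an upgraded follower.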
