Skip to content

Commit

Permalink
Merge pull request #5488 from andrwng/mixed_snapshot
Browse files Browse the repository at this point in the history
tests: add mixed-version test for sending Raft snapshot RPCs
  • Loading branch information
dotnwat committed Jul 21, 2022
2 parents 333dc52 + 41bbb93 commit 282ac4b
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 38 deletions.
120 changes: 82 additions & 38 deletions tests/rptest/tests/prefix_truncate_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.admin import Admin
Expand All @@ -24,7 +25,7 @@
]


class PrefixTruncateRecoveryTest(RedpandaTest):
class PrefixTruncateRecoveryTestBase(RedpandaTest):
"""
The purpose of this test is to exercise recovery of partitions which have
had data reclaimed based on retention policy. The testing strategy is:
Expand All @@ -48,14 +49,17 @@ def __init__(self, test_context):
enable_leader_balancer=False,
)

super(PrefixTruncateRecoveryTest,
super(PrefixTruncateRecoveryTestBase,
self).__init__(test_context=test_context,
num_brokers=3,
extra_rp_conf=extra_rp_conf)

self.kafka_tools = KafkaCliTools(self.redpanda)
self.kafka_cat = KafkaCat(self.redpanda)

def setUp(self):
super(PrefixTruncateRecoveryTestBase, self).setUp()

def fully_replicated(self, nodes):
"""
Test that for each specified node that there are no reported under
Expand All @@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes):
assert len(metric.samples) == len(nodes)
return [s.value for s in metric.samples]

def produce_until_reclaim(self, initial_deleted, acks):
def produce_until_reclaim(self, living_nodes, initial_deleted, acks):
"""
Produce data until we observe that segments have been deleted. The
initial_deleted parameter is the max number of segments deleted across
nodes, and we wait for all partitions to report at least initial + 3
deletions so that all nodes have experienced some deletion.
"""
deleted = self.get_segments_deleted(self.redpanda.nodes[1:])
deleted = self.get_segments_deleted(living_nodes)
if all(map(lambda d: d >= initial_deleted + 2, deleted)):
return True
self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks)
return False

@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

# stop this unfortunate node
stopped_node = self.redpanda.nodes[0]
self.redpanda.stop_node(stopped_node)

# produce data into the topic until segments are reclaimed
# by the configured retention policy
deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:]))
wait_until(lambda: self.produce_until_reclaim(deleted, acks),
timeout_sec=90,
backoff_sec=5)

# we should now observe an under replicated state
wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]),
timeout_sec=90,
backoff_sec=5)

# finally restart the node and wait until fully replicated
self.redpanda.start_node(stopped_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

self.verify_offsets()

def verify_offsets(self):
"""
Test that the ending offset for the partition as seen on each
Expand All @@ -146,3 +116,77 @@ def verify_offsets(self):
# appear to deal with this gracefully and we still pass.
offsets.append(self.kafka_cat.list_offsets(self.topic, 0))
assert all(map(lambda o: o[1] == offsets[0][1], offsets))

def run_recovery(self, acks, dst_node):
"""
Runs the workload and recovery such that 'dst_node' is the receiver of
the recovery snapshot.
"""
# Stop this unfortunate node.
self.redpanda.stop_node(dst_node)
living_nodes = [n for n in self.redpanda.nodes if n != dst_node]

# Produce until the topic reclaims some segments.
deleted = max(self.get_segments_deleted(living_nodes))
wait_until(
lambda: self.produce_until_reclaim(living_nodes, deleted, acks),
timeout_sec=90,
backoff_sec=1)

# We should now observe the under-replicated state.
wait_until(lambda: not self.fully_replicated(living_nodes),
timeout_sec=90,
backoff_sec=1)

# Continue the node and wait until it's fully replicated.
self.redpanda.start_node(dst_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=1)
self.verify_offsets()


class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase):
@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)
self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0])


class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase):
"""
Exercises the work required to install a snapshot works through the stages
of an upgrade.
"""
def __init__(self, test_context):
super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context)
self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION

def setUp(self):
self.redpanda._installer.install(self.redpanda.nodes,
self.initial_version)
super(PrefixTruncateRecoveryUpgradeTest, self).setUp()

@cluster(num_nodes=3,
log_allow_list=LOG_ALLOW_LIST +
MixedVersionWorkloadRunner.ALLOWED_LOGS)
def test_recover_during_upgrade(self):
def _run_recovery(src_node, dst_node):
# Force leadership to 'src_node' so we can deterministically check
# mixed-version traffic.
self.redpanda._admin.transfer_leadership_to(
namespace="kafka",
topic=self.topic,
partition=0,
target_id=self.redpanda.idx(src_node))
# Speed the test up a bit by using acks=1.
self.run_recovery(acks=1, dst_node=dst_node)

MixedVersionWorkloadRunner.upgrade_with_workload(
self.redpanda, self.initial_version, _run_recovery)
65 changes: 65 additions & 0 deletions tests/rptest/tests/upgrade_with_workload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.redpanda import RedpandaService, RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda_installer import RedpandaInstaller


class MixedVersionWorkloadRunner():
ALLOWED_LOGS = RESTART_LOG_ALLOW_LIST

# For testing RPC compatibility, pick a version that doesn't have serde
# enabled.
PRE_SERDE_VERSION = (22, 1, 4)

@staticmethod
def upgrade_with_workload(redpanda: RedpandaService, initial_version,
workload_fn):
"""
Runs through an upgrade while running the given workload during the
intermediate mixed-cluster stages.
It's expected that before starting, the version from which Redpanda is
being upgraded has already been installed and deployed (nodes started
with new bits) on the service.
'workload_fn' is a function that takes two nodes and deterministically
performs work between nodes, for instance, by having RPCs go from one
node to the other.
"""
num_nodes = len(redpanda.nodes)
assert num_nodes >= 2, f"Expected at least two nodes, got {num_nodes}"
installer: RedpandaInstaller = redpanda._installer
nodes = redpanda.nodes
node0 = nodes[0]
node1 = nodes[1]

# Upgrade one node and send RPCs in both directions.
installer.install([node0], RedpandaInstaller.HEAD)
redpanda.restart_nodes([node0])
workload_fn(node0, node1)
workload_fn(node1, node0)

# Continue on with the upgrade. The versions are identical at this
# point so just run the workload in one direction.
installer.install([node1], RedpandaInstaller.HEAD)
redpanda.restart_nodes([node1])
workload_fn(node0, node1)

# Partial roll back and make sure we can still run the workload.
installer.install([node1], initial_version)
redpanda.restart_nodes([node1])
workload_fn(node0, node1)
workload_fn(node1, node0)

# Complete the upgrade. The versions are identical again so just run
# through the workload in one direction.
installer.install(nodes, RedpandaInstaller.HEAD)
redpanda.restart_nodes(nodes)
workload_fn(node0, node1)

0 comments on commit 282ac4b

Please sign in to comment.