tests: add mixed-version test for sending Raft snapshot RPCs #5488

Merged · 2 commits · Jul 21, 2022
120 changes: 82 additions & 38 deletions tests/rptest/tests/prefix_truncate_recovery_test.py
@@ -13,6 +13,7 @@

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.upgrade_with_workload import MixedVersionWorkloadRunner
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.admin import Admin
@@ -24,7 +25,7 @@
]


class PrefixTruncateRecoveryTest(RedpandaTest):
class PrefixTruncateRecoveryTestBase(RedpandaTest):
"""
The purpose of this test is to exercise recovery of partitions which have
had data reclaimed based on retention policy. The testing strategy is:
@@ -48,14 +49,17 @@ def __init__(self, test_context):
enable_leader_balancer=False,
)

super(PrefixTruncateRecoveryTest,
super(PrefixTruncateRecoveryTestBase,
self).__init__(test_context=test_context,
num_brokers=3,
extra_rp_conf=extra_rp_conf)

self.kafka_tools = KafkaCliTools(self.redpanda)
self.kafka_cat = KafkaCat(self.redpanda)

def setUp(self):
super(PrefixTruncateRecoveryTestBase, self).setUp()

def fully_replicated(self, nodes):
"""
Test that for each specified node there are no reported under
@@ -76,53 +80,19 @@ def get_segments_deleted(self, nodes):
assert len(metric.samples) == len(nodes)
return [s.value for s in metric.samples]

def produce_until_reclaim(self, initial_deleted, acks):
def produce_until_reclaim(self, living_nodes, initial_deleted, acks):
"""
Produce data until we observe that segments have been deleted. The
initial_deleted parameter is the max number of segments deleted across
nodes, and we wait for all partitions to report at least initial + 2
deletions so that all nodes have experienced some deletion.
"""
deleted = self.get_segments_deleted(self.redpanda.nodes[1:])
deleted = self.get_segments_deleted(living_nodes)
if all(map(lambda d: d >= initial_deleted + 2, deleted)):
return True
self.kafka_tools.produce(self.topic, 1024, 1024, acks=acks)
return False

@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

# stop this unfortunate node
stopped_node = self.redpanda.nodes[0]
self.redpanda.stop_node(stopped_node)

# produce data into the topic until segments are reclaimed
# by the configured retention policy
deleted = max(self.get_segments_deleted(self.redpanda.nodes[1:]))
wait_until(lambda: self.produce_until_reclaim(deleted, acks),
timeout_sec=90,
backoff_sec=5)

# we should now observe an under replicated state
wait_until(lambda: not self.fully_replicated(self.redpanda.nodes[1:]),
timeout_sec=90,
backoff_sec=5)

# finally restart the node and wait until fully replicated
self.redpanda.start_node(stopped_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)

self.verify_offsets()

def verify_offsets(self):
"""
Test that the ending offset for the partition as seen on each
@@ -146,3 +116,77 @@ def verify_offsets(self):
# appear to deal with this gracefully and we still pass.
offsets.append(self.kafka_cat.list_offsets(self.topic, 0))
assert all(map(lambda o: o[1] == offsets[0][1], offsets))

def run_recovery(self, acks, dst_node):
"""
Runs the workload and recovery such that 'dst_node' is the receiver of
the recovery snapshot.
"""
# Stop this unfortunate node.
self.redpanda.stop_node(dst_node)
living_nodes = [n for n in self.redpanda.nodes if n != dst_node]

# Produce until the topic reclaims some segments.
deleted = max(self.get_segments_deleted(living_nodes))
wait_until(
lambda: self.produce_until_reclaim(living_nodes, deleted, acks),
timeout_sec=90,
backoff_sec=1)

# We should now observe the under-replicated state.
wait_until(lambda: not self.fully_replicated(living_nodes),
timeout_sec=90,
backoff_sec=1)

# Restart the node and wait until it's fully replicated.
self.redpanda.start_node(dst_node)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=1)
self.verify_offsets()


class PrefixTruncateRecoveryTest(PrefixTruncateRecoveryTestBase):
@cluster(num_nodes=3, log_allow_list=LOG_ALLOW_LIST)
@matrix(acks=[-1, 1], start_empty=[True, False])
def test_prefix_truncate_recovery(self, acks, start_empty):
# cover boundary conditions of partition being empty/non-empty
if not start_empty:
self.kafka_tools.produce(self.topic, 2048, 1024, acks=acks)
wait_until(lambda: self.fully_replicated(self.redpanda.nodes),
timeout_sec=90,
backoff_sec=5)
self.run_recovery(acks=acks, dst_node=self.redpanda.nodes[0])


class PrefixTruncateRecoveryUpgradeTest(PrefixTruncateRecoveryTestBase):
"""
Exercises that installing a snapshot works through the stages of an
upgrade.
"""
def __init__(self, test_context):
super(PrefixTruncateRecoveryUpgradeTest, self).__init__(test_context)
self.initial_version = MixedVersionWorkloadRunner.PRE_SERDE_VERSION

def setUp(self):
self.redpanda._installer.install(self.redpanda.nodes,
self.initial_version)
super(PrefixTruncateRecoveryUpgradeTest, self).setUp()

@cluster(num_nodes=3,
log_allow_list=LOG_ALLOW_LIST +
MixedVersionWorkloadRunner.ALLOWED_LOGS)
def test_recover_during_upgrade(self):
def _run_recovery(src_node, dst_node):
# Force leadership to 'src_node' so we can deterministically check
# mixed-version traffic.
self.redpanda._admin.transfer_leadership_to(
namespace="kafka",
topic=self.topic,
partition=0,
target_id=self.redpanda.idx(src_node))
# Speed the test up a bit by using acks=1.
self.run_recovery(acks=1, dst_node=dst_node)

MixedVersionWorkloadRunner.upgrade_with_workload(
self.redpanda, self.initial_version, _run_recovery)
65 changes: 65 additions & 0 deletions tests/rptest/tests/upgrade_with_workload.py
@@ -0,0 +1,65 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.redpanda import RedpandaService, RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda_installer import RedpandaInstaller


class MixedVersionWorkloadRunner():
ALLOWED_LOGS = RESTART_LOG_ALLOW_LIST

# For testing RPC compatibility, pick a version that doesn't have serde
# enabled.
PRE_SERDE_VERSION = (22, 1, 4)

@staticmethod
def upgrade_with_workload(redpanda: RedpandaService, initial_version,
workload_fn):
"""
Runs through an upgrade while running the given workload during the
intermediate mixed-cluster stages.
It's expected that before starting, the version from which Redpanda is
being upgraded has already been installed and deployed (nodes started
with those bits) on the service.
'workload_fn' is a function that takes two nodes and deterministically
performs work between nodes, for instance, by having RPCs go from one
node to the other.
"""
num_nodes = len(redpanda.nodes)
assert num_nodes >= 2, f"Expected at least two nodes, got {num_nodes}"
installer: RedpandaInstaller = redpanda._installer
nodes = redpanda.nodes
node0 = nodes[0]
node1 = nodes[1]

# Upgrade one node and send RPCs in both directions.
installer.install([node0], RedpandaInstaller.HEAD)
redpanda.restart_nodes([node0])
workload_fn(node0, node1)
workload_fn(node1, node0)

# Continue on with the upgrade. The versions are identical at this
# point so just run the workload in one direction.
installer.install([node1], RedpandaInstaller.HEAD)
redpanda.restart_nodes([node1])
workload_fn(node0, node1)

# Partially roll back and make sure we can still run the workload.
installer.install([node1], initial_version)
redpanda.restart_nodes([node1])
workload_fn(node0, node1)
workload_fn(node1, node0)

# Complete the upgrade. The versions are identical again so just run
# through the workload in one direction.
installer.install(nodes, RedpandaInstaller.HEAD)
redpanda.restart_nodes(nodes)
workload_fn(node0, node1)
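
For reference, a minimal sketch of how another test could drive upgrade_with_workload with its own workload_fn. It reuses only helpers already shown in this diff (transfer_leadership_to, kafka_tools.produce) and assumes it lives inside a RedpandaTest method where self.topic and self.kafka_tools exist; the body is illustrative, not part of this PR.

def _replicate_from(src_node, dst_node):
    # Pin leadership on 'src_node' so replication traffic for partition 0
    # flows from 'src_node' to 'dst_node' across the version boundary.
    self.redpanda._admin.transfer_leadership_to(
        namespace="kafka",
        topic=self.topic,
        partition=0,
        target_id=self.redpanda.idx(src_node))
    # Produce a small batch with full acknowledgement so the write must be
    # replicated to the follower before the workload returns.
    self.kafka_tools.produce(self.topic, 1024, 1024, acks=-1)

MixedVersionWorkloadRunner.upgrade_with_workload(
    self.redpanda, MixedVersionWorkloadRunner.PRE_SERDE_VERSION,
    _replicate_from)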