tests: use rpk producer in SI recovery tests
Use the rpk producer for all TopicRecoveryTest instances. Additionally,
apply some cleanups suggested by the Python linter.
abhijat committed May 28, 2022
1 parent 9d764b2 commit 4329fa5
111 changes: 71 additions & 40 deletions tests/rptest/tests/topic_recovery_test.py
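
The pattern introduced throughout the diff below: instead of calling kafka_tools.produce(...) directly, each test case now receives an rpk_producer_maker callable that is already bound to the test context and Redpanda instance, builds an RPK producer from it, and drives the producer through its start/wait/free lifecycle. A minimal sketch of how such a maker could be wired up, assuming a functools.partial over the RpkProducer ducktape service (the import path, class name, and constructor order here are assumptions, not taken from this diff):

import functools

# Assumed location of the producer service; the real module path may differ.
from rptest.services.rpk_producer import RpkProducer


def make_rpk_producer_maker(test_context, redpanda):
    # Bind the test context and cluster once; test cases then pass only
    # per-topic arguments, exactly as the generate_baseline methods below do.
    return functools.partial(RpkProducer, test_context, redpanda)


def produce_baseline(rpk_producer_maker, topic_name):
    # Same lifecycle used by the test cases in this diff: start the producer
    # service, wait for it to finish producing, then free its node.
    producer = rpk_producer_maker(topic=topic_name, msg_count=10000, msg_size=1024)
    producer.start()
    producer.wait()
    producer.free()

Several @cluster decorators also move from num_nodes=3 to num_nodes=4, which is consistent with the producer service occupying a node of its own (an inference; the commit message does not say so).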
@@ -11,7 +11,7 @@
import os
import time
from collections import namedtuple, defaultdict
from typing import Optional
from typing import Optional, Callable

import xxhash
from ducktape.mark import ok_to_fail
@@ -101,7 +101,7 @@ def _verify_file_layout(baseline_per_host,
restored_per_host,
expected_topics,
logger,
size_overrides={}):
size_overrides=None):
"""This function checks the restored segments over the expected ones.
It takes into account the fact that the md5 checksum as well as the
file name of the restored segment might be different from the original
@@ -111,6 +111,10 @@ def _verify_file_layout(baseline_per_host,
The assertion is triggered only if the difference can't be explained by the
upload lag and removal of configuration/archival-metadata batches.
"""

if size_overrides is None:
size_overrides = {}
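# Aside, not part of this diff: the switch from size_overrides={} to
# size_overrides=None above (and from overrides={} to overrides=None further
# down) is the standard fix for Python's mutable-default-argument pitfall.
# A default value is evaluated once, when the function is defined, so a dict
# default is shared by every call that omits the argument:
#
#     def remember(item, seen=[]):   # one shared list for all calls
#         seen.append(item)
#         return seen
#
#     remember('a')   # -> ['a']
#     remember('b')   # -> ['a', 'b']   state leaks between calls
#
# Using None as the sentinel and allocating the dict inside the body, as the
# added guard does, gives each call its own fresh container.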

def get_ntp_sizes(fdata_per_host, hosts_can_vary=True):
"""Pre-process file layout data from the cluster. Input is a dictionary
that maps host to dict of ntps where each ntp is mapped to the list of
@@ -207,14 +211,17 @@ class BaseCase:
topics = None

def __init__(self, s3_client: S3Client, kafka_tools: KafkaCliTools,
rpk_client: RpkTool, s3_bucket, logger):
rpk_client: RpkTool, s3_bucket, logger,
rpk_producer_maker: Callable):
self._kafka_tools = kafka_tools
self._s3 = s3_client
self._rpk = rpk_client
self._bucket = s3_bucket
self.logger = logger
# list of topics that have to be recovered, subclasses can override
self.expected_recovered_topics = self.topics
# Allows an RPK producer to be built with predefined test context and redpanda instance
self._rpk_producer_maker = rpk_producer_maker

def create_initial_topics(self):
"""Create initial set of topics based on class/instance topics variable."""
@@ -307,7 +314,7 @@ def _validate_partition_last_offset(self):
# (archival metadata records for each segment). Unfortunately the number of archival metadata
# records is non-deterministic as they can end up in the last open segment which is not
# uploaded. After bringing everything together we have the following bounds:
assert hw <= last_offset and hw >= last_offset - num_segments, \
assert last_offset >= hw >= last_offset - num_segments, \
f"High watermark has unexpected value {hw}, last offset: {last_offset}"

def _produce_and_verify(self, topic_spec):
@@ -343,11 +350,14 @@ def _get_all_topic_manifests(self):
self.logger.info(f"Topic manifest found at {key}, content:\n{j}")
yield (key, m)

def _restore_topic(self, manifest, overrides={}):
def _restore_topic(self, manifest, overrides=None):
"""Restore individual topic. Parameter 'path' is a path to topic
manifest, 'manifest' is a dictionary with manifest data (it's used
to generate topic configuration), 'overrides' contains values that
should override values from manifest or add new fields."""

if overrides is None:
overrides = {}
self.logger.info(f"Restore topic called. Topic-manifest: {manifest}")
topic = manifest['topic']
npart = manifest['partition_count']
@@ -390,9 +400,10 @@ class NoDataCase(BaseCase):
partition_count=1,
replication_factor=3), )

def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger):
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
rpk_producer_maker):
super(NoDataCase, self).__init__(s3_client, kafka_tools, rpk_client,
s3_bucket, logger)
s3_bucket, logger, rpk_producer_maker)

def validate_node(self, host, baseline, restored):
"""Validate restored node data using two sets of checksums.
@@ -435,10 +446,11 @@ class MissingTopicManifest(BaseCase):

topics = []

def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger):
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
rpk_producer_maker):
super(MissingTopicManifest,
self).__init__(s3_client, kafka_tools, rpk_client, s3_bucket,
logger)
logger, rpk_producer_maker)

def create_initial_topics(self):
"""Simulate missing topic manifest by not creating any topics"""
@@ -484,9 +496,9 @@ def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
rpk_producer_maker):
self._part1_offset = 0
self._part1_num_segments = 0
self._rpk_producer_maker = rpk_producer_maker
super(MissingPartition, self).__init__(s3_client, kafka_tools,
rpk_client, s3_bucket, logger)
super(MissingPartition,
self).__init__(s3_client, kafka_tools, rpk_client, s3_bucket,
logger, rpk_producer_maker)

def generate_baseline(self):
"""Produce enough data to trigger uploads to S3/minio"""
@@ -553,8 +565,7 @@ def validate_cluster(self, baseline, restored):
# uploaded. After bringing everything together we have the following bounds:
min_expected_hwm = self._part1_offset - self._part1_num_segments
max_expected_hwm = self._part1_offset
assert partition.high_watermark >= min_expected_hwm \
and partition.high_watermark <= max_expected_hwm, \
assert min_expected_hwm <= partition.high_watermark <= max_expected_hwm, \
f"Unexpected high watermark {partition.high_watermark} " \
f"(min expected: {min_expected_hwm}, max expected: {max_expected_hwm})"
else:
@@ -577,17 +588,24 @@ class MissingSegment(BaseCase):
partition_count=2,
replication_factor=3), )

def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger):
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
rpk_producer_maker):
self._part1_offset = 0
self._smaller_ntp = None
self._deleted_segment_size = None
super(MissingSegment, self).__init__(s3_client, kafka_tools,
rpk_client, s3_bucket, logger)
super(MissingSegment,
self).__init__(s3_client, kafka_tools, rpk_client, s3_bucket,
logger, rpk_producer_maker)

def generate_baseline(self):
"""Produce enough data to trigger uploads to S3/minio"""
for topic in self.topics:
self._kafka_tools.produce(topic.name, 10000, 1024)
producer = self._rpk_producer_maker(topic=topic.name,
msg_count=10000,
msg_size=1024)
producer.start()
producer.wait()
producer.free()

def _delete(self, key):
self._deleted_segment_size = self._s3.get_object_meta(
@@ -650,15 +668,20 @@ class FastCheck(BaseCase):
"""This test case covers normal recovery process. It creates configured
set of topics and runs recovery and validations."""
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
topics):
rpk_producer_maker, topics):
self.topics = topics
super(FastCheck, self).__init__(s3_client, kafka_tools, rpk_client,
s3_bucket, logger)
s3_bucket, logger, rpk_producer_maker)

def generate_baseline(self):
"""Produce enough data to trigger uploads to S3/minio"""
for topic in self.topics:
self._kafka_tools.produce(topic.name, 10000, 1024)
producer = self._rpk_producer_maker(topic=topic.name,
msg_count=10000,
msg_size=1024)
producer.start()
producer.wait()
producer.free()

def validate_cluster(self, baseline, restored):
"""Check that the topic is writeable"""
@@ -721,12 +744,13 @@ class SizeBasedRetention(BaseCase):
The verification takes into account individual segment size. The recovery process
should restore at least 10MB but not more than 10MB + segment size."""
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
topics):
rpk_producer_maker, topics):
self.topics = topics
self.max_size_bytes = 1024 * 1024 * 20
self.restored_size_bytes = 1024 * 1024 * 10
super(SizeBasedRetention, self).__init__(s3_client, kafka_tools,
rpk_client, s3_bucket, logger)
super(SizeBasedRetention,
self).__init__(s3_client, kafka_tools, rpk_client, s3_bucket,
logger, rpk_producer_maker)
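# Restating the bound from the class docstring with illustrative numbers (not
# from this diff): retention is applied at segment granularity, so with
# restored_size_bytes = 10 MiB (1024 * 1024 * 10 above) and, say, 1 MiB
# segments, the recovered data should total between 10 MiB and 11 MiB, i.e.
#
#     restored_size_bytes <= recovered_bytes <= restored_size_bytes + segment_size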

def generate_baseline(self):
"""Produce enough data to trigger uploads to S3/minio.
@@ -811,13 +835,14 @@ class TimeBasedRetention(BaseCase):
a legacy manifest that doesn't have 'max_timestamp' fields.
"""
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
topics, remove_timestamps):
rpk_producer_maker, topics, remove_timestamps):
self.topics = topics
self.max_size_bytes = 1024 * 1024 * 20
self.restored_size_bytes = 1024 * 1024 * 10
self.remove_timestamps = remove_timestamps
super(TimeBasedRetention, self).__init__(s3_client, kafka_tools,
rpk_client, s3_bucket, logger)
super(TimeBasedRetention,
self).__init__(s3_client, kafka_tools, rpk_client, s3_bucket,
logger, rpk_producer_maker)

def generate_baseline(self):
"""Produce enough data to trigger uploads to S3/minio"""
@@ -1216,7 +1241,8 @@ def test_no_data(self):
in old cluster the empty topic should be created. We should be able
to produce to the topic."""
test_case = NoDataCase(self.s3_client, self.kafka_tools, self.rpk,
self.s3_bucket, self.logger)
self.s3_bucket, self.logger,
self.rpk_producer_maker)
self.do_run(test_case)

@cluster(num_nodes=3,
@@ -1226,7 +1252,8 @@ def test_missing_topic_manifest(self):
in old cluster the empty topic should be created. We should be able
to produce to the topic."""
test_case = MissingTopicManifest(self.s3_client, self.kafka_tools,
self.rpk, self.s3_bucket, self.logger)
self.rpk, self.s3_bucket, self.logger,
self.rpk_producer_maker)
self.do_run(test_case)

@cluster(num_nodes=4,
@@ -1244,17 +1271,18 @@ def test_missing_partition(self):
self.do_run(test_case)

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/4849
@cluster(num_nodes=3,
@cluster(num_nodes=4,
log_allow_list=MISSING_DATA_ERRORS + TRANSIENT_ERRORS)
def test_missing_segment(self):
"""Test the handling of the missing segment. The segment is
missing if it's present in the manifest but deleted from the
bucket."""
test_case = MissingSegment(self.s3_client, self.kafka_tools, self.rpk,
self.s3_bucket, self.logger)
self.s3_bucket, self.logger,
self.rpk_producer_maker)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@cluster(num_nodes=4, log_allow_list=TRANSIENT_ERRORS)
def test_fast1(self):
"""Basic recovery test. This test stresses successful recovery
of the topic with different set of data."""
@@ -1264,10 +1292,11 @@ def test_fast1(self):
replication_factor=3)
]
test_case = FastCheck(self.s3_client, self.kafka_tools, self.rpk,
self.s3_bucket, self.logger, topics)
self.s3_bucket, self.logger,
self.rpk_producer_maker, topics)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@cluster(num_nodes=4, log_allow_list=TRANSIENT_ERRORS)
def test_fast2(self):
"""Basic recovery test. This test stresses successful recovery
of the topic with different set of data."""
@@ -1280,10 +1309,11 @@ def test_fast2(self):
replication_factor=3),
]
test_case = FastCheck(self.s3_client, self.kafka_tools, self.rpk,
self.s3_bucket, self.logger, topics)
self.s3_bucket, self.logger,
self.rpk_producer_maker, topics)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@cluster(num_nodes=4, log_allow_list=TRANSIENT_ERRORS)
def test_fast3(self):
"""Basic recovery test. This test stresses successful recovery
of the topic with different set of data."""
@@ -1299,7 +1329,8 @@ def test_fast3(self):
replication_factor=3),
]
test_case = FastCheck(self.s3_client, self.kafka_tools, self.rpk,
self.s3_bucket, self.logger, topics)
self.s3_bucket, self.logger,
self.rpk_producer_maker, topics)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@@ -1314,7 +1345,7 @@ def test_size_based_retention(self):
]
test_case = SizeBasedRetention(self.s3_client, self.kafka_tools,
self.rpk, self.s3_bucket, self.logger,
topics)
self.rpk_producer_maker, topics)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@@ -1330,7 +1361,7 @@ def test_time_based_retention(self):
]
test_case = TimeBasedRetention(self.s3_client, self.kafka_tools,
self.rpk, self.s3_bucket, self.logger,
topics, False)
self.rpk_producer_maker, topics, False)
self.do_run(test_case)

@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
@@ -1346,5 +1377,5 @@ def test_time_based_retention_with_legacy_manifest(self):
]
test_case = TimeBasedRetention(self.s3_client, self.kafka_tools,
self.rpk, self.s3_bucket, self.logger,
topics, True)
self.rpk_producer_maker, topics, True)
self.do_run(test_case)
