Merge pull request #5776 from Lazin/feature/topic-recovery-e2e-tx-test
rptest: Add end to end topic recovery test with transactions
Lazin committed Aug 2, 2022
2 parents 5d623ae + de66d19 commit 0d8ca88
Showing 3 changed files with 177 additions and 50 deletions.
128 changes: 126 additions & 2 deletions tests/rptest/tests/e2e_topic_recovery_test.py
@@ -12,13 +12,19 @@
from ducktape.cluster.cluster_spec import ClusterSpec
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import RedpandaService, SISettings
from rptest.util import Scale
from rptest.util import Scale, segments_count
from rptest.clients.rpk import RpkTool
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.franz_go_verifiable_services import FranzGoVerifiableProducer, FranzGoVerifiableSeqConsumer
from ducktape.utils.util import wait_until
import time
import json
from itertools import zip_longest

from rptest.util import segments_count
from rptest.utils.si_utils import Producer

import confluent_kafka as ck


class EndToEndTopicRecovery(RedpandaTest):
@@ -31,14 +31,23 @@ class EndToEndTopicRecovery(RedpandaTest):
topics = (TopicSpec(), )

def __init__(self, test_context):
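# Assumed rationale: transactions require idempotence to be enabled, and the
# leader balancer / partition autobalancing are switched off so partition
# leadership stays stable for the duration of the test.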
extra_rp_conf = dict(
enable_idempotence=True,
enable_transactions=True,
enable_leader_balancer=False,
partition_autobalancing_mode="off",
group_initial_rebalance_delay=300,
)
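# Small log segments (1 MiB) and a 5 second max upload interval force frequent
# uploads to the test bucket.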
si_settings = SISettings(
log_segment_size=1024 * 1024,
cloud_storage_segment_max_upload_interval_sec=5,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True)
self.scale = Scale(test_context)
self._bucket = si_settings.cloud_storage_bucket
super().__init__(test_context=test_context, si_settings=si_settings)
super().__init__(test_context=test_context,
si_settings=si_settings,
extra_rp_conf=extra_rp_conf)
self._ctx = test_context
self._producer = None
self._consumer = None
@@ -169,3 +184,112 @@ def s3_has_all_data():
self.logger.info(f"Consumer read {consume_total} messages")
assert produce_acked >= num_messages
assert consume_valid >= produce_acked, f"produced {produce_acked}, consumed {consume_valid}"

@cluster(num_nodes=4)
@matrix(recovery_overrides=[{}, {
'retention.bytes': 1024,
'redpanda.remote.write': True,
'redpanda.remote.read': True,
}])
def test_restore_with_aborted_tx(self, recovery_overrides):
"""Produce data using transactions, including some percentage of aborted
transactions. Run recovery and make sure that record batches generated
by aborted transactions are not visible.
This should work because rm_stm will rebuild its snapshot right after
recovery. The SI also supports aborted transactions via tx manifests.
The test has two variants:
- the partition is downloaded fully;
- the partition is downloaded partially and SI is used to read from the start.
"""
if self.debug_mode:
self.logger.info(
"Skipping test in debug mode (requires release build)")
return

producer = Producer(self.redpanda.brokers(), "topic-recovery-tx-test",
self.logger)

def done():
for _ in range(100):
try:
producer.produce(self.topic)
except ck.KafkaException as err:
self.logger.warn(f"producer error: {err}")
producer.reconnect()
self.logger.info("producer iteration complete")
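# Keep producing until every partition has accumulated at least 10 log
# segments, so there is enough data for tiered storage to upload.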
topic_partitions = segments_count(self.redpanda,
self.topic,
partition_idx=0)
partitions = []
for p in topic_partitions:
partitions.append(p >= 10)
return all(partitions)

wait_until(done,
timeout_sec=120,
backoff_sec=1,
err_msg="producing failed")

assert producer.num_aborted > 0

# Wait until everything is uploaded
def s3_has_all_data():
objects = list(self.redpanda.get_objects_from_si())
for o in objects:
if o.Key.endswith("/manifest.json") and self.topic in o.Key:
data = self.redpanda.s3_client.get_object_data(
self._bucket, o.Key)
manifest = json.loads(data)
last_upl_offset = manifest['last_offset']
self.logger.info(
f"Found manifest at {o.Key}, last_offset is {last_upl_offset}"
)
# We have one partition so this invariant holds;
# it has to be changed when the number of partitions
# grows. That will also require a different S3 check.
return last_upl_offset >= producer.cur_offset
return False

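# Brief pause before polling the bucket; presumably lets uploads begin given
# the 5 second upload interval configured above.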
time.sleep(10)
wait_until(s3_has_all_data,
timeout_sec=300,
backoff_sec=5,
err_msg="Not all data was uploaded to the S3 bucket")

# Wipe out the state on the nodes
self._stop_redpanda_nodes()
self._wipe_data()
# Run recovery
self._start_redpanda_nodes()
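# The empty overrides restore the partition in full; the retention.bytes=1024
# override with remote read/write enabled exercises the variant where SI
# serves reads from the start of the log.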
for topic_spec in self.topics:
self._restore_topic(topic_spec, recovery_overrides)

# Consume and validate
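# A fresh consumer group with auto.offset.reset=earliest reads the restored
# topic from the beginning; librdkafka's default read_committed isolation
# should hide batches from aborted transactions.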
consumer = ck.Consumer(
{
'bootstrap.servers': self.redpanda.brokers(),
'group.id': 'topic-recovery-tx-test',
'auto.offset.reset': 'earliest',
},
logger=self.logger)
consumer.subscribe([self.topic])

consumed = []
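# Drain the restored topic until a 5 second poll returns no more messages.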
while True:
msgs = consumer.consume(timeout=5.0)
if len(msgs) == 0:
break
consumed.extend([(m.key(), m.offset()) for m in msgs])

first_mismatch = ''
# fillvalue lets a length mismatch surface as a key mismatch instead of an unpacking error
for p_key, (c_key, c_offset) in zip_longest(producer.keys, consumed, fillvalue=(None, None)):
if p_key != c_key:
first_mismatch = f"produced: {p_key}, consumed: {c_key} (offset: {c_offset})"
break

assert (not first_mismatch), (
f"produced and consumed messages differ, "
f"produced length: {len(producer.keys)}, consumed length: {len(consumed)}, "
f"first mismatch: {first_mismatch}")
52 changes: 4 additions & 48 deletions tests/rptest/tests/shadow_indexing_tx_test.py
@@ -18,6 +18,7 @@
segments_count,
wait_for_segments_removal,
)
from rptest.utils.si_utils import Producer

import confluent_kafka as ck

@@ -62,58 +63,13 @@ def test_shadow_indexing_aborted_txs(self):
when fetching from remote segments."""
topic = self.topics[0]

class Producer:
def __init__(self, brokers, logger):
self.keys = []
self.cur_offset = 0
self.brokers = brokers
self.logger = logger
self.num_aborted = 0
self.reconnect()

def reconnect(self):
self.producer = ck.Producer({
'bootstrap.servers':
self.brokers,
'transactional.id':
'shadow-indexing-tx-test',
})
self.producer.init_transactions()

def produce(self, topic):
"""produce some messages inside a transaction with increasing keys
and random values. Then randomly commit/abort the transaction."""

n_msgs = random.randint(50, 100)
keys = []

self.producer.begin_transaction()
for _ in range(n_msgs):
val = ''.join(
map(chr, (random.randint(0, 256)
for _ in range(random.randint(100, 1000)))))
self.producer.produce(topic.name, val,
str(self.cur_offset))
keys.append(str(self.cur_offset).encode('utf8'))
self.cur_offset += 1

self.logger.info(
f"writing {len(keys)} msgs: {keys[0]}-{keys[-1]}...")
self.producer.flush()
if random.random() < 0.1:
self.producer.abort_transaction()
self.num_aborted += 1
self.logger.info("aborted txn")
else:
self.producer.commit_transaction()
self.keys.extend(keys)

producer = Producer(self.redpanda.brokers(), self.logger)
producer = Producer(self.redpanda.brokers(), "shadow-indexing-tx-test",
self.logger)

def done():
for _ in range(100):
try:
producer.produce(topic)
producer.produce(topic.name)
except ck.KafkaException as err:
self.logger.warn(f"producer error: {err}")
producer.reconnect()
47 changes: 47 additions & 0 deletions tests/rptest/utils/si_utils.py
@@ -9,6 +9,9 @@
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST

import confluent_kafka
import random

EMPTY_SEGMENT_SIZE = 4096

BLOCK_SIZE = 4096
@@ -270,3 +273,47 @@ def is_segment(self, o: S3ObjectMetadata) -> bool:

def path_matches_any_topic(self, path: str) -> bool:
return any(t in path for t in self.topic_names)


class Producer:
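"""Transactional producer helper for SI tests: writes batches of monotonically
increasing keys with random values and randomly aborts roughly 10% of the
transactions."""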
def __init__(self, brokers, name, logger):
self.keys = []
self.cur_offset = 0
self.brokers = brokers
self.logger = logger
self.num_aborted = 0
self.name = name
self.reconnect()

def reconnect(self):
self.producer = confluent_kafka.Producer({
'bootstrap.servers': self.brokers,
'transactional.id': self.name,
})
self.producer.init_transactions()

def produce(self, topic):
"""produce some messages inside a transaction with increasing keys
and random values. Then randomly commit/abort the transaction."""

n_msgs = random.randint(50, 100)
keys = []

self.producer.begin_transaction()
for _ in range(n_msgs):
val = ''.join(
map(chr, (random.randint(0, 256)
for _ in range(random.randint(100, 1000)))))
self.producer.produce(topic, val, str(self.cur_offset))
keys.append(str(self.cur_offset).encode('utf8'))
self.cur_offset += 1

self.logger.info(f"writing {len(keys)} msgs: {keys[0]}-{keys[-1]}...")
self.producer.flush()
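# Abort roughly 10% of the transactions; keys from aborted batches are
# deliberately left out of self.keys.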
if random.random() < 0.1:
self.producer.abort_transaction()
self.num_aborted += 1
self.logger.info("aborted txn")
else:
self.producer.commit_transaction()
self.keys.extend(keys)
