Skip to content

Commit

Permalink
tests: added test for consumer offsets partition change tool
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Dec 21, 2022
1 parent 28c5bae commit 34e3ff2
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
46 changes: 46 additions & 0 deletions tests/rptest/clients/consumer_offsets_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.redpanda import RedpandaService


class ConsumerOffsetsRecovery:
"""
Wrap tools/consumer_offsets_recovery for use in tests
"""
def __init__(self, redpanda: RedpandaService):
self._redpanda = redpanda
self._cfg_path = '/tmp/cgr.properties'
self._offsets_cache_path = '/tmp/offsets'

def _cmd(self, partition_count, dry_run):
cmd = "python3 /opt/scripts/consumer_offsets_recovery/main.py"

cmd += f" --cfg {self._cfg_path}"
cmd += f" --offsets {self._offsets_cache_path}"
cmd += f" -p {partition_count}"
if not dry_run:
cmd += " --execute"
return cmd

def _render_config(self, node):
properties = {"bootstrap_servers": self._redpanda.brokers()}
content = ""
for k, v in properties.items():
content += f"{k}={v}\n"

node.account.create_file(self._cfg_path, content)

def change_partition_count(self, partition_count, node, dry_run):
node.account.remove(self._offsets_cache_path, allow_fail=True)
node.account.mkdir(self._offsets_cache_path)
self._render_config(node)
cmd = self._cmd(partition_count, dry_run)
output = node.account.ssh_output(cmd, combine_stderr=True)
self._redpanda.logger.info(f"out: {output}")
1 change: 1 addition & 0 deletions tests/rptest/test_suite_ec2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ ec2:
- tests/wasm_redpanda_failure_recovery_test.py # no wasm
- tests/wasm_partition_movement_test.py # no wasm
- tests/e2e_iam_role_test.py # use static credentials
- tests/consumer_group_recovery_tool_test.py # use script available in dockerfile
111 changes: 111 additions & 0 deletions tests/rptest/tests/consumer_group_recovery_tool_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2020 Redpanda Data, Inc.
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.consumer_offsets_recovery import ConsumerOffsetsRecovery
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.rpk_producer import RpkProducer
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize


class ConsumerOffsetsRecoveryToolTest(PreallocNodesTest):
def __init__(self, test_ctx, *args, **kwargs):
self._ctx = test_ctx
self.initial_partition_count = 3
super(ConsumerOffsetsRecoveryToolTest, self).__init__(
test_ctx,
num_brokers=3,
*args,
# disable leader balancer to make sure that group will not be realoaded because of leadership changes
extra_rp_conf={
"group_topic_partitions": self.initial_partition_count
},
node_prealloc_count=1,
**kwargs)

def describe_all_groups(self):
rpk = RpkTool(self.redpanda)
all_groups = {}
for g in rpk.group_list():
gd = rpk.group_describe(g)
all_groups[gd.name] = {}
for p in gd.partitions:
all_groups[
gd.name][f"{p.topic}/{p.partition}"] = p.current_offset

return all_groups

@cluster(num_nodes=4)
def test_consumer_offsets_partition_count_change(self):
topic = TopicSpec(partition_count=16, replication_factor=3)
self.client().create_topic([topic])
msg_size = 1024
msg_cnt = 10000

producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic.name,
msg_size,
msg_cnt,
custom_node=self.preallocated_nodes)

producer.start(clean=False)

wait_until(lambda: producer.produce_status.acked > 10,
timeout_sec=30,
backoff_sec=0.5)

consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
topic.name,
msg_size,
readers=3,
nodes=self.preallocated_nodes)
consumer.start(clean=False)

producer.wait()
consumer.wait()

assert consumer.consumer_status.validator.valid_reads >= producer.produce_status.acked

groups_pre_migration = self.describe_all_groups()

cgr = ConsumerOffsetsRecovery(self.redpanda)

# execute recovery tool, ask for 16 partitions in consumer offsets topic
#
# this is dry run, expect that partition count didn't change
cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=True)
rpk = RpkTool(self.redpanda)
tp_desc = list(rpk.describe_topic("__consumer_offsets"))

# check if topic partition count changed
assert len(tp_desc) == self.initial_partition_count

# now allow the tool to execute
cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=False)

# check if topic partition count changed
wait_until(
lambda: len(list(rpk.describe_topic("__consumer_offsets"))) == 16,
20)

groups_post_migration = self.describe_all_groups()

assert groups_pre_migration == groups_post_migration

0 comments on commit 34e3ff2

Please sign in to comment.