Skip to content

Commit

Permalink
Merge pull request #8019 from mmaslankaprv/v22.3.x
Browse files Browse the repository at this point in the history
[v22.3.x] Backport of #5315 #7603 #7923 #7604 #7814 #8016
  • Loading branch information
dotnwat committed Jan 6, 2023
2 parents 6464a39 + f8720b7 commit 61b42e7
Show file tree
Hide file tree
Showing 20 changed files with 1,330 additions and 97 deletions.
7 changes: 7 additions & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,11 @@ RUN mkdir -p /opt/scripts && \
curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \
chmod +x /opt/scripts/seastar-addr2line

RUN mkdir -p /opt/scripts/offline_log_viewer
COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer

RUN mkdir -p /opt/scripts/consumer_offsets_recovery
COPY --chown=0:0 tools/consumer_offsets_recovery /opt/scripts/consumer_offsets_recovery
RUN python3 -m pip install --force --no-cache-dir -r /opt/scripts/consumer_offsets_recovery/requirements.txt

CMD service ssh start && tail -f /dev/null
4 changes: 3 additions & 1 deletion tests/docker/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
!tests/docker/ssh
!tests/setup.py
!tests/python
!tests/go
!tests/go
!tools/offline_log_viewer
!tools/consumer_offsets_recovery
46 changes: 46 additions & 0 deletions tests/rptest/clients/consumer_offsets_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.redpanda import RedpandaService


class ConsumerOffsetsRecovery:
    """
    Test-side wrapper around tools/consumer_offsets_recovery.

    The tool is run over SSH on a chosen node from
    /opt/scripts/consumer_offsets_recovery/main.py, using a properties
    file and an offsets directory that this wrapper creates under /tmp.
    """
    def __init__(self, redpanda: RedpandaService):
        self._redpanda = redpanda
        # Both paths live on the node the tool is executed on.
        self._cfg_path = '/tmp/cgr.properties'
        self._offsets_cache_path = '/tmp/offsets'

    def _cmd(self, partition_count, dry_run):
        """
        Build the command line used to invoke the recovery tool.

        :param partition_count: value passed to the tool's `-p` option
        :param dry_run: when false, `--execute` is appended
        :return: the full command as a single string
        """
        pieces = [
            "python3 /opt/scripts/consumer_offsets_recovery/main.py",
            f"--cfg {self._cfg_path}",
            f"--offsets {self._offsets_cache_path}",
            f"-p {partition_count}",
        ]
        if not dry_run:
            pieces.append("--execute")
        return " ".join(pieces)

    def _render_config(self, node):
        """
        Write the tool's `key=value` properties file onto `node`.
        """
        settings = {"bootstrap_servers": self._redpanda.brokers()}
        rendered = "".join(f"{key}={value}\n"
                           for key, value in settings.items())

        node.account.create_file(self._cfg_path, rendered)

    def change_partition_count(self, partition_count, node, dry_run):
        """
        Run the recovery tool on `node` with the requested partition count.

        The offsets directory is removed and recreated and the config file
        is rewritten before every run; the tool's combined stdout/stderr
        is written to the redpanda service log.
        """
        account = node.account
        # Start from an empty offsets directory; removal may fail when
        # the directory does not exist yet.
        account.remove(self._offsets_cache_path, allow_fail=True)
        account.mkdir(self._offsets_cache_path)
        self._render_config(node)

        command = self._cmd(partition_count, dry_run)
        result = account.ssh_output(command, combine_stderr=True)
        self._redpanda.logger.info(f"out: {result}")
44 changes: 44 additions & 0 deletions tests/rptest/clients/offline_log_viewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import json


class OfflineLogViewer:
    """
    Wrap tools/offline_log_viewer for use in tests: this is for tests that
    want to peek at the structures, but also for validating the tool itself.
    """
    def __init__(self, redpanda):
        self._redpanda = redpanda

    def _cmd(self, suffix):
        """
        Build the viewer command line for this cluster's data directory.

        :param suffix: extra command line arguments, e.g. "--type kvstore"
        """
        viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
        return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}"

    def _json_cmd(self, node, suffix):
        """
        Run the viewer on `node` and decode its output as JSON.

        stderr is not combined into the captured output, since the whole
        output is handed to the JSON parser.

        :raises json.decoder.JSONDecodeError: if the output is not valid
            JSON; the raw output is logged before the error propagates.
        """
        cmd = self._cmd(suffix=suffix)
        json_out = node.account.ssh_output(cmd, combine_stderr=False)
        try:
            return json.loads(json_out)
        except json.decoder.JSONDecodeError:
            # Log the bad output before re-raising
            self._redpanda.logger.error(f"Invalid JSON output: {json_out}")
            raise

    def read_kvstore(self, node):
        """Return the decoded output of the viewer's `kvstore` mode."""
        # Routed through _json_cmd so that invalid output is logged the
        # same way as for the other readers.
        return self._json_cmd(node, "--type kvstore")

    def read_controller(self, node):
        """Return the decoded output of the viewer's `controller` mode."""
        return self._json_cmd(node, "--type controller")

    def read_consumer_offsets(self, node):
        """Return the decoded output of the viewer's `consumer_offsets` mode."""
        return self._json_cmd(node, "--type consumer_offsets")
4 changes: 2 additions & 2 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ def parse_field(field_name, string):
static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$")

partition_pattern_static_member = re.compile(
"(?P<topic>.+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<instance_id>[^\s]*) *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<instance_id>[^\s]*) *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)
partition_pattern_dynamic_member = re.compile(
"(?P<topic>.+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)

def check_lines(lines):
Expand Down
1 change: 1 addition & 0 deletions tests/rptest/test_suite_ec2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ ec2:
- tests/wasm_redpanda_failure_recovery_test.py # no wasm
- tests/wasm_partition_movement_test.py # no wasm
- tests/e2e_iam_role_test.py # use static credentials
- tests/consumer_group_recovery_tool_test.py # use script available in dockerfile
111 changes: 111 additions & 0 deletions tests/rptest/tests/consumer_group_recovery_tool_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.consumer_offsets_recovery import ConsumerOffsetsRecovery
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.rpk_producer import RpkProducer
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize


class ConsumerOffsetsRecoveryToolTest(PreallocNodesTest):
    """
    Run the consumer offsets recovery tool against a live cluster and
    check that the offsets reported for every consumer group are the same
    before and after the `__consumer_offsets` partition count is changed.
    """
    def __init__(self, test_ctx, *args, **kwargs):
        self._ctx = test_ctx
        self.initial_partition_count = 3
        super().__init__(
            test_ctx,
            *args,
            num_brokers=3,
            # start with a small number of group topic partitions; the test
            # asserts that __consumer_offsets has exactly this many
            # partitions until the tool is allowed to execute
            extra_rp_conf={
                "group_topic_partitions": self.initial_partition_count
            },
            node_prealloc_count=1,
            **kwargs)

    def describe_all_groups(self):
        """
        Describe every group listed by rpk.

        :return: dict mapping group name to a dict of
            "<topic>/<partition>" -> current offset
        """
        rpk = RpkTool(self.redpanda)
        all_groups = {}
        for g in rpk.group_list():
            gd = rpk.group_describe(g)
            all_groups[gd.name] = {
                f"{p.topic}/{p.partition}": p.current_offset
                for p in gd.partitions
            }

        return all_groups

    @cluster(num_nodes=4)
    def test_consumer_offsets_partition_count_change(self):
        """
        Produce and consume through a consumer group, then run the tool
        first as a dry run (partition count must stay the same) and then
        for real (partition count must change, group offsets must not).
        """
        topic = TopicSpec(partition_count=16, replication_factor=3)
        self.client().create_topic([topic])
        msg_size = 1024
        msg_cnt = 10000
        # partition count requested from the recovery tool
        target_partition_count = 16

        producer = KgoVerifierProducer(self.test_context,
                                       self.redpanda,
                                       topic.name,
                                       msg_size,
                                       msg_cnt,
                                       custom_node=self.preallocated_nodes)

        producer.start(clean=False)

        # make sure there is some data before starting the group consumer
        wait_until(lambda: producer.produce_status.acked > 10,
                   timeout_sec=30,
                   backoff_sec=0.5,
                   err_msg="Producer did not get more than 10 messages acked")

        consumer = KgoVerifierConsumerGroupConsumer(
            self.test_context,
            self.redpanda,
            topic.name,
            msg_size,
            readers=3,
            nodes=self.preallocated_nodes)
        consumer.start(clean=False)

        producer.wait()
        consumer.wait()

        assert consumer.consumer_status.validator.valid_reads >= producer.produce_status.acked

        groups_pre_migration = self.describe_all_groups()

        cgr = ConsumerOffsetsRecovery(self.redpanda)

        # execute recovery tool, ask for a new partition count in the
        # consumer offsets topic
        #
        # this is dry run, expect that partition count didn't change
        cgr.change_partition_count(target_partition_count,
                                   self.redpanda.nodes[0],
                                   dry_run=True)
        rpk = RpkTool(self.redpanda)
        tp_desc = list(rpk.describe_topic("__consumer_offsets"))

        # check that the dry run left the topic partition count unchanged
        assert len(tp_desc) == self.initial_partition_count

        # now allow the tool to execute
        cgr.change_partition_count(target_partition_count,
                                   self.redpanda.nodes[0],
                                   dry_run=False)

        # check if topic partition count changed
        wait_until(
            lambda: len(list(rpk.describe_topic("__consumer_offsets"))) ==
            target_partition_count,
            timeout_sec=20,
            err_msg=
            f"__consumer_offsets partition count did not reach {target_partition_count}"
        )

        groups_post_migration = self.describe_all_groups()

        # committed offsets of every group must survive the migration
        assert groups_pre_migration == groups_post_migration
32 changes: 31 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
Expand All @@ -32,7 +33,10 @@ def __init__(self, test_ctx, *args, **kwargs):
num_brokers=3,
*args,
# disable leader balancer to make sure that group will not be reloaded because of leadership changes
extra_rp_conf={"enable_leader_balancer": False},
extra_rp_conf={
"enable_leader_balancer": False,
"default_topic_replications": 3
},
**kwargs)

def make_consumer_properties(base_properties, instance_id=None):
Expand Down Expand Up @@ -154,6 +158,32 @@ def test_basic_group_join(self, static_members):
c.wait()
c.free()

gd = RpkTool(self.redpanda).group_describe(group=group)
viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.nodes:
consumer_offsets_partitions = viewer.read_consumer_offsets(
node=node)
offsets = {}
groups = set()
for partition in consumer_offsets_partitions:
for r in partition['records']:
self.logger.info(f"{r}")
if r['key']['type'] == 'group_metadata':
groups.add(r['key']['group_id'])
elif r['key']['type'] == 'offset_commit':
tp = f"{r['key']['topic']}/{r['key']['partition']}"
if tp not in offsets:
offsets[tp] = -1
offsets[tp] = max(r['value']['committed_offset'],
offsets[tp])

assert len(groups) == 1 and group in groups
assert all([
f"{p.topic}/{p.partition}" in offsets
and offsets[f"{p.topic}/{p.partition}"] == p.current_offset
for p in gd.partitions
])

@cluster(num_nodes=6)
def test_mixed_consumers_join(self):
"""
Expand Down
Loading

0 comments on commit 61b42e7

Please sign in to comment.