Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tool allowing changing number of __consumer_offsets topic partitions #7604

Merged
merged 3 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,8 @@ RUN mkdir -p /opt/scripts && \
RUN mkdir -p /opt/scripts/offline_log_viewer
COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer

RUN mkdir -p /opt/scripts/consumer_offsets_recovery
COPY --chown=0:0 tools/consumer_offsets_recovery /opt/scripts/consumer_offsets_recovery
RUN python3 -m pip install --force --no-cache-dir -r /opt/scripts/consumer_offsets_recovery/requirements.txt

CMD service ssh start && tail -f /dev/null
1 change: 1 addition & 0 deletions tests/docker/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
!tests/python
!tests/go
!tools/offline_log_viewer
!tools/consumer_offsets_recovery
46 changes: 46 additions & 0 deletions tests/rptest/clients/consumer_offsets_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.redpanda import RedpandaService


class ConsumerOffsetsRecovery:
    """
    Wrap tools/consumer_offsets_recovery for use in tests.

    The tool is baked into the test docker image under
    /opt/scripts/consumer_offsets_recovery (see tests/docker/Dockerfile).
    """
    def __init__(self, redpanda: RedpandaService):
        self._redpanda = redpanda
        # java-style properties file handed to the tool via --cfg
        self._cfg_path = '/tmp/cgr.properties'
        # directory where the tool dumps per-group offset files
        self._offsets_cache_path = '/tmp/offsets'

    def _cmd(self, partition_count, dry_run):
        """Build the shell command line invoking the recovery tool."""
        # NOTE: "--offsets" is an unambiguous argparse prefix of the tool's
        # "--offsets-path" option, so it resolves correctly
        cmd = "python3 /opt/scripts/consumer_offsets_recovery/main.py"

        cmd += f" --cfg {self._cfg_path}"
        cmd += f" --offsets {self._offsets_cache_path}"
        cmd += f" -p {partition_count}"
        if not dry_run:
            # without --execute the tool only queries and logs planned actions
            cmd += " --execute"
        return cmd

    def _render_config(self, node):
        """Write the tool's client configuration file on `node`."""
        # only bootstrap servers are needed for an unauthenticated cluster
        properties = {"bootstrap_servers": self._redpanda.brokers()}
        content = ""
        for k, v in properties.items():
            content += f"{k}={v}\n"

        node.account.create_file(self._cfg_path, content)

    def change_partition_count(self, partition_count, node, dry_run):
        """
        Run the recovery tool on `node`, requesting `partition_count`
        partitions for __consumer_offsets. With dry_run=True the tool only
        queries and caches current offsets; it does not modify the topic.
        """
        # start from a clean offsets cache so stale dumps are never replayed
        node.account.remove(self._offsets_cache_path, allow_fail=True)
        node.account.mkdir(self._offsets_cache_path)
        self._render_config(node)
        cmd = self._cmd(partition_count, dry_run)
        output = node.account.ssh_output(cmd, combine_stderr=True)
        self._redpanda.logger.info(f"out: {output}")
1 change: 1 addition & 0 deletions tests/rptest/test_suite_ec2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ ec2:
- tests/wasm_redpanda_failure_recovery_test.py # no wasm
- tests/wasm_partition_movement_test.py # no wasm
- tests/e2e_iam_role_test.py # use static credentials
- tests/consumer_group_recovery_tool_test.py # use script available in dockerfile
111 changes: 111 additions & 0 deletions tests/rptest/tests/consumer_group_recovery_tool_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.consumer_offsets_recovery import ConsumerOffsetsRecovery
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.rpk_producer import RpkProducer
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize


class ConsumerOffsetsRecoveryToolTest(PreallocNodesTest):
    def __init__(self, test_ctx, *args, **kwargs):
        self._ctx = test_ctx
        self.initial_partition_count = 3
        super(ConsumerOffsetsRecoveryToolTest, self).__init__(
            test_ctx,
            num_brokers=3,
            *args,
            # keep the leader balancer off so that groups are not reloaded
            # because of leadership changes during the test
            extra_rp_conf={
                "group_topic_partitions": self.initial_partition_count
            },
            node_prealloc_count=1,
            **kwargs)

    def describe_all_groups(self):
        """Snapshot {group: {"topic/partition": committed offset}} via rpk."""
        rpk = RpkTool(self.redpanda)
        snapshot = {}
        for group in rpk.group_list():
            described = rpk.group_describe(group)
            snapshot[described.name] = {
                f"{p.topic}/{p.partition}": p.current_offset
                for p in described.partitions
            }
        return snapshot

    @cluster(num_nodes=4)
    def test_consumer_offsets_partition_count_change(self):
        topic = TopicSpec(partition_count=16, replication_factor=3)
        self.client().create_topic([topic])
        msg_size = 1024
        msg_cnt = 10000

        producer = KgoVerifierProducer(self.test_context,
                                       self.redpanda,
                                       topic.name,
                                       msg_size,
                                       msg_cnt,
                                       custom_node=self.preallocated_nodes)

        producer.start(clean=False)

        # make sure some records were acknowledged before starting readers
        wait_until(lambda: producer.produce_status.acked > 10,
                   timeout_sec=30,
                   backoff_sec=0.5)

        consumer = KgoVerifierConsumerGroupConsumer(
            self.test_context,
            self.redpanda,
            topic.name,
            msg_size,
            readers=3,
            nodes=self.preallocated_nodes)
        consumer.start(clean=False)

        producer.wait()
        consumer.wait()

        assert consumer.consumer_status.validator.valid_reads >= producer.produce_status.acked

        groups_pre_migration = self.describe_all_groups()

        cgr = ConsumerOffsetsRecovery(self.redpanda)

        # dry run: ask the tool for 16 partitions in the consumer offsets
        # topic but expect it to leave the partition count untouched
        cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=True)
        rpk = RpkTool(self.redpanda)
        tp_desc = list(rpk.describe_topic("__consumer_offsets"))

        # the partition count must still be at its initial value
        assert len(tp_desc) == self.initial_partition_count

        # real run: allow the tool to recreate the topic
        cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=False)

        # the topic should now report the requested partition count
        wait_until(
            lambda: len(list(rpk.describe_topic("__consumer_offsets"))) == 16,
            20)

        # committed offsets must survive the migration unchanged
        groups_post_migration = self.describe_all_groups()

        assert groups_pre_migration == groups_post_migration
6 changes: 6 additions & 0 deletions tools/consumer_offsets_recovery/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
bootstrap_servers=127.0.0.1
# security
# security_protocol=SASL_PLAINTEXT
# sasl_mechanism=SCRAM-SHA-256
# sasl_plain_username=admin
# sasl_plain_password=admin
163 changes: 163 additions & 0 deletions tools/consumer_offsets_recovery/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import os
from pathlib import Path

import subprocess
from kafka import KafkaAdminClient
from kafka import KafkaConsumer
from kafka.admin.new_topic import NewTopic
from kafka import TopicPartition
from kafka import OffsetAndMetadata
from jproperties import Properties
import logging

logger = logging.getLogger('cg-recovery-tool')


def read_offsets(admin: KafkaAdminClient):
    """
    Return committed offsets for every consumer group as
    {group_id: {TopicPartition: OffsetAndMetadata}}.
    """
    result = {}
    for group_id, _ in admin.list_consumer_groups():
        logger.info(f"reading group '{group_id}' offsets")
        result[group_id] = admin.list_consumer_group_offsets(
            group_id=group_id)

    return result


def read_config(path):
    """Parse a java-style .properties file at `path` into a plain dict."""
    logger.debug(f"reading configuration from {path}")
    props = Properties()
    with open(path, 'rb') as f:
        props.load(f)

    # jproperties wraps values; unwrap to raw strings via .data
    return {key: value.data for key, value in props.items()}


def seek_to_file(path, cfg, dry_run):
    """
    Restore committed offsets for a single consumer group from a dump file.

    File format (written by query_offsets): first line is the group id,
    each following line is "topic,partition,offset". With dry_run=True
    offsets are parsed and logged but nothing is committed.
    """
    logger.debug(f"reading offsets file: {path}")
    with open(path, 'r') as f:
        lines = [l.strip() for l in f]

    if not lines:
        # nothing to do for an empty dump file
        logger.debug(f"offsets file {path} is empty, skipping")
        return

    # first line carries the group id (per-file invariant of query_offsets)
    group = lines[0]
    logger.info(f"seeking group {group} to file {path}")

    offsets = {}
    for l in lines[1:]:
        if not l:
            # NOTE: the original compared the raw line against "" while it
            # still carried its trailing newline, so blank lines crashed the
            # 3-way unpack below; strip first, then skip empties
            continue

        topic, partition, offset = (p.strip() for p in l.split(','))
        logger.debug(
            f"group: {group} partition: {topic}/{partition} offset: {offset}"
        )
        tp = TopicPartition(topic=topic, partition=int(partition))
        offsets[tp] = OffsetAndMetadata(offset=int(offset), metadata='')

    if not dry_run:
        # commit the whole map once after parsing instead of re-committing
        # the growing dict on every line, and always release the consumer
        consumer = KafkaConsumer(group_id=group, **cfg)
        try:
            consumer.commit(offsets=offsets)
        finally:
            consumer.close()


def offset_file(group):
    """Return the dump-file name used for `group`'s committed offsets."""
    return "{}.offsets".format(group)


def seek_all(config, path, dry_run):
    """
    Apply every offsets dump file found under `path` (recursively).

    Fix: files found in subdirectories were previously joined against the
    top-level `path` instead of the directory that actually contains them
    (os.walk recurses), producing non-existent paths.
    """
    logger.info(f"reading offsets from: {path}")
    for root, _, files in os.walk(path):
        for name in files:
            seek_to_file(Path(root).joinpath(name), config, dry_run)


def query_offsets(offsets_path, cfg):
    """
    Dump current committed offsets of every consumer group into one file
    per group under `offsets_path` (file naming per offset_file()).
    """
    admin = KafkaAdminClient(**cfg)

    current_offsets = read_offsets(admin)
    path = Path(offsets_path)
    logger.info(f"storing offsets in {offsets_path}")
    # parents=True lets callers pass a nested, not-yet-existing directory;
    # previously this raised FileNotFoundError when the parent was missing
    path.mkdir(parents=True, exist_ok=True)
    for group_id, offsets in current_offsets.items():
        with open(path.joinpath(offset_file(group_id)), 'w') as f:
            # first line: group id; then one "topic,partition,offset" per line
            f.write(f"{group_id}\n")
            for tp, md in offsets.items():
                topic, partition = tp
                offset, _ = md
                f.write(f"{topic},{partition},{offset}\n")


def recreate_topic(partitions, replication_factor, cfg):
    """
    Delete __consumer_offsets and create it again with the requested
    partition count and replication factor.

    NOTE(review): topic deletion is asynchronous on the broker side; if the
    create ever races with the delete this may need a wait/retry — confirm
    against the target cluster's behavior.
    """
    admin = KafkaAdminClient(**cfg)
    logger.info('removing __consumer_offsets topic')

    admin.delete_topics(["__consumer_offsets"])

    # same topic configs redpanda uses for the consumer offsets topic:
    # compacted, with unbounded retention
    topic_configs = {
        "cleanup.policy": "compact",
        "retention.bytes": -1,
        "retention.ms": -1,
    }
    replacement = NewTopic(name="__consumer_offsets",
                           num_partitions=partitions,
                           topic_configs=topic_configs,
                           replication_factor=replication_factor)
    logger.info('recreating __consumer_offsets topic')
    admin.create_topics(new_topics=[replacement])


def main():
    """
    CLI entry point: dump committed offsets of all groups, optionally
    recreate __consumer_offsets with a new partition count (--execute),
    then seek groups back to the dumped offsets.
    """
    import argparse

    def generate_options():
        # build the tool's argument parser
        parser = argparse.ArgumentParser(
            description='Redpanda group recovery tool')
        parser.add_argument('--cfg',
                            type=str,
                            help='config file path',
                            required=True)
        parser.add_argument('-o',
                            "--offsets-path",
                            type=str,
                            default="./offsets")
        parser.add_argument('-v', "--verbose", action="store_true")
        parser.add_argument('-s', "--skip-query", action="store_true")
        parser.add_argument('-p', "--target-partitions", type=int, default=16)
        parser.add_argument('-e', "--execute", action="store_true")
        parser.add_argument('-r', "--replication-factor", type=int, default=3)

        return parser

    parser = generate_options()
    options, _ = parser.parse_known_args()

    # logging.basicConfig is a no-op on every call after the first (without
    # force=True), so the root level must be chosen up front and the root
    # logger configured exactly once; previously the per-branch basicConfig
    # calls never took effect
    logging.basicConfig(
        format="%(asctime)s %(name)-20s %(levelname)-8s %(message)s",
        level="DEBUG" if options.verbose else "ERROR")
    logger.setLevel(level="DEBUG" if options.verbose else "INFO")

    logger.info(f"starting group recovery tool with options: {options}")
    cfg = read_config(options.cfg)

    logger.info(f"configuration: {cfg}")

    if not options.skip_query:
        query_offsets(options.offsets_path, cfg)

    if options.execute:
        # destructive: drops and recreates the topic before re-seeking
        recreate_topic(options.target_partitions, options.replication_factor,
                       cfg)

    seek_all(cfg, options.offsets_path, dry_run=not options.execute)


if __name__ == '__main__':
    main()
2 changes: 2 additions & 0 deletions tools/consumer_offsets_recovery/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
kafka-python
jproperties