Skip to content

Commit

Permalink
tests: added parsing of consumer offsets topic to one of the test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Dec 20, 2022
1 parent 3fb829c commit fae4cc3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
17 changes: 11 additions & 6 deletions tests/rptest/clients/offline_log_viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@ def read_kvstore(self, node):
kvstore_json = node.account.ssh_output(cmd, combine_stderr=False)
return json.loads(kvstore_json)

def read_controller(self, node):
cmd = self._cmd("--type controller")
controller_json = node.account.ssh_output(cmd, combine_stderr=False)
def _json_cmd(self, node, suffix):
cmd = self._cmd(suffix=suffix)
json_out = node.account.ssh_output(cmd, combine_stderr=False)
try:
return json.loads(controller_json)
return json.loads(json_out)
except json.decoder.JSONDecodeError:
# Log the bad output before re-raising
self._redpanda.logger.error(
f"Invalid JSON output: {controller_json}")
self._redpanda.logger.error(f"Invalid JSON output: {json_out}")
import time
time.sleep(3600)
raise

def read_controller(self, node):
return self._json_cmd(node, "--type controller")

def read_consumer_offsets(self, node):
return self._json_cmd(node, "--type consumer_offsets")
32 changes: 31 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
Expand All @@ -29,7 +30,10 @@ def __init__(self, test_ctx, *args, **kwargs):
num_brokers=3,
*args,
# disable leader balancer to make sure that group will not be realoaded because of leadership changes
extra_rp_conf={"enable_leader_balancer": False},
extra_rp_conf={
"enable_leader_balancer": False,
"default_topic_replications": 3
},
**kwargs)

def make_consumer_properties(base_properties, instance_id=None):
Expand Down Expand Up @@ -151,6 +155,32 @@ def test_basic_group_join(self, static_members):
c.wait()
c.free()

gd = RpkTool(self.redpanda).group_describe(group=group)
viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.nodes:
consumer_offsets_partitions = viewer.read_consumer_offsets(
node=node)
offsets = {}
groups = set()
for partition in consumer_offsets_partitions:
for r in partition['records']:
self.logger.info(f"{r}")
if r['key']['type'] == 'group_metadata':
groups.add(r['key']['group_id'])
elif r['key']['type'] == 'offset_commit':
tp = f"{r['key']['topic']}/{r['key']['partition']}"
if tp not in offsets:
offsets[tp] = -1
offsets[tp] = max(r['value']['committed_offset'],
offsets[tp])

assert len(groups) == 1 and group in groups
assert all([
f"{p.topic}/{p.partition}" in offsets
and offsets[f"{p.topic}/{p.partition}"] == p.current_offset
for p in gd.partitions
])

@cluster(num_nodes=6)
def test_mixed_consumers_join(self):
"""
Expand Down

0 comments on commit fae4cc3

Please sign in to comment.