Metadata viewer consumer offsets support #7603

Merged
17 changes: 11 additions & 6 deletions tests/rptest/clients/offline_log_viewer.py
@@ -27,15 +27,20 @@ def read_kvstore(self, node):
kvstore_json = node.account.ssh_output(cmd, combine_stderr=False)
return json.loads(kvstore_json)

def read_controller(self, node):
cmd = self._cmd("--type controller")
controller_json = node.account.ssh_output(cmd, combine_stderr=False)
def _json_cmd(self, node, suffix):
cmd = self._cmd(suffix=suffix)
json_out = node.account.ssh_output(cmd, combine_stderr=False)
try:
return json.loads(controller_json)
return json.loads(json_out)
except json.decoder.JSONDecodeError:
# Log the bad output before re-raising
self._redpanda.logger.error(
f"Invalid JSON output: {controller_json}")
self._redpanda.logger.error(f"Invalid JSON output: {json_out}")
raise

def read_controller(self, node):
return self._json_cmd(node, "--type controller")

def read_consumer_offsets(self, node):
return self._json_cmd(node, "--type consumer_offsets")
4 changes: 2 additions & 2 deletions tests/rptest/clients/rpk.py
@@ -374,10 +374,10 @@ def parse_field(field_name, string):
static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$")

partition_pattern_static_member = re.compile(
"(?P<topic>.+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<instance_id>[^\s]*) *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<instance_id>[^\s]*) *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)
partition_pattern_dynamic_member = re.compile(
"(?P<topic>.+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)

def check_lines(lines):
32 changes: 31 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
@@ -8,6 +8,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
@@ -32,7 +32,10 @@ def __init__(self, test_ctx, *args, **kwargs):
num_brokers=3,
*args,
# disable leader balancer to make sure that the group will not be reloaded because of leadership changes
extra_rp_conf={"enable_leader_balancer": False},
extra_rp_conf={
"enable_leader_balancer": False,
"default_topic_replications": 3
},
**kwargs)

def make_consumer_properties(base_properties, instance_id=None):
@@ -154,6 +158,32 @@ def test_basic_group_join(self, static_members):
c.wait()
c.free()

gd = RpkTool(self.redpanda).group_describe(group=group)
viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.nodes:
consumer_offsets_partitions = viewer.read_consumer_offsets(
node=node)
offsets = {}
groups = set()
for partition in consumer_offsets_partitions:
for r in partition['records']:
self.logger.info(f"{r}")
if r['key']['type'] == 'group_metadata':
groups.add(r['key']['group_id'])
elif r['key']['type'] == 'offset_commit':
tp = f"{r['key']['topic']}/{r['key']['partition']}"
if tp not in offsets:
offsets[tp] = -1
offsets[tp] = max(r['value']['committed_offset'],
offsets[tp])

assert len(groups) == 1 and group in groups
assert all([
f"{p.topic}/{p.partition}" in offsets
and offsets[f"{p.topic}/{p.partition}"] == p.current_offset
for p in gd.partitions
])

@cluster(num_nodes=6)
def test_mixed_consumers_join(self):
"""
123 changes: 123 additions & 0 deletions tools/offline_log_viewer/consumer_offsets.py
@@ -0,0 +1,123 @@
import base64
from io import BytesIO
from model import *
from reader import Endianness, Reader
from storage import Segment
import datetime


def decode_key_type(v):
if v == 0 or v == 1:
return "offset_commit"
elif v == 2:
return "group_metadata"

return "unknown"


def decode_member_proto(rdr):
ret = {}
ret['name'] = rdr.read_string()
ret['metadata'] = rdr.read_iobuf().hex()
return ret


def decode_member(rdr):
ret = {}
ret['v'] = rdr.read_int16()
ret['member_id'] = rdr.read_kafka_string()
ret['instance_id'] = rdr.read_kafka_optional_string()
ret['client_id'] = rdr.read_kafka_string()
ret['client_host'] = rdr.read_kafka_string()
ret['rebalance_timeout'] = rdr.read_int32()
ret['session_timeout'] = rdr.read_int32()
ret['subscription'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')
ret['assignment'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')

return ret


def decode_metadata(rdr):
ret = {}
ret['version'] = rdr.read_int16()
ret['protocol_type'] = rdr.read_kafka_string()
ret['generation_id'] = rdr.read_int32()
ret['protocol_name'] = rdr.read_kafka_optional_string()
ret['leader'] = rdr.read_kafka_optional_string()
ret['state_timestamp'] = rdr.read_int64()
ret['member_state'] = rdr.read_vector(decode_member)
return ret


def decode_key(key_rdr):
ret = {}
v = key_rdr.read_int16()
ret['type'] = decode_key_type(v)
ret['group_id'] = key_rdr.read_kafka_string()
if ret['type'] == 'offset_commit':
ret['topic'] = key_rdr.read_kafka_string()
ret['partition'] = key_rdr.read_int32()

return ret


def decode_offset_commit(v_rdr):
ret = {}
ret['version'] = v_rdr.read_int16()
ret['committed_offset'] = v_rdr.read_int64()
if ret['version'] >= 3:
ret['leader_epoch'] = v_rdr.read_int32()

ret['committed_metadata'] = v_rdr.read_kafka_string()
ret['commit_timestamp'] = v_rdr.read_int64()
if ret['version'] == 1:
ret['expiry_timestamp'] = v_rdr.read_int64()

return ret


def decode_record(hdr, r):
v = {}
v['epoch'] = hdr.first_ts
v['offset'] = hdr.base_offset + r.offset_delta
v['ts'] = datetime.datetime.utcfromtimestamp(
hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN)

v['key'] = decode_key(k_rdr)

if v['key']['type'] == "group_metadata":
if r.value:
rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
v['value'] = decode_metadata(rdr)
else:
v['value'] = 'tombstone'
elif v['key']['type'] == "offset_commit":
if r.value:
rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
v['value'] = decode_offset_commit(rdr)
else:
v['value'] = 'tombstone'

return v


class OffsetsLog:
def __init__(self, ntp):
self.ntp = ntp
self.records = []

def decode(self):
paths = []
for path in self.ntp.segments:
paths.append(path)
paths.sort()
for path in paths:
s = Segment(path)
for b in s:
if b.header.type != 1:
continue
for r in b:
self.records.append(decode_record(b.header, r))
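
For reference, the key layout decode_key expects follows Kafka's __consumer_offsets schema: a big-endian int16 version (0/1 for offset commits, 2 for group metadata), the group id as a Kafka string (int16 length prefix), and, for offset commits, the topic name and an int32 partition. A minimal standalone sketch using plain struct (the helper names here are illustrative, not part of this patch):

import struct

def pack_kafka_string(s):
    b = s.encode('utf-8')
    return struct.pack('>h', len(b)) + b

# Build an offset_commit key: version 1, group, topic, partition, all big-endian.
key = (struct.pack('>h', 1) + pack_kafka_string('group-1') +
       pack_kafka_string('topic-1') + struct.pack('>i', 0))

def unpack_kafka_string(buf, pos):
    (n, ) = struct.unpack_from('>h', buf, pos)
    return buf[pos + 2:pos + 2 + n].decode('utf-8'), pos + 2 + n

(version, ) = struct.unpack_from('>h', key, 0)
group, pos = unpack_kafka_string(key, 2)
topic, pos = unpack_kafka_string(key, pos)
(partition, ) = struct.unpack_from('>i', key, pos)
print(version, group, topic, partition)  # 1 group-1 topic-1 0
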
43 changes: 34 additions & 9 deletions tools/offline_log_viewer/reader.py
@@ -1,3 +1,4 @@
from enum import Enum
import struct
import collections
from io import BufferedReader, BytesIO
@@ -9,10 +10,16 @@
('version', 'compat_version', 'size'))


class Endianness(Enum):
BIG_ENDIAN = 0
LITTLE_ENDIAN = 1


class Reader:
def __init__(self, stream: BytesIO):
def __init__(self, stream, endianness=Endianness.LITTLE_ENDIAN):
# BytesIO provides .getbuffer(); BufferedReader provides .peek()
self.stream = BufferedReader(stream)
self.endianness = endianness

@staticmethod
def _decode_zig_zag(v):
@@ -32,29 +39,33 @@ def read_varint(self):

return Reader._decode_zig_zag(result)

def with_endianness(self, str):
ch = '<' if self.endianness == Endianness.LITTLE_ENDIAN else '>'
return f"{ch}{str}"

def read_int8(self):
return struct.unpack("<b", self.stream.read(1))[0]
return struct.unpack(self.with_endianness('b'), self.stream.read(1))[0]

def read_uint8(self):
return struct.unpack("<B", self.stream.read(1))[0]
return struct.unpack(self.with_endianness('B'), self.stream.read(1))[0]

def read_int16(self):
return struct.unpack("<h", self.stream.read(2))[0]
return struct.unpack(self.with_endianness('h'), self.stream.read(2))[0]

def read_uint16(self):
return struct.unpack("<H", self.stream.read(2))[0]
return struct.unpack(self.with_endianness('H'), self.stream.read(2))[0]

def read_int32(self):
return struct.unpack("<i", self.stream.read(4))[0]
return struct.unpack(self.with_endianness('i'), self.stream.read(4))[0]

def read_uint32(self):
return struct.unpack("<I", self.stream.read(4))[0]
return struct.unpack(self.with_endianness('I'), self.stream.read(4))[0]

def read_int64(self):
return struct.unpack("<q", self.stream.read(8))[0]
return struct.unpack(self.with_endianness('q'), self.stream.read(8))[0]

def read_uint64(self):
return struct.unpack("<Q", self.stream.read(8))[0]
return struct.unpack(self.with_endianness('Q'), self.stream.read(8))[0]

def read_serde_enum(self):
return self.read_int32()
@@ -70,12 +81,26 @@ def read_string(self):
len = self.read_int32()
return self.stream.read(len).decode('utf-8')

def read_kafka_string(self):
len = self.read_int16()
return self.stream.read(len).decode('utf-8')

def read_kafka_bytes(self):
len = self.read_int32()
return self.stream.read(len)

def read_optional(self, type_read):
present = self.read_int8()
if present == 0:
return None
return type_read(self)

def read_kafka_optional_string(self):
len = self.read_int16()
if len == -1:
return None
return self.stream.read(len).decode('utf-8')

def read_vector(self, type_read):
sz = self.read_int32()
ret = []
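
The endianness parameter exists because redpanda's serde-encoded structures are little-endian, while records stored in __consumer_offsets use Kafka's big-endian wire format. A tiny sketch of what the with_endianness prefix changes, using plain struct and made-up bytes:

import struct

raw = b'\x00\x2a'  # two example bytes

# '>h': big-endian int16, as a Kafka-format reader would decode it
print(struct.unpack('>h', raw)[0])  # 42

# '<h': little-endian int16, the default used for redpanda serde data
print(struct.unpack('<h', raw)[0])  # 10752
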
24 changes: 22 additions & 2 deletions tools/offline_log_viewer/viewer.py
@@ -5,6 +5,7 @@

from controller import ControllerLog
from consumer_groups import GroupsLog
from consumer_offsets import OffsetsLog
from tx_coordinator import TxLog

from storage import Store
@@ -66,6 +67,22 @@ def print_groups(store):
logger.info("")


def print_consumer_offsets(store):
records = []
for ntp in store.ntps:
if ntp.nspace == "kafka" and ntp.topic == "__consumer_offsets":
l = OffsetsLog(ntp)
l.decode()
records.append({
"partition_id": ntp.partition,
"records": l.records
})

# Send JSON output to stdout in case caller wants to parse it, other
# CLI output goes to stderr via logger
print(json.dumps(records, indent=2))


def print_tx_coordinator(store):
for ntp in store.ntps:
if ntp.nspace == "kafka_internal" and ntp.topic == "tx":
@@ -114,7 +131,8 @@ def generate_options():
parser.add_argument('--type',
type=str,
choices=[
'controller', 'kvstore', 'kafka', 'group',
'controller', 'kvstore', 'kafka',
'consumer_offsets', 'legacy-group',
'kafka_records', 'tx_coordinator'
],
required=True,
@@ -152,8 +170,10 @@ def generate_options():
elif options.type == "kafka_records":
validate_topic(options.path, options.topic)
print_kafka(store, options.topic, headers_only=False)
elif options.type == "group":
elif options.type == "legacy-group":
print_groups(store)
elif options.type == "consumer_offsets":
print_consumer_offsets(store)
elif options.type == "tx_coordinator":
validate_tx_coordinator(options.path)
print_tx_coordinator(store)
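
For manual use, invoking the new mode looks roughly like the sketch below; the exact flags (notably --path) and the data-directory location are assumptions based on how the ducktape OfflineLogViewer client drives the tool, so adjust them to your setup. Only the JSON goes to stdout, so it can be parsed directly:

import json
import subprocess

# Hypothetical invocation mirroring OfflineLogViewer._json_cmd.
cmd = [
    "python3", "tools/offline_log_viewer/viewer.py",
    "--path", "/var/lib/redpanda/data",
    "--type", "consumer_offsets",
]
partitions = json.loads(subprocess.check_output(cmd))
for p in partitions:
    print(p["partition_id"], len(p["records"]))
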