tools/viewer: added ability to read consumer offsets topic
Signed-off-by: Michal Maslanka <michal@redpanda.com>
mmaslankaprv committed Dec 16, 2022
1 parent 9c479ba commit f0dac10
Showing 3 changed files with 134 additions and 3 deletions.
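
For reference, after this change the new records can be dumped with the viewer CLI; a minimal invocation sketch (the data directory path is illustrative, while --path and the new consumer_offsets type come from viewer.py below):

    python tools/offline_log_viewer/viewer.py --path /var/lib/redpanda/data --type consumer_offsets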
118 changes: 118 additions & 0 deletions tools/offline_log_viewer/consumer_offsets.py
@@ -0,0 +1,118 @@
import base64
from io import BytesIO
from model import *
from reader import Endianness, Reader
from storage import Segment
import datetime


def decode_key_type(v):
    if v == 0 or v == 1:
        return "offset_commit"
    elif v == 2:
        return "group_metadata"

    return "unknown"


def decode_member_proto(rdr):
    ret = {}
    ret['name'] = rdr.read_string()
    ret['metadata'] = rdr.read_iobuf().hex()
    return ret


def decode_member(rdr):
    ret = {}
    ret['v'] = rdr.read_int16()
    ret['member_id'] = rdr.read_kafka_string()
    ret['instance_id'] = rdr.read_kafka_optional_string()
    ret['client_id'] = rdr.read_kafka_string()
    ret['client_host'] = rdr.read_kafka_string()
    ret['rebalance_timeout'] = rdr.read_int32()
    ret['session_timeout'] = rdr.read_int32()
    ret['subscription'] = base64.b64encode(
        rdr.read_kafka_bytes()).decode('utf-8')
    ret['assignment'] = base64.b64encode(
        rdr.read_kafka_bytes()).decode('utf-8')

    return ret
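

# The subscription and assignment blobs are opaque payloads negotiated by
# the group members' partition assignor, so they are emitted as base64
# rather than decoded further.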


def decode_metadata(rdr):
    ret = {}
    ret['version'] = rdr.read_int16()
    ret['protocol_type'] = rdr.read_kafka_string()
    ret['generation_id'] = rdr.read_int32()
    ret['protocol_name'] = rdr.read_kafka_optional_string()
    ret['leader'] = rdr.read_kafka_optional_string()
    ret['state_timestamp'] = rdr.read_int64()
    ret['member_state'] = rdr.read_vector(decode_member)
    return ret


def decode_key(key_rdr):
    ret = {}
    v = key_rdr.read_int16()
    ret['type'] = decode_key_type(v)
    ret['group_id'] = key_rdr.read_kafka_string()
    if ret['type'] == 'offset_commit':
        ret['topic'] = key_rdr.read_kafka_string()
        ret['partition'] = key_rdr.read_int32()

    return ret


def decode_offset_commit(v_rdr):
    ret = {}
    ret['version'] = v_rdr.read_int16()
    ret['committed_offset'] = v_rdr.read_int64()
    ret['leader_epoch'] = v_rdr.read_int32()
    ret['committed_metadata'] = v_rdr.read_kafka_string()
    ret['timestamp'] = v_rdr.read_int64()
    return ret
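

# This layout matches Kafka's offset commit value schema v3 (committed
# offset, leader epoch, metadata, commit timestamp); earlier schema
# versions have no leader epoch field, so records written in older
# formats would presumably need version-dependent handling.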


def decode_record(hdr, r):
    v = {}
    v['epoch'] = hdr.first_ts
    v['offset'] = hdr.base_offset + r.offset_delta
    v['ts'] = datetime.datetime.utcfromtimestamp(
        hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
    k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN)

    v['key'] = decode_key(k_rdr)

    if v['key']['type'] == "group_metadata":
        if r.value:
            rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
            v['value'] = decode_metadata(rdr)
        else:
            v['value'] = 'tombstone'
    elif v['key']['type'] == "offset_commit":
        if r.value:
            rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
            v['value'] = decode_offset_commit(rdr)
        else:
            v['value'] = 'tombstone'

    return v
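

# A record without a value in this compacted topic is a tombstone: it
# marks its key (a whole group, or one group/topic/partition) for removal
# during compaction, hence the 'tombstone' placeholder. An illustrative
# (made-up) decoded record:
# {"offset": 42, "ts": "2022-12-16 10:00:00",
#  "key": {"type": "offset_commit", "group_id": "g1",
#          "topic": "topic-a", "partition": 0},
#  "value": {"version": 3, "committed_offset": 100, "leader_epoch": 1,
#            "committed_metadata": "", "timestamp": 1671184800000}}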


class OffsetsLog:
    def __init__(self, ntp):
        self.ntp = ntp
        self.records = []

    def decode(self):
        paths = []
        for path in self.ntp.segments:
            paths.append(path)
        paths.sort()
        for path in paths:
            s = Segment(path)
            for b in s:
                if b.header.type != 1:
                    continue
                for r in b:
                    self.records.append(decode_record(b.header, r))
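
Two notes on the loop above: segment paths are decoded in sorted order so records come out roughly in offset order, and batch type 1 corresponds to Redpanda's raft data batches, so configuration and control batches are skipped. A minimal usage sketch, assuming an ntp object obtained from the viewer's Store (see viewer.py below):

    import json

    log = OffsetsLog(ntp)
    log.decode()
    print(json.dumps(log.records, indent=2))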
2 changes: 1 addition & 1 deletion tools/offline_log_viewer/reader.py
@@ -79,7 +79,7 @@ def read_string(self):
    def read_kafka_string(self):
        len = self.read_int16()
        return self.stream.read(len).decode('utf-8')

    def read_kafka_bytes(self):
        len = self.read_int32()
        return self.stream.read(len)
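
For context, these helpers read Kafka's wire format: strings are int16-length-prefixed UTF-8 and bytes fields are int32-length-prefixed, both big-endian, which is why the readers in consumer_offsets.py are constructed with Endianness.BIG_ENDIAN (Redpanda's own serialization is little-endian). A standalone sketch of the same primitives using struct (illustrative only; the real implementation is the Reader class):

    import struct
    from io import BytesIO

    def read_kafka_string(stream):
        # big-endian int16 length followed by UTF-8 bytes
        (n, ) = struct.unpack('>h', stream.read(2))
        return stream.read(n).decode('utf-8')

    def read_kafka_bytes(stream):
        # big-endian int32 length followed by raw bytes
        (n, ) = struct.unpack('>i', stream.read(4))
        return stream.read(n)

    # Example: b'\x00\x02hi' decodes to 'hi'. Note that Kafka encodes a
    # null string as length -1, which neither this sketch nor the helper
    # above special-cases.
    assert read_kafka_string(BytesIO(b'\x00\x02hi')) == 'hi'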
17 changes: 15 additions & 2 deletions tools/offline_log_viewer/viewer.py
@@ -5,6 +5,7 @@

from controller import ControllerLog
from consumer_groups import GroupsLog
from consumer_offsets import OffsetsLog
from tx_coordinator import TxLog

from storage import Store
@@ -55,6 +56,15 @@ def print_groups(store):
            logger.info("")


def print_consumer_offsets(store):
    for ntp in store.ntps:
        if ntp.nspace == "kafka" and ntp.topic == "__consumer_offsets":
            l = OffsetsLog(ntp)
            l.decode()
            logger.info(json.dumps(l.records, indent=2))
            logger.info("")


def print_tx_coordinator(store):
    for ntp in store.ntps:
        if ntp.nspace == "kafka_internal" and ntp.topic == "tx":
@@ -103,7 +113,8 @@ def generate_options():
    parser.add_argument('--type',
                        type=str,
                        choices=[
-                           'controller', 'kvstore', 'kafka', 'group',
+                           'controller', 'kvstore', 'kafka',
+                           'consumer_offsets', 'legacy-group',
                            'kafka_records', 'tx_coordinator'
                        ],
                        required=True,
@@ -141,8 +152,10 @@ def generate_options():
elif options.type == "kafka_records":
validate_topic(options.path, options.topic)
print_kafka(store, options.topic, headers_only=False)
elif options.type == "group":
elif options.type == "legacy-group":
print_groups(store)
elif options.type == "consumer_offsets":
print_consumer_offsets(store)
elif options.type == "tx_coordinator":
validate_tx_coordinator(options.path)
print_tx_coordinator(store)
