tools/metadata_viewer: Refactor metadata_viewer and add transaction metadata support #5294

Merged · 6 commits · Jul 1, 2022
Changes from 5 commits
16 changes: 0 additions & 16 deletions tools/metadata_viewer/kafka.py

This file was deleted.

2 changes: 2 additions & 0 deletions tools/offline_log_viewer/README.md
@@ -0,0 +1,2 @@
Offline tool to parse RedPanda disk logs.
Usage: `python viewer.py -h`
52 changes: 52 additions & 0 deletions tools/offline_log_viewer/kafka.py
@@ -0,0 +1,52 @@
from storage import Segment
from enum import Enum
from io import BytesIO
from reader import Reader
import struct


class KafkaControlRecordType(Enum):
"""Valid control record types defined by Kafka, currently only
used for transactional control records.
Keep this in sync with model::control_record_type
"""
tx_abort = 0
tx_commit = 1
unknown = -1


class KafkaLog:
def __init__(self, ntp, headers_only):
self.ntp = ntp
self.headers_only = headers_only

def get_control_record_type(self, key):
rdr = Reader(BytesIO(key))
rdr.skip(2) # skip the 16bit version.
# Encoded as big endian
type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
return KafkaControlRecordType(type_rdr.read_int16()).name

def decode(self):
self.results = []
for batch in self.batches():
header = batch.header_dict()
yield header
if not self.headers_only:
records = []
for record in batch:
attrs = header["expanded_attrs"]
is_tx_ctrl = attrs["transactional"] and attrs[
"control_batch"]
record_dict = record.kv_dict()
records.append(record_dict)
if is_tx_ctrl:
record_dict["type"] = self.get_control_record_type(
record.key)
yield record_dict

def batches(self):
for path in self.ntp.segments:
s = Segment(path)
for batch in s:
yield batch
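
For orientation, here is a minimal, hypothetical sketch of driving the new `KafkaLog` class directly; the `FakeNtp` namedtuple and the segment path are made-up stand-ins for the ntp objects that viewer.py normally builds from a Redpanda data directory.

```python
# Hypothetical driver for KafkaLog -- FakeNtp and the segment path are
# illustrative stand-ins, not part of this PR. KafkaLog only needs an
# object whose .segments attribute lists on-disk segment file paths.
import json
from collections import namedtuple

from kafka import KafkaLog

FakeNtp = namedtuple("FakeNtp", ["nspace", "topic", "partition", "segments"])
ntp = FakeNtp("kafka", "my-topic", 0, ["/path/to/segment.log"])

log = KafkaLog(ntp, headers_only=False)
for result in log.decode():
    # decode() yields each batch header dict first, then one dict per
    # record; transactional control records also get a "type" field.
    print(json.dumps(result, indent=2))
```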
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -1,4 +1,5 @@
import collections
from enum import Enum
import os

import struct
@@ -56,6 +57,11 @@ def __init__(self, length, attrs, timestamp_delta, offset_delta, key,
self.value = value
self.headers = headers

def kv_dict(self):
key = None if self.key == None else self.key.hex()
val = None if self.value == None else self.value.hex()
return {"k": key, "v": val}


class RecordHeader:
def __init__(self, key, value):
@@ -104,7 +110,54 @@ def __next__(self):
headers)


class BatchType(Enum):
Contributor:

This is good -- controller.py already has some of these magic numbers in, so maybe those could be replaced with references to this class at the same time as adding it?

Contributor (Author):

Done. I did some sanity testing on my local controller log and kvstore data and things seem ok.

"""Keep this in sync with model/record_batch_types.h"""
raft_data = 1
raft_configuration = 2
controller = 3
kvstore = 4
checkpoint = 5
topic_management_cmd = 6
ghost_batch = 7
id_allocator = 8
tx_prepare = 9
tx_fence = 10
tm_update = 11
user_management_cmd = 12
acl_management_cmd = 13
group_prepare_tx = 14
group_commit_tx = 15
group_abort_tx = 16
node_management_cmd = 17
data_policy_management_cmd = 18
archival_metadata = 19
cluster_config_cmd = 20
feature_update = 21
unknown = -1

@classmethod
def _missing_(e, value):
return e.unknown


class Batch:
class CompressionType(Enum):
none = 0
gzip = 1
snappy = 2
lz4 = 3
zstd = 4
unknown = -1

@classmethod
def _missing_(e, value):
return e.unknown

compression_mask = 0x7
ts_type_mask = 0x8
transactional_mask = 0x10
control_mask = 0x20

def __init__(self, index, header, records):
self.index = index
self.header = header
@@ -121,6 +174,20 @@ def __init__(self, index, header, records):
if self.header.crc != crc:
raise CorruptBatchError(self)

def header_dict(self):
header = self.header._asdict()
attrs = header['attrs']
header["type_name"] = BatchType(header["type"]).name
header['expanded_attrs'] = {
'compression':
Batch.CompressionType(attrs & Batch.compression_mask).name,
'transactional':
attrs & Batch.transactional_mask == Batch.transactional_mask,
'control_batch': attrs & Batch.control_mask == Batch.control_mask,
'timestamp_type': attrs & Batch.ts_type_mask == Batch.ts_type_mask
}
return header

def last_offset(self):
return self.header.base_offset + self.header.record_count - 1

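The controller.py cleanup the reviewer asked for above is not part of the five commits shown here, but a minimal sketch of that kind of change could look like the following; the helper name and call site are invented for illustration.

```python
# Hypothetical sketch only -- not taken from this PR's diff. It shows the
# shape of replacing a bare batch-type integer with the BatchType enum
# now defined in storage.py.
from storage import BatchType


def is_controller_batch(header):
    # Before: a magic-number comparison such as `header.type == 3`.
    # After: compare against the named enum member instead.
    return BatchType(header.type) == BatchType.controller
```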
@@ -32,40 +32,16 @@ def print_controller(store):
logger.info(json.dumps(ctrl.records, indent=2))


def print_kafka(store, topic):
def print_kafka(store, topic, headers_only):
for ntp in store.ntps:
if ntp.nspace == "kafka":
if ntp.nspace in ["kafka", "kafka_internal"]:
if topic and ntp.topic != topic:
continue

log = KafkaLog(ntp)
logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
for header in log.batch_headers():
logger.info(json.dumps(header, indent=2))


def print_kafka_records(store, topic):
for ntp in store.ntps:
if ntp.nspace == "kafka":
if topic and ntp.topic != topic:
continue

log = KafkaLog(ntp)
logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
for batch in log.batches():
logger.info(json.dumps(batch.header._asdict(), indent=2))
for record in batch:
logger.info(
json.dumps(
{
"key":
None
if record.key == None else record.key.hex(),
"value":
None if record.value == None else
record.value.hex(),
},
indent=2))
log = KafkaLog(ntp, headers_only=headers_only)
for result in log.decode():
logger.info(json.dumps(result, indent=2))


def print_groups(store):
@@ -118,9 +94,9 @@ def generate_options():
elif options.type == "controller":
print_controller(store)
elif options.type == "kafka":
print_kafka(store, options.topic)
print_kafka(store, options.topic, headers_only=True)
elif options.type == "kafka_records":
print_kafka_records(store, options.topic)
print_kafka(store, options.topic, headers_only=False)
elif options.type == "group":
print_groups(store)
