Merge pull request #5294 from bharathv/3142

tools/metadata_viewer: Refactor metadata_viewer and add transaction metadata support
bharathv committed Jul 1, 2022
2 parents 52a93cf + 64f1b39 commit 0dec535
Showing 12 changed files with 149 additions and 271 deletions.
16 changes: 0 additions & 16 deletions tools/metadata_viewer/kafka.py

This file was deleted.

2 changes: 2 additions & 0 deletions tools/offline_log_viewer/README.md
@@ -0,0 +1,2 @@
Offline tool to parse RedPanda disk logs.
Usage: `python viewer.py -h`
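For context, a typical invocation might look like the following sketch; the `--path` flag and data directory are assumptions for illustration, while the `--type` values (controller, kafka, kafka_records, group) match viewer.py later in this diff:

```sh
# Hypothetical invocation; --path and the directory are illustrative,
# the --type values come from viewer.py below.
python viewer.py --path /var/lib/redpanda/data --type kafka_records --topic my-topic
```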
File renamed without changes.
tools/offline_log_viewer/controller.py
@@ -1,7 +1,8 @@
from io import BufferedReader, BytesIO
from model import *
from reader import Reader
from storage import Segment
from storage import Batch, Segment
from storage import BatchType
import datetime


@@ -166,57 +167,31 @@ def decode_feature_update_action(r):
return cmd


def decode_record(header, record):
def decode_record(batch, record):
ret = {}
ret['type'] = type_str(header)
header = batch.header
ret['type'] = batch.type.name
ret['epoch'] = header.first_ts
ret['offset'] = header.base_offset + record.offset_delta
ret['ts'] = datetime.datetime.utcfromtimestamp(
header.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
ret['data'] = None

if header.type == 2:
if batch.type == BatchType.raft_configuration:
ret['data'] = decode_config(record)
if header.type == 6:
if batch.type == BatchType.topic_management_cmd:
ret['data'] = decode_topic_command(record)
if header.type == 12:
if batch.type == BatchType.user_management_cmd:
ret['data'] = decode_user_command(record)
if header.type == 13:
if batch.type == BatchType.acl_management_cmd:
ret['data'] = decode_acl_command(record)
if header.type == 20:
if batch.type == BatchType.cluster_config_cmd:
ret['data'] = decode_config_command(record)
if header.type == 21:
if batch.type == BatchType.feature_update:
ret['data'] = decode_feature_command(record)
return ret


def type_str(header):
if header.type == 1:
return "data"
if header.type == 2:
return "configuration"
if header.type == 3:
return "old controller"
if header.type == 4:
return "kv store"
if header.type == 5:
return "checkpoint"
if header.type == 6:
return "topic command"
if header.type == 12:
return "user management command"
if header.type == 13:
return "acl management command"
if header.type == 17:
return "node management command"
if header.type == 20:
return "cluster config management command"
if header.type == 21:
return "feature management command"

return f"unknown {header.type}"


class ControllerLog:
def __init__(self, ntp):
self.ntp = ntp
@@ -227,4 +202,4 @@ def decode(self):
s = Segment(path)
for b in s:
for r in b:
self.records.append(decode_record(b.header, r))
self.records.append(decode_record(b, r))
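The numeric `header.type` checks above are replaced with the `BatchType` enum introduced in storage.py later in this diff. A quick sketch of the mapping, including the `_missing_` fallback for unrecognized ids:

```python
from storage import BatchType

assert BatchType(2) is BatchType.raft_configuration
assert BatchType(6) is BatchType.topic_management_cmd
assert BatchType(99) is BatchType.unknown  # _missing_ maps unknown ids here
```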
52 changes: 52 additions & 0 deletions tools/offline_log_viewer/kafka.py
@@ -0,0 +1,52 @@
from storage import Segment
from enum import Enum
from io import BytesIO
from reader import Reader
import struct


class KafkaControlRecordType(Enum):
"""Valid control record types defined by Kafka, currently only
used for transactional control records.
Keep this in sync with model::control_record_type
"""
tx_abort = 0
tx_commit = 1
unknown = -1


class KafkaLog:
def __init__(self, ntp, headers_only):
self.ntp = ntp
self.headers_only = headers_only

def get_control_record_type(self, key):
rdr = Reader(BytesIO(key))
rdr.skip(2) # skip the 16bit version.
# Encoded as big endian
type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
return KafkaControlRecordType(type_rdr.read_int16()).name

def decode(self):
self.results = []
for batch in self.batches():
header = batch.header_dict()
yield header
if not self.headers_only:
records = []
for record in batch:
attrs = header["expanded_attrs"]
is_tx_ctrl = attrs["transactional"] and attrs[
"control_batch"]
record_dict = record.kv_dict()
records.append(record_dict)
if is_tx_ctrl:
record_dict["type"] = self.get_control_record_type(
record.key)
yield record_dict

def batches(self):
for path in self.ntp.segments:
s = Segment(path)
for batch in s:
yield batch
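For reference, a Kafka transactional control record key is two big-endian int16s, a version followed by a type, which is what `get_control_record_type` unpacks (the repack through `struct.pack(">h", ...)` byte-swaps the value produced by the little-endian `Reader`). A minimal sketch of the layout:

```python
import struct

# Control record key layout: version (int16) then type (int16), big-endian.
# Type 0 is tx_abort, 1 is tx_commit (see KafkaControlRecordType above).
commit_key = struct.pack(">hh", 0, 1)
version, rtype = struct.unpack(">hh", commit_key)
assert (version, rtype) == (0, 1)  # i.e. KafkaControlRecordType.tx_commit
```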
tools/offline_log_viewer/kvstore.py
@@ -4,7 +4,7 @@
import struct
from model import *
from reader import Reader
from storage import Header, Record, Segment
from storage import BatchType, Header, Record, Segment
import collections
import datetime

@@ -15,6 +15,7 @@ class SnapshotBatch:
def __init__(self, header, records):
self.header = header
self.records = records
self.type = BatchType(header[3])

def __iter__(self):
for r in self.records:
@@ -61,10 +62,10 @@ def from_stream(f):


class KvStoreRecordDecoder:
def __init__(self, record, header, value_is_optional_type):
def __init__(self, record, batch, value_is_optional_type):
self.record = record
self.header = header
self.batch_type = header.type
self.header = batch.header
self.batch_type = batch.type
self.offset_delta = record.offset_delta
self.v_stream = BytesIO(self.record.value)
self.k_stream = BytesIO(self.record.key)
@@ -85,7 +86,7 @@ def _decode_ks(self, ks):

def decode(self):

assert self.batch_type == 4
assert self.batch_type == BatchType.kvstore
ret = {}
ret['epoch'] = self.header.first_ts
ret['offset'] = self.header.base_offset + self.offset_delta
@@ -336,7 +337,7 @@ def decode(self):
logger.info(f"snapshot last offset: {snap.last_offset}")
for r in snap.data_batch:
d = KvStoreRecordDecoder(r,
snap.data_batch.header,
snap.data_batch,
value_is_optional_type=False)
self._apply(d.decode())
else:
@@ -347,7 +348,7 @@
for batch in s:
for r in batch:
d = KvStoreRecordDecoder(r,
batch.header,
batch,
value_is_optional_type=True)
self._apply(d.decode())

File renamed without changes.
File renamed without changes.
File renamed without changes.
tools/offline_log_viewer/storage.py
@@ -1,4 +1,5 @@
import collections
from enum import Enum
import os

import struct
@@ -56,6 +57,11 @@ def __init__(self, length, attrs, timestamp_delta, offset_delta, key,
self.value = value
self.headers = headers

def kv_dict(self):
key = None if self.key is None else self.key.hex()
val = None if self.value is None else self.value.hex()
return {"k": key, "v": val}


class RecordHeader:
def __init__(self, key, value):
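To illustrate the new `Record.kv_dict()` helper above, a record with no key and the value `b"hello"` would serialize as follows (bytes are hex-encoded so they survive JSON output); the inputs here are purely illustrative:

```python
# Mirrors the kv_dict logic above with illustrative inputs.
rec_key, rec_value = None, b"hello"
key = None if rec_key is None else rec_key.hex()
val = None if rec_value is None else rec_value.hex()
assert {"k": key, "v": val} == {"k": None, "v": "68656c6c6f"}
```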
@@ -104,12 +110,60 @@ def __next__(self):
headers)


class BatchType(Enum):
"""Keep this in sync with model/record_batch_types.h"""
raft_data = 1
raft_configuration = 2
controller = 3
kvstore = 4
checkpoint = 5
topic_management_cmd = 6
ghost_batch = 7
id_allocator = 8
tx_prepare = 9
tx_fence = 10
tm_update = 11
user_management_cmd = 12
acl_management_cmd = 13
group_prepare_tx = 14
group_commit_tx = 15
group_abort_tx = 16
node_management_cmd = 17
data_policy_management_cmd = 18
archival_metadata = 19
cluster_config_cmd = 20
feature_update = 21
unknown = -1

@classmethod
def _missing_(e, value):
return e.unknown


class Batch:
class CompressionType(Enum):
none = 0
gzip = 1
snappy = 2
lz4 = 3
zstd = 4
unknown = -1

@classmethod
def _missing_(e, value):
return e.unknown

compression_mask = 0x7
ts_type_mask = 0x8
transactional_mask = 0x10
control_mask = 0x20

def __init__(self, index, header, records):
self.index = index
self.header = header
self.term = None
self.records = records
self.type = BatchType(header[3])

header_crc_bytes = struct.pack(
"<" + HDR_FMT_RP_PREFIX_NO_CRC + HDR_FMT_CRC, *self.header[1:])
@@ -121,6 +175,20 @@ def __init__(self, index, header, records):
if self.header.crc != crc:
raise CorruptBatchError(self)

def header_dict(self):
header = self.header._asdict()
attrs = header['attrs']
header["type_name"] = self.type.name
header['expanded_attrs'] = {
'compression':
Batch.CompressionType(attrs & Batch.compression_mask).name,
'transactional':
attrs & Batch.transactional_mask == Batch.transactional_mask,
'control_batch': attrs & Batch.control_mask == Batch.control_mask,
'timestamp_type': attrs & Batch.ts_type_mask == Batch.ts_type_mask
}
return header

def last_offset(self):
return self.header.base_offset + self.header.record_count - 1

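A worked example of the attribute bits that `header_dict` decodes; the attrs value 0x34 is chosen purely for illustration (zstd compression plus the transactional and control bits, with create-time timestamps):

```python
# Bit arithmetic matching the masks defined on Batch above.
attrs = 0x34
assert attrs & 0x7 == 4        # compression_mask -> CompressionType.zstd
assert attrs & 0x8 == 0        # ts_type_mask not set -> create-time timestamp
assert attrs & 0x10 == 0x10    # transactional_mask set
assert attrs & 0x20 == 0x20    # control_mask set -> control batch
```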
tools/offline_log_viewer/viewer.py
@@ -32,40 +32,16 @@ def print_controller(store):
logger.info(json.dumps(ctrl.records, indent=2))


def print_kafka(store, topic):
def print_kafka(store, topic, headers_only):
for ntp in store.ntps:
if ntp.nspace == "kafka":
if ntp.nspace in ["kafka", "kafka_internal"]:
if topic and ntp.topic != topic:
continue

log = KafkaLog(ntp)
logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
for header in log.batch_headers():
logger.info(json.dumps(header, indent=2))


def print_kafka_records(store, topic):
for ntp in store.ntps:
if ntp.nspace == "kafka":
if topic and ntp.topic != topic:
continue

log = KafkaLog(ntp)
logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
for batch in log.batches():
logger.info(json.dumps(batch.header._asdict(), indent=2))
for record in batch:
logger.info(
json.dumps(
{
"key":
None
if record.key == None else record.key.hex(),
"value":
None if record.value == None else
record.value.hex(),
},
indent=2))
log = KafkaLog(ntp, headers_only=headers_only)
for result in log.decode():
logger.info(json.dumps(result, indent=2))


def print_groups(store):
@@ -118,9 +94,9 @@ def generate_options():
elif options.type == "controller":
print_controller(store)
elif options.type == "kafka":
print_kafka(store, options.topic)
print_kafka(store, options.topic, headers_only=True)
elif options.type == "kafka_records":
print_kafka_records(store, options.topic)
print_kafka(store, options.topic, headers_only=False)
elif options.type == "group":
print_groups(store)
