tools[log_viewer]: Populate txn control markers
Moves the decoding logic into KafkaLog, alongside the other methods,
and cleans up duplicated code.

Introduces KafkaControlRecordType

For records in batches that are both transactional and control batches,
we extract the exact control record type (commit/abort).
bharathv committed Jun 30, 2022
1 parent 8d96c0c commit 1b6d55f
Showing 3 changed files with 56 additions and 35 deletions.
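
For context on the markers being decoded: Kafka encodes a control record's key as a big-endian int16 version followed by a big-endian int16 type, where 0 means abort and 1 means commit. A minimal standalone sketch of that decoding using only the standard library; the helper name and sample bytes are illustrative and not part of this commit:

    import struct

    def control_record_type(key: bytes) -> str:
        # Control record key layout: big-endian int16 version, big-endian int16 type.
        _version, record_type = struct.unpack(">hh", key[:4])
        return {0: "tx_abort", 1: "tx_commit"}.get(record_type, "unknown")

    # A commit marker's key carries version 0 and type 1.
    print(control_record_type(struct.pack(">hh", 0, 1)))  # tx_commit
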
48 changes: 44 additions & 4 deletions tools/offline_log_viewer/kafka.py
@@ -1,16 +1,56 @@
 from storage import Segment
+from enum import Enum
+from io import BytesIO
+from reader import Reader
+import struct
 
 
+class KafkaControlRecordType(Enum):
+    """Valid control record types defined by Kafka, currently only
+    used for transactional control records.
+    Keep this in sync with model::control_record_type
+    """
+    tx_abort = 0
+    tx_commit = 1
+    unknown = -1
+
+
 class KafkaLog:
-    def __init__(self, ntp):
+    def __init__(self, ntp, headers_only):
         self.ntp = ntp
+        self.headers_only = headers_only
 
-    def batch_headers(self):
+    def get_control_record_type(self, key):
+        rdr = Reader(BytesIO(key))
+        rdr.skip(2)  # skip the 16bit version.
+        # Encoded as big endian
+        type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
+        return KafkaControlRecordType(type_rdr.read_int16()).name
+
+    def decode(self):
+        self.results = []
         for batch in self.batches():
-            yield batch.header_dict()
+            header = batch.header_dict()
+            yield header
+            if not self.headers_only:
+                records = []
+                for record in batch:
+                    attrs = header["expanded_attrs"]
+                    is_tx_ctrl = attrs["transactional"] and attrs[
+                        "control_batch"]
+                    record_dict = record.kv_dict()
+                    records.append(record_dict)
+                    if is_tx_ctrl:
+                        record_dict["type"] = self.get_control_record_type(
+                            record.key)
+                    yield record_dict
+
+    def result(self):
+        assert self.results
+        return self.results
 
     def batches(self):
         for path in self.ntp.segments:
             s = Segment(path)
             for batch in s:
-                yield batch
+                yield batch
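
A rough usage sketch of the new generator, mirroring how viewer.py drives it further below; the import path and the wrapper function are assumptions for illustration:

    import json
    from kafka import KafkaLog

    def dump_partition(ntp):
        # headers_only=False interleaves each batch header with its record dicts.
        log = KafkaLog(ntp, headers_only=False)
        for result in log.decode():
            print(json.dumps(result, indent=2))
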
5 changes: 5 additions & 0 deletions tools/offline_log_viewer/storage.py
@@ -57,6 +57,11 @@ def __init__(self, length, attrs, timestamp_delta, offset_delta, key,
         self.value = value
         self.headers = headers
 
+    def kv_dict(self):
+        key = None if self.key == None else self.key.hex()
+        val = None if self.value == None else self.value.hex()
+        return {"k": key, "v": val}
+
 
 class RecordHeader:
     def __init__(self, key, value):
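
kv_dict() hex-encodes whatever bytes are present and passes missing keys or values through as None. A small illustration, assuming the enclosing class is the viewer's Record type and using made-up field values:

    r = Record(length=0, attrs=0, timestamp_delta=0, offset_delta=0,
               key=b"\x00\x00\x00\x01", value=None, headers=[])
    print(r.kv_dict())  # {'k': '00000001', 'v': None}
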
38 changes: 7 additions & 31 deletions tools/offline_log_viewer/viewer.py
@@ -32,40 +32,16 @@ def print_controller(store):
     logger.info(json.dumps(ctrl.records, indent=2))
 
 
-def print_kafka(store, topic):
+def print_kafka(store, topic, headers_only):
     for ntp in store.ntps:
-        if ntp.nspace.startswith("kafka"):
+        if ntp.nspace in ["kafka", "kafka_internal"]:
             if topic and ntp.topic != topic:
                 continue
 
-            log = KafkaLog(ntp)
+            log = KafkaLog(ntp, headers_only=headers_only)
             logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
-            for header in log.batch_headers():
-                logger.info(json.dumps(header, indent=2))
-
-
-def print_kafka_records(store, topic):
-    for ntp in store.ntps:
-        if ntp.nspace.startswith("kafka"):
-            if topic and ntp.topic != topic:
-                continue
-
-            log = KafkaLog(ntp)
-            logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
-            for batch in log.batches():
-                logger.info(json.dumps(batch.header_dict(), indent=2))
-                for record in batch:
-                    logger.info(
-                        json.dumps(
-                            {
-                                "key":
-                                None
-                                if record.key == None else record.key.hex(),
-                                "value":
-                                None if record.value == None else
-                                record.value.hex(),
-                            },
-                            indent=2))
+            for result in log.decode():
+                logger.info(json.dumps(result, indent=2))
 
 
 def print_groups(store):
@@ -118,9 +94,9 @@ def generate_options():
     elif options.type == "controller":
         print_controller(store)
     elif options.type == "kafka":
-        print_kafka(store, options.topic)
+        print_kafka(store, options.topic, headers_only=True)
     elif options.type == "kafka_records":
-        print_kafka_records(store, options.topic)
+        print_kafka(store, options.topic, headers_only=False)
     elif options.type == "group":
         print_groups(store)
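
With both pieces in place, a record from a transactional control batch is printed with its decoded marker type next to the hex-encoded key and value. Roughly what one element yielded by decode() looks like for a commit marker (payload values illustrative):

    expected = {
        "k": "00000001",      # key: version 0, type 1, hex-encoded
        "v": "000000000000",  # opaque marker payload, hex-encoded
        "type": "tx_commit",
    }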
