diff --git a/tools/offline_log_viewer/kafka.py b/tools/offline_log_viewer/kafka.py
index 124fab0ba0b6..00b4b78e0069 100644
--- a/tools/offline_log_viewer/kafka.py
+++ b/tools/offline_log_viewer/kafka.py
@@ -1,16 +1,52 @@
 from storage import Segment
+from enum import Enum
+from io import BytesIO
+from reader import Reader
+import struct
+
+
+class KafkaControlRecordType(Enum):
+    """Valid control record types defined by Kafka, currently only
+    used for transactional control records.
+    Keep this in sync with model::control_record_type
+    """
+    tx_abort = 0
+    tx_commit = 1
+    unknown = -1
 
 
 class KafkaLog:
-    def __init__(self, ntp):
+    def __init__(self, ntp, headers_only):
         self.ntp = ntp
+        self.headers_only = headers_only
+
+    def get_control_record_type(self, key):
+        rdr = Reader(BytesIO(key))
+        rdr.skip(2)  # skip the 16bit version.
+        # Encoded as big endian
+        type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
+        return KafkaControlRecordType(type_rdr.read_int16()).name
 
-    def batch_headers(self):
+    def decode(self):
+        self.results = []
         for batch in self.batches():
-            yield batch.header_dict()
+            header = batch.header_dict()
+            yield header
+            if not self.headers_only:
+                records = []
+                for record in batch:
+                    attrs = header["expanded_attrs"]
+                    is_tx_ctrl = attrs["transactional"] and attrs[
+                        "control_batch"]
+                    record_dict = record.kv_dict()
+                    records.append(record_dict)
+                    if is_tx_ctrl:
+                        record_dict["type"] = self.get_control_record_type(
+                            record.key)
+                    yield record_dict
 
     def batches(self):
         for path in self.ntp.segments:
             s = Segment(path)
             for batch in s:
-                yield batch
\ No newline at end of file
+                yield batch
diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py
index 7b261280e1a2..b0ff646d0d04 100644
--- a/tools/offline_log_viewer/storage.py
+++ b/tools/offline_log_viewer/storage.py
@@ -57,6 +57,11 @@ def __init__(self, length, attrs, timestamp_delta, offset_delta, key,
         self.value = value
         self.headers = headers
 
+    def kv_dict(self):
+        key = None if self.key == None else self.key.hex()
+        val = None if self.value == None else self.value.hex()
+        return {"k": key, "v": val}
+
 
 class RecordHeader:
     def __init__(self, key, value):
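For reference, the key that get_control_record_type() above decodes follows Kafka's control record key schema: two big-endian int16 fields, a version followed by a type (0 = abort marker, 1 = commit marker), which is what the KafkaControlRecordType enum mirrors. A minimal standalone sketch of that decode, using plain struct unpacking instead of the tool's Reader class (the function name here is hypothetical):

import struct

def control_record_type(key: bytes) -> str:
    # Control record key layout, big endian: int16 version, then int16 type.
    _version, rtype = struct.unpack(">hh", key[:4])
    return {0: "tx_abort", 1: "tx_commit"}.get(rtype, "unknown")

# A commit marker key: version 0, type 1.
assert control_record_type(struct.pack(">hh", 0, 1)) == "tx_commit"
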
diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py
index 46f72bf7a527..cddc928af476 100644
--- a/tools/offline_log_viewer/viewer.py
+++ b/tools/offline_log_viewer/viewer.py
@@ -32,40 +32,16 @@ def print_controller(store):
     logger.info(json.dumps(ctrl.records, indent=2))
 
 
-def print_kafka(store, topic):
+def print_kafka(store, topic, headers_only):
     for ntp in store.ntps:
-        if ntp.nspace.startswith("kafka"):
+        if ntp.nspace in ["kafka", "kafka_internal"]:
             if topic and ntp.topic != topic:
                 continue
 
-            log = KafkaLog(ntp)
             logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
-            for header in log.batch_headers():
-                logger.info(json.dumps(header, indent=2))
-
-
-def print_kafka_records(store, topic):
-    for ntp in store.ntps:
-        if ntp.nspace.startswith("kafka"):
-            if topic and ntp.topic != topic:
-                continue
-
-            log = KafkaLog(ntp)
-            logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
-            for batch in log.batches():
-                logger.info(json.dumps(batch.header_dict(), indent=2))
-                for record in batch:
-                    logger.info(
-                        json.dumps(
-                            {
-                                "key":
-                                None
-                                if record.key == None else record.key.hex(),
-                                "value":
-                                None if record.value == None else
-                                record.value.hex(),
-                            },
-                            indent=2))
+            log = KafkaLog(ntp, headers_only=headers_only)
+            for result in log.decode():
+                logger.info(json.dumps(result, indent=2))
 
 
 def print_groups(store):
@@ -118,9 +94,9 @@ def generate_options():
     elif options.type == "controller":
         print_controller(store)
    elif options.type == "kafka":
-        print_kafka(store, options.topic)
+        print_kafka(store, options.topic, headers_only=True)
     elif options.type == "kafka_records":
-        print_kafka_records(store, options.topic)
+        print_kafka(store, options.topic, headers_only=False)
     elif options.type == "group":
         print_groups(store)
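With this change the "kafka" type keeps its headers-only output, while "kafka_records" reuses the same decode() path and additionally emits each record as a hex-encoded key/value dict, tagged with a "type" field when the batch is a transactional control batch. A standalone illustration of the kv_dict() encoding added to storage.py, with a hypothetical FakeRecord standing in for the tool's Record class:

class FakeRecord:
    # Hypothetical stand-in: only the .key/.value attributes of the real Record.
    def __init__(self, key, value):
        self.key, self.value = key, value

    def kv_dict(self):
        # Mirrors storage.py: hex-encode bytes, pass None through untouched.
        key = None if self.key is None else self.key.hex()
        val = None if self.value is None else self.value.hex()
        return {"k": key, "v": val}

print(FakeRecord(b"\x00\x01", None).kv_dict())  # {'k': '0001', 'v': None}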