From 60e28ae7b49a1c59e2edbbd9e8df879d932f3c3d Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 29 Jun 2022 15:03:47 -0700 Subject: [PATCH 1/6] tools[log_viewer]: cleanup metadata_viewer * renames to offline_log_viewer since it is not limited to metadata anymore and we can parse actual record data. * tools/storage.py is obselete. --- tools/offline_log_viewer/README.md | 2 + .../consumer_groups.py | 0 .../controller.py | 0 .../kafka.py | 0 .../kvstore.py | 0 .../model.py | 0 .../reader.py | 0 .../requirements.txt | 0 .../storage.py | 0 .../viewer.py | 0 tools/storage.py | 180 ------------------ 11 files changed, 2 insertions(+), 180 deletions(-) create mode 100644 tools/offline_log_viewer/README.md rename tools/{metadata_viewer => offline_log_viewer}/consumer_groups.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/controller.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/kafka.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/kvstore.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/model.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/reader.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/requirements.txt (100%) rename tools/{metadata_viewer => offline_log_viewer}/storage.py (100%) rename tools/{metadata_viewer => offline_log_viewer}/viewer.py (100%) delete mode 100755 tools/storage.py diff --git a/tools/offline_log_viewer/README.md b/tools/offline_log_viewer/README.md new file mode 100644 index 000000000000..acd999d8b88f --- /dev/null +++ b/tools/offline_log_viewer/README.md @@ -0,0 +1,2 @@ +Offline tool to parse RedPanda disk logs. +Usage: `python viewer.py -h` \ No newline at end of file diff --git a/tools/metadata_viewer/consumer_groups.py b/tools/offline_log_viewer/consumer_groups.py similarity index 100% rename from tools/metadata_viewer/consumer_groups.py rename to tools/offline_log_viewer/consumer_groups.py diff --git a/tools/metadata_viewer/controller.py b/tools/offline_log_viewer/controller.py similarity index 100% rename from tools/metadata_viewer/controller.py rename to tools/offline_log_viewer/controller.py diff --git a/tools/metadata_viewer/kafka.py b/tools/offline_log_viewer/kafka.py similarity index 100% rename from tools/metadata_viewer/kafka.py rename to tools/offline_log_viewer/kafka.py diff --git a/tools/metadata_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py similarity index 100% rename from tools/metadata_viewer/kvstore.py rename to tools/offline_log_viewer/kvstore.py diff --git a/tools/metadata_viewer/model.py b/tools/offline_log_viewer/model.py similarity index 100% rename from tools/metadata_viewer/model.py rename to tools/offline_log_viewer/model.py diff --git a/tools/metadata_viewer/reader.py b/tools/offline_log_viewer/reader.py similarity index 100% rename from tools/metadata_viewer/reader.py rename to tools/offline_log_viewer/reader.py diff --git a/tools/metadata_viewer/requirements.txt b/tools/offline_log_viewer/requirements.txt similarity index 100% rename from tools/metadata_viewer/requirements.txt rename to tools/offline_log_viewer/requirements.txt diff --git a/tools/metadata_viewer/storage.py b/tools/offline_log_viewer/storage.py similarity index 100% rename from tools/metadata_viewer/storage.py rename to tools/offline_log_viewer/storage.py diff --git a/tools/metadata_viewer/viewer.py b/tools/offline_log_viewer/viewer.py similarity index 100% rename from tools/metadata_viewer/viewer.py rename to tools/offline_log_viewer/viewer.py diff --git 
a/tools/storage.py b/tools/storage.py deleted file mode 100755 index dba95ed2bcce..000000000000 --- a/tools/storage.py +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import collections -import glob -import re -import struct -import crc32c -import logging - -logger = logging.getLogger('rp') - -# https://docs.python.org/3.8/library/struct.html#format-strings -# -# redpanda header prefix: -# - little endian encoded -# - batch size, base offset, type crc -# -# note that the crc that is stored is the crc reported by kafka which happens to -# be computed over the big endian encoding of the same data. thus to verify the -# crc we need to rebuild part of the header in big endian before adding to crc. -HDR_FMT_RP_PREFIX_NO_CRC = "iqbI" -HDR_FMT_RP_PREFIX = "" + HDR_FMT_CRC, *self.header[5:]) - - @staticmethod - def from_file(f, index): - data = f.read(HEADER_SIZE) - if len(data) == HEADER_SIZE: - header = Header(*struct.unpack(HDR_FMT_RP, data)) - # it appears that we may have hit a truncation point if all of the - # fields in the header are zeros - if all(map(lambda v: v == 0, header)): - return - records_size = header.batch_size - HEADER_SIZE - data = f.read(records_size) - assert len(data) == records_size - return Batch(index, header, data) - assert len(data) == 0 - - -class Segment: - def __init__(self, path): - self.path = path - self.__read_batches() - - def __read_batches(self): - index = 1 - with open(self.path, "rb") as f: - while True: - batch = Batch.from_file(f, index) - if not batch: - break - index += 1 - - def dump(self): - if self.batches: - next_offset = self.batches[0].header.base_offset - index = 0 - for batch in self.batches: - if index < 3 or index == (len(self.batches) - 1): - logger.info(batch.header.base_offset) - if index == 3 and len(self.batches) > 4: - logger.info("...") - if batch.header.base_offset != next_offset: - logger.info("hole discovered at offset {} expected {}".format( - batch.header.base_offset, next_offset)) - break - next_offset = batch.last_offset() + 1 - index += 1 - - -class Ntp: - def __init__(self, base_dir, namespace, topic, partition, ntp_id): - self.base_dir = base_dir - self.nspace = namespace - self.topic = topic - self.partition = partition - self.ntp_id = ntp_id - self.path = os.path.join(self.base_dir, self.nspace, self.topic, - f"{self.partition}_{self.ntp_id}") - pattern = os.path.join(self.path, "*.log") - self.segments = glob.iglob(pattern) - - def __str__(self): - return "{0.nspace}/{0.topic}/{0.partition}_{0.ntp_id}".format(self) - - -class Store: - def __init__(self, base_dir): - self.base_dir = os.path.abspath(base_dir) - self.ntps = [] - self.__search() - - def __search(self): - dirs = os.walk(self.base_dir) - for ntpd in (p[0] for p in dirs if not p[1]): - head, part_ntp_id = os.path.split(ntpd) - [part, ntp_id] = part_ntp_id.split("_") - head, topic = os.path.split(head) - head, nspace = os.path.split(head) - assert head == self.base_dir - ntp = Ntp(self.base_dir, nspace, topic, int(part), int(ntp_id)) - self.ntps.append(ntp) - - -def main(): - import argparse - - def generate_options(): - parser = argparse.ArgumentParser(description='Redpanda log analyzer') - parser.add_argument('--path', - type=str, - help='Path to the log desired to be analyzed') - return parser - - parser = generate_options() - options, program_options = parser.parse_known_args() - logger.info("%s" % options) - if not os.path.exists(options.path): - logger.error("Path doesn't exist %s" % options.path) - sys.exit(1) - store = 
Store(options.path) - for ntp in store.ntps: - for path in ntp.segments: - try: - s = Segment(path) - except CorruptBatchError as e: - logger.error( - "corruption detected in batch {} of segment: {}".format( - e.batch.index, path)) - logger.error("header of corrupt batch: {}".format( - e.batch.header)) - sys.exit(1) - logger.info("successfully decoded segment: {}".format(path)) - - -if __name__ == '__main__': - main() From e9e2ece1091b9c274a6cb8f41576a9d7c5ac4cbb Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 29 Jun 2022 15:53:32 -0700 Subject: [PATCH 2/6] tools[log_viewer]: Expand batch attrs Includes compression type and whether the batch is transactional/control type. --- tools/offline_log_viewer/kafka.py | 2 +- tools/offline_log_viewer/storage.py | 30 +++++++++++++++++++++++++++++ tools/offline_log_viewer/viewer.py | 2 +- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/kafka.py b/tools/offline_log_viewer/kafka.py index 9506189c19bc..124fab0ba0b6 100644 --- a/tools/offline_log_viewer/kafka.py +++ b/tools/offline_log_viewer/kafka.py @@ -7,7 +7,7 @@ def __init__(self, ntp): def batch_headers(self): for batch in self.batches(): - yield batch.header._asdict() + yield batch.header_dict() def batches(self): for path in self.ntp.segments: diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index f1bde99eb9a7..27628fe6ce31 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -105,6 +105,23 @@ def __next__(self): class Batch: + class CompressionType(Enum): + none = 0 + gzip = 1 + snappy = 2 + lz4 = 3 + zstd = 4 + unknown = -1 + + @classmethod + def _missing_(e, value): + return e.unknown + + compression_mask = 0x7 + ts_type_mask = 0x8 + transactional_mask = 0x10 + control_mask = 0x20 + def __init__(self, index, header, records): self.index = index self.header = header @@ -121,6 +138,19 @@ def __init__(self, index, header, records): if self.header.crc != crc: raise CorruptBatchError(self) + def header_dict(self): + header = self.header._asdict() + attrs = header['attrs'] + header['expanded_attrs'] = { + 'compression': + Batch.CompressionType(attrs & Batch.compression_mask).name, + 'transactional': + attrs & Batch.transactional_mask == Batch.transactional_mask, + 'control_batch': attrs & Batch.control_mask == Batch.control_mask, + 'timestamp_type': attrs & Batch.ts_type_mask == Batch.ts_type_mask + } + return header + def last_offset(self): return self.header.base_offset + self.header.record_count - 1 diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index eda3d1ca5772..264d34039b9e 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -53,7 +53,7 @@ def print_kafka_records(store, topic): log = KafkaLog(ntp) logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}') for batch in log.batches(): - logger.info(json.dumps(batch.header._asdict(), indent=2)) + logger.info(json.dumps(batch.header_dict(), indent=2)) for record in batch: logger.info( json.dumps( From 9b51495172e325a2a9658b65dbb261b1a8c01992 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 29 Jun 2022 19:22:27 -0700 Subject: [PATCH 3/6] tools[log_viewer]: support kafka_internal ns Some topics are in kafka_internal ns like tx (transaction coordinator) and id_allocator. 
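The viewer-side filter then only needs to accept both namespaces. A minimal sketch of that matching, assuming a hypothetical is_kafka_ntp helper (the exact membership check, rather than a prefix match, is what a later patch in this series settles on):

    KAFKA_NAMESPACES = ("kafka", "kafka_internal")

    def is_kafka_ntp(ntp) -> bool:
        # user topics live in "kafka"; coordination topics such as tx and
        # id_allocator live in "kafka_internal"
        return ntp.nspace in KAFKA_NAMESPACES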
--- tools/offline_log_viewer/viewer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index 264d34039b9e..46f72bf7a527 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -34,7 +34,7 @@ def print_controller(store): def print_kafka(store, topic): for ntp in store.ntps: - if ntp.nspace == "kafka": + if ntp.nspace.startswith("kafka"): if topic and ntp.topic != topic: continue @@ -46,7 +46,7 @@ def print_kafka(store, topic): def print_kafka_records(store, topic): for ntp in store.ntps: - if ntp.nspace == "kafka": + if ntp.nspace.startswith("kafka"): if topic and ntp.topic != topic: continue From 8d96c0cd2b9baf94051be026013dfb90feed64e6 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 29 Jun 2022 21:35:49 -0700 Subject: [PATCH 4/6] tools[log_viewer]: Map record header types to enum Example: INFO:viewer:{ "header_crc": 1126531443, "batch_size": 90, "base_offset": 3, "type": 9, "crc": 1766141844, "attrs": 32, "delta": 0, "first_ts": 1656554802864, "max_ts": 1656554802864, "producer_id": 1002, "producer_epoch": 1, "base_seq": -1, "record_count": 1, "type_name": "tx_prepare", <==== "expanded_attrs": { "compression": "none", "transactional": false, "control_batch": true, "timestamp_type": false } } --- tools/offline_log_viewer/storage.py | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index 27628fe6ce31..7b261280e1a2 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -1,4 +1,5 @@ import collections +from enum import Enum import os import struct @@ -104,6 +105,36 @@ def __next__(self): headers) +class BatchType(Enum): + """Keep this in sync with model/record_batch_types.h""" + raft_data = 1 + raft_configuration = 2 + controller = 3 + kvstore = 4 + checkpoint = 5 + topic_management_cmd = 6 + ghost_batch = 7 + id_allocator = 8 + tx_prepare = 9 + tx_fence = 10 + tm_update = 11 + user_management_cmd = 12 + acl_management_cmd = 13 + group_prepare_tx = 14 + group_commit_tx = 15 + group_abort_tx = 16 + node_management_cmd = 17 + data_policy_management_cmd = 18 + archival_metadata = 19 + cluster_config_cmd = 20 + feature_update = 21 + unknown = -1 + + @classmethod + def _missing_(e, value): + return e.unknown + + class Batch: class CompressionType(Enum): none = 0 @@ -141,6 +172,7 @@ def __init__(self, index, header, records): def header_dict(self): header = self.header._asdict() attrs = header['attrs'] + header["type_name"] = BatchType(header["type"]).name header['expanded_attrs'] = { 'compression': Batch.CompressionType(attrs & Batch.compression_mask).name, From 45f4288f0e45b4a31b4f053947aa155654a41844 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 29 Jun 2022 22:23:15 -0700 Subject: [PATCH 5/6] tools[log_viewer]: Populate txn control markers Moves the logic in KafkaLog like other methods, cleans up duplicate code. Introduces KafkaControlRecordType For transactional && control records we extract the exact control record type(commit/abort). 
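Kafka encodes the control record key as two big-endian int16s: a version followed by the control type (0 = abort, 1 = commit). A minimal sketch of that decode without the Reader helper (control_record_type here is a hypothetical standalone function, not the method added below):

    import struct

    def control_record_type(key: bytes) -> str:
        # key layout: big-endian (version: int16, type: int16)
        _version, ctrl_type = struct.unpack(">hh", key[:4])
        return {0: "tx_abort", 1: "tx_commit"}.get(ctrl_type, "unknown")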
--- tools/offline_log_viewer/kafka.py | 44 ++++++++++++++++++++++++++--- tools/offline_log_viewer/storage.py | 5 ++++ tools/offline_log_viewer/viewer.py | 38 +++++-------------------- 3 files changed, 52 insertions(+), 35 deletions(-) diff --git a/tools/offline_log_viewer/kafka.py b/tools/offline_log_viewer/kafka.py index 124fab0ba0b6..00b4b78e0069 100644 --- a/tools/offline_log_viewer/kafka.py +++ b/tools/offline_log_viewer/kafka.py @@ -1,16 +1,52 @@ from storage import Segment +from enum import Enum +from io import BytesIO +from reader import Reader +import struct + + +class KafkaControlRecordType(Enum): + """Valid control record types defined by Kafka, currently only + used for transactional control records. + Keep this in sync with model::control_record_type + """ + tx_abort = 0 + tx_commit = 1 + unknown = -1 class KafkaLog: - def __init__(self, ntp): + def __init__(self, ntp, headers_only): self.ntp = ntp + self.headers_only = headers_only + + def get_control_record_type(self, key): + rdr = Reader(BytesIO(key)) + rdr.skip(2) # skip the 16bit version. + # Encoded as big endian + type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16()))) + return KafkaControlRecordType(type_rdr.read_int16()).name - def batch_headers(self): + def decode(self): + self.results = [] for batch in self.batches(): - yield batch.header_dict() + header = batch.header_dict() + yield header + if not self.headers_only: + records = [] + for record in batch: + attrs = header["expanded_attrs"] + is_tx_ctrl = attrs["transactional"] and attrs[ + "control_batch"] + record_dict = record.kv_dict() + records.append(record_dict) + if is_tx_ctrl: + record_dict["type"] = self.get_control_record_type( + record.key) + yield record_dict def batches(self): for path in self.ntp.segments: s = Segment(path) for batch in s: - yield batch \ No newline at end of file + yield batch diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index 7b261280e1a2..b0ff646d0d04 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -57,6 +57,11 @@ def __init__(self, length, attrs, timestamp_delta, offset_delta, key, self.value = value self.headers = headers + def kv_dict(self): + key = None if self.key == None else self.key.hex() + val = None if self.value == None else self.value.hex() + return {"k": key, "v": val} + class RecordHeader: def __init__(self, key, value): diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index 46f72bf7a527..cddc928af476 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -32,40 +32,16 @@ def print_controller(store): logger.info(json.dumps(ctrl.records, indent=2)) -def print_kafka(store, topic): +def print_kafka(store, topic, headers_only): for ntp in store.ntps: - if ntp.nspace.startswith("kafka"): + if ntp.nspace in ["kafka", "kafka_internal"]: if topic and ntp.topic != topic: continue - log = KafkaLog(ntp) logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}') - for header in log.batch_headers(): - logger.info(json.dumps(header, indent=2)) - - -def print_kafka_records(store, topic): - for ntp in store.ntps: - if ntp.nspace.startswith("kafka"): - if topic and ntp.topic != topic: - continue - - log = KafkaLog(ntp) - logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}') - for batch in log.batches(): - logger.info(json.dumps(batch.header_dict(), indent=2)) - for record in batch: - logger.info( - json.dumps( - { - "key": - None - if record.key == None else 
record.key.hex(), - "value": - None if record.value == None else - record.value.hex(), - }, - indent=2)) + log = KafkaLog(ntp, headers_only=headers_only) + for result in log.decode(): + logger.info(json.dumps(result, indent=2)) def print_groups(store): @@ -118,9 +94,9 @@ def generate_options(): elif options.type == "controller": print_controller(store) elif options.type == "kafka": - print_kafka(store, options.topic) + print_kafka(store, options.topic, headers_only=True) elif options.type == "kafka_records": - print_kafka_records(store, options.topic) + print_kafka(store, options.topic, headers_only=False) elif options.type == "group": print_groups(store) From 64f1b3987ac67374607d347b554f19de5b6a4f16 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 1 Jul 2022 08:28:58 -0700 Subject: [PATCH 6/6] tools/log_viewer: Switch to BatchType enum Switch to the enum in various other places for readability. --- tools/offline_log_viewer/controller.py | 49 +++++++------------------- tools/offline_log_viewer/kvstore.py | 15 ++++---- tools/offline_log_viewer/storage.py | 3 +- 3 files changed, 22 insertions(+), 45 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 4ea789057851..ab8f45d60c8e 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,7 +1,8 @@ from io import BufferedReader, BytesIO from model import * from reader import Reader -from storage import Segment +from storage import Batch, Segment +from storage import BatchType import datetime @@ -166,57 +167,31 @@ def decode_feature_update_action(r): return cmd -def decode_record(header, record): +def decode_record(batch, record): ret = {} - ret['type'] = type_str(header) + header = batch.header + ret['type'] = batch.type.name ret['epoch'] = header.first_ts ret['offset'] = header.base_offset + record.offset_delta ret['ts'] = datetime.datetime.utcfromtimestamp( header.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S') ret['data'] = None - if header.type == 2: + if batch.type == BatchType.raft_configuration: ret['data'] = decode_config(record) - if header.type == 6: + if batch.type == BatchType.topic_management_cmd: ret['data'] = decode_topic_command(record) - if header.type == 12: + if batch.type == BatchType.user_management_cmd: ret['data'] = decode_user_command(record) - if header.type == 13: + if batch.type == BatchType.acl_management_cmd: ret['data'] = decode_acl_command(record) - if header.type == 20: + if header.type == BatchType.cluster_config_cmd: ret['data'] = decode_config_command(record) - if header.type == 21: + if header.type == BatchType.feature_update: ret['data'] = decode_feature_command(record) return ret -def type_str(header): - if header.type == 1: - return "data" - if header.type == 2: - return "configuration" - if header.type == 3: - return "old controller" - if header.type == 4: - return "kv store" - if header.type == 5: - return "checkpoint" - if header.type == 6: - return "topic command" - if header.type == 12: - return "user management command" - if header.type == 13: - return "acl management command" - if header.type == 17: - return "node management command" - if header.type == 20: - return "cluster config management command" - if header.type == 21: - return "feature management command" - - return f"unknown {header.type}" - - class ControllerLog: def __init__(self, ntp): self.ntp = ntp @@ -227,4 +202,4 @@ def decode(self): s = Segment(path) for b in s: for r in b: - 
self.records.append(decode_record(b.header, r)) + self.records.append(decode_record(b, r)) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 24f920dd102d..c9cd3417e85b 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -4,7 +4,7 @@ import struct from model import * from reader import Reader -from storage import Header, Record, Segment +from storage import BatchType, Header, Record, Segment import collections import datetime @@ -15,6 +15,7 @@ class SnapshotBatch: def __init__(self, header, records): self.header = header self.records = records + self.type = BatchType(header[3]) def __iter__(self): for r in self.records: @@ -61,10 +62,10 @@ def from_stream(f): class KvStoreRecordDecoder: - def __init__(self, record, header, value_is_optional_type): + def __init__(self, record, batch, value_is_optional_type): self.record = record - self.header = header - self.batch_type = header.type + self.header = batch.header + self.batch_type = batch.type self.offset_delta = record.offset_delta self.v_stream = BytesIO(self.record.value) self.k_stream = BytesIO(self.record.key) @@ -85,7 +86,7 @@ def _decode_ks(self, ks): def decode(self): - assert self.batch_type == 4 + assert self.batch_type == BatchType.kvstore ret = {} ret['epoch'] = self.header.first_ts ret['offset'] = self.header.base_offset + self.offset_delta @@ -336,7 +337,7 @@ def decode(self): logger.info(f"snapshot last offset: {snap.last_offset}") for r in snap.data_batch: d = KvStoreRecordDecoder(r, - snap.data_batch.header, + snap.data_batch, value_is_optional_type=False) self._apply(d.decode()) else: @@ -347,7 +348,7 @@ def decode(self): for batch in s: for r in batch: d = KvStoreRecordDecoder(r, - batch.header, + batch, value_is_optional_type=True) self._apply(d.decode()) diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index b0ff646d0d04..39b8ebb5405b 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -163,6 +163,7 @@ def __init__(self, index, header, records): self.header = header self.term = None self.records = records + self.type = BatchType(header[3]) header_crc_bytes = struct.pack( "<" + HDR_FMT_RP_PREFIX_NO_CRC + HDR_FMT_CRC, *self.header[1:]) @@ -177,7 +178,7 @@ def __init__(self, index, header, records): def header_dict(self): header = self.header._asdict() attrs = header['attrs'] - header["type_name"] = BatchType(header["type"]).name + header["type_name"] = self.type.name header['expanded_attrs'] = { 'compression': Batch.CompressionType(attrs & Batch.compression_mask).name,