diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index b8f32a09fae0..54ec9f2fcc88 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -179,4 +179,11 @@ RUN mkdir -p /opt/scripts && \ curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \ chmod +x /opt/scripts/seastar-addr2line +RUN mkdir -p /opt/scripts/offline_log_viewer +COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer + +RUN mkdir -p /opt/scripts/consumer_offsets_recovery +COPY --chown=0:0 tools/consumer_offsets_recovery /opt/scripts/consumer_offsets_recovery +RUN python3 -m pip install --force --no-cache-dir -r /opt/scripts/consumer_offsets_recovery/requirements.txt + CMD service ssh start && tail -f /dev/null diff --git a/tests/docker/Dockerfile.dockerignore b/tests/docker/Dockerfile.dockerignore index 9be2c986be8f..6690f25515e9 100644 --- a/tests/docker/Dockerfile.dockerignore +++ b/tests/docker/Dockerfile.dockerignore @@ -3,4 +3,6 @@ !tests/docker/ssh !tests/setup.py !tests/python -!tests/go \ No newline at end of file +!tests/go +!tools/offline_log_viewer +!tools/consumer_offsets_recovery diff --git a/tests/rptest/clients/consumer_offsets_recovery.py b/tests/rptest/clients/consumer_offsets_recovery.py new file mode 100644 index 000000000000..4ed80afbe219 --- /dev/null +++ b/tests/rptest/clients/consumer_offsets_recovery.py @@ -0,0 +1,46 @@ +# Copyright 2022 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.redpanda import RedpandaService + + +class ConsumerOffsetsRecovery: + """ + Wrap tools/consumer_offsets_recovery for use in tests + """ + def __init__(self, redpanda: RedpandaService): + self._redpanda = redpanda + self._cfg_path = '/tmp/cgr.properties' + self._offsets_cache_path = '/tmp/offsets' + + def _cmd(self, partition_count, dry_run): + cmd = "python3 /opt/scripts/consumer_offsets_recovery/main.py" + + cmd += f" --cfg {self._cfg_path}" + cmd += f" --offsets {self._offsets_cache_path}" + cmd += f" -p {partition_count}" + if not dry_run: + cmd += " --execute" + return cmd + + def _render_config(self, node): + properties = {"bootstrap_servers": self._redpanda.brokers()} + content = "" + for k, v in properties.items(): + content += f"{k}={v}\n" + + node.account.create_file(self._cfg_path, content) + + def change_partition_count(self, partition_count, node, dry_run): + node.account.remove(self._offsets_cache_path, allow_fail=True) + node.account.mkdir(self._offsets_cache_path) + self._render_config(node) + cmd = self._cmd(partition_count, dry_run) + output = node.account.ssh_output(cmd, combine_stderr=True) + self._redpanda.logger.info(f"out: {output}") diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py new file mode 100644 index 000000000000..75df420b9b01 --- /dev/null +++ b/tests/rptest/clients/offline_log_viewer.py @@ -0,0 +1,44 @@ +# Copyright 2022 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import json + + +class OfflineLogViewer: + """ + Wrap tools/offline_log_viewer for use in tests: this is for tests that + want to peek at the structures, but also for validating the tool itself. + """ + def __init__(self, redpanda): + self._redpanda = redpanda + + def _cmd(self, suffix): + viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py" + return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}" + + def read_kvstore(self, node): + cmd = self._cmd("--type kvstore") + kvstore_json = node.account.ssh_output(cmd, combine_stderr=False) + return json.loads(kvstore_json) + + def _json_cmd(self, node, suffix): + cmd = self._cmd(suffix=suffix) + json_out = node.account.ssh_output(cmd, combine_stderr=False) + try: + return json.loads(json_out) + except json.decoder.JSONDecodeError: + # Log the bad output before re-raising + self._redpanda.logger.error(f"Invalid JSON output: {json_out}") + raise + + def read_controller(self, node): + return self._json_cmd(node, "--type controller") + + def read_consumer_offsets(self, node): + return self._json_cmd(node, "--type consumer_offsets") diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index c2f56892a7d0..0d34aed7e2ab 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -371,10 +371,10 @@ def parse_field(field_name, string): static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$") partition_pattern_static_member = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" + "(?P[^\s]+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" 
) partition_pattern_dynamic_member = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" + "(?P[^\s]+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" ) def check_lines(lines): diff --git a/tests/rptest/test_suite_ec2.yml b/tests/rptest/test_suite_ec2.yml index 9001c171d560..e11ad789d86b 100644 --- a/tests/rptest/test_suite_ec2.yml +++ b/tests/rptest/test_suite_ec2.yml @@ -13,3 +13,4 @@ ec2: - tests/wasm_redpanda_failure_recovery_test.py # no wasm - tests/wasm_partition_movement_test.py # no wasm - tests/e2e_iam_role_test.py # use static credentials + - tests/consumer_group_recovery_tool_test.py # use script available in dockerfile diff --git a/tests/rptest/tests/consumer_group_recovery_tool_test.py b/tests/rptest/tests/consumer_group_recovery_tool_test.py new file mode 100644 index 000000000000..b8db226f7d82 --- /dev/null +++ b/tests/rptest/tests/consumer_group_recovery_tool_test.py @@ -0,0 +1,111 @@ +# Copyright 2020 Redpanda Data, Inc. +# Copyright 2020 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.clients.consumer_offsets_recovery import ConsumerOffsetsRecovery +from rptest.services.cluster import cluster + +from rptest.clients.rpk import RpkException, RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.kafka_cli_consumer import KafkaCliConsumer +from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer +from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST +from rptest.services.rpk_producer import RpkProducer +from rptest.tests.prealloc_nodes import PreallocNodesTest +from rptest.tests.redpanda_test import RedpandaTest +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize + + +class ConsumerOffsetsRecoveryToolTest(PreallocNodesTest): + def __init__(self, test_ctx, *args, **kwargs): + self._ctx = test_ctx + self.initial_partition_count = 3 + super(ConsumerOffsetsRecoveryToolTest, self).__init__( + test_ctx, + num_brokers=3, + *args, + # disable leader balancer to make sure that group will not be realoaded because of leadership changes + extra_rp_conf={ + "group_topic_partitions": self.initial_partition_count + }, + node_prealloc_count=1, + **kwargs) + + def describe_all_groups(self): + rpk = RpkTool(self.redpanda) + all_groups = {} + for g in rpk.group_list(): + gd = rpk.group_describe(g) + all_groups[gd.name] = {} + for p in gd.partitions: + all_groups[ + gd.name][f"{p.topic}/{p.partition}"] = p.current_offset + + return all_groups + + @cluster(num_nodes=4) + def test_consumer_offsets_partition_count_change(self): + topic = TopicSpec(partition_count=16, replication_factor=3) + self.client().create_topic([topic]) + msg_size = 1024 + msg_cnt = 10000 + + 
producer = KgoVerifierProducer(self.test_context, + self.redpanda, + topic.name, + msg_size, + msg_cnt, + custom_node=self.preallocated_nodes) + + producer.start(clean=False) + + wait_until(lambda: producer.produce_status.acked > 10, + timeout_sec=30, + backoff_sec=0.5) + + consumer = KgoVerifierConsumerGroupConsumer( + self.test_context, + self.redpanda, + topic.name, + msg_size, + readers=3, + nodes=self.preallocated_nodes) + consumer.start(clean=False) + + producer.wait() + consumer.wait() + + assert consumer.consumer_status.validator.valid_reads >= producer.produce_status.acked + + groups_pre_migration = self.describe_all_groups() + + cgr = ConsumerOffsetsRecovery(self.redpanda) + + # execute recovery tool, ask for 16 partitions in consumer offsets topic + # + # this is dry run, expect that partition count didn't change + cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=True) + rpk = RpkTool(self.redpanda) + tp_desc = list(rpk.describe_topic("__consumer_offsets")) + + # check if topic partition count changed + assert len(tp_desc) == self.initial_partition_count + + # now allow the tool to execute + cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=False) + + # check if topic partition count changed + wait_until( + lambda: len(list(rpk.describe_topic("__consumer_offsets"))) == 16, + 20) + + groups_post_migration = self.describe_all_groups() + + assert groups_pre_migration == groups_post_migration diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index 92bbf5e314c4..4397cc47adb3 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -8,6 +8,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.services.cluster import cluster from rptest.clients.rpk import RpkException, RpkTool @@ -32,7 +33,10 @@ def 
__init__(self, test_ctx, *args, **kwargs): num_brokers=3, *args, # disable leader balancer to make sure that group will not be realoaded because of leadership changes - extra_rp_conf={"enable_leader_balancer": False}, + extra_rp_conf={ + "enable_leader_balancer": False, + "default_topic_replications": 3 + }, **kwargs) def make_consumer_properties(base_properties, instance_id=None): @@ -154,6 +158,32 @@ def test_basic_group_join(self, static_members): c.wait() c.free() + gd = RpkTool(self.redpanda).group_describe(group=group) + viewer = OfflineLogViewer(self.redpanda) + for node in self.redpanda.nodes: + consumer_offsets_partitions = viewer.read_consumer_offsets( + node=node) + offsets = {} + groups = set() + for partition in consumer_offsets_partitions: + for r in partition['records']: + self.logger.info(f"{r}") + if r['key']['type'] == 'group_metadata': + groups.add(r['key']['group_id']) + elif r['key']['type'] == 'offset_commit': + tp = f"{r['key']['topic']}/{r['key']['partition']}" + if tp not in offsets: + offsets[tp] = -1 + offsets[tp] = max(r['value']['committed_offset'], + offsets[tp]) + + assert len(groups) == 1 and group in groups + assert all([ + f"{p.topic}/{p.partition}" in offsets + and offsets[f"{p.topic}/{p.partition}"] == p.current_offset + for p in gd.partitions + ]) + @cluster(num_nodes=6) def test_mixed_consumers_join(self): """ diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index 7b5a99433427..b5222639d380 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -7,24 +7,108 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.services.cluster import cluster -from rptest.services.admin import Admin -from ducktape.utils.util import wait_until import time +import json +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from requests.exceptions import 
HTTPError + +from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.redpanda_test import RedpandaTest from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.services.rpk_producer import RpkProducer from rptest.services.metrics_check import MetricCheck from rptest.services.redpanda import SISettings from rptest.util import wait_for_segments_removal -from ducktape.mark import parametrize +from rptest.services.admin import Admin -def topic_storage_purged(redpanda, topic: str): +def get_kvstore_topic_key_counts(redpanda): + """ + Count the keys in KVStore that relate to Kafka topics: this excludes all + internal topic items: if no Kafka topics exist, this should be zero for + all nodes. + + :returns: dict of Node to integer + """ + + viewer = OfflineLogViewer(redpanda) + + # Find the raft group IDs of internal topics + admin = Admin(redpanda) + internal_group_ids = set() + for ntp in [ + ('redpanda', 'controller', 0), + ('kafka_internal', 'id_allocator', 0), + ]: + namespace, topic, partition = ntp + try: + p = admin.get_partitions(topic=topic, + namespace=namespace, + partition=partition) + except HTTPError as e: + # OK if internal topic doesn't exist (e.g. 
id_allocator + # doesn't have to exist) + if e.response.status_code != 404: + raise + else: + internal_group_ids.add(p['raft_group_id']) + + result = {} + for n in redpanda.nodes: + kvstore_data = viewer.read_kvstore(node=n) + + excess_keys = [] + for shard, items in kvstore_data.items(): + keys = [i['key'] for i in items] + + for k in keys: + if k['keyspace'] == "cluster": + # Not a per-partition key + continue + + if k['data'].get('group', None) in internal_group_ids: + # One of the internal topics + continue + + if k['data'].get('ntp', {}).get('topic', None) == 'controller': + # Controller storage item + continue + + excess_keys.append(k) + + redpanda.logger.info( + f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}" + ) + + key_count = len(excess_keys) + result[n] = key_count + + return result + + +def topic_storage_purged(redpanda, topic_name): storage = redpanda.storage() - return all(map(lambda n: topic not in n.ns["kafka"].topics, storage.nodes)) + logs_removed = all( + map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes)) + + if not logs_removed: + return False + + # Once logs are removed, also do more expensive inspection of + # kvstore to check that per-partition kvstore contents are + # gone. The user doesn't care about this, but it is important + # to avoid bugs that would cause kvstore to bloat through + # topic creation/destruction cycles. 
+ + topic_key_counts = get_kvstore_topic_key_counts(redpanda) + if any([v > 0 for v in topic_key_counts.values()]): + return False + + return True class TopicDeleteTest(RedpandaTest): @@ -44,7 +128,9 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) @cluster(num_nodes=3) - def topic_delete_test(self): + @parametrize(with_restart=False) + @parametrize(with_restart=True) + def topic_delete_test(self, with_restart): def produce_until_partitions(): self.kafka_tools.produce(self.topic, 1024, 1024) storage = self.redpanda.storage() @@ -55,6 +141,16 @@ def produce_until_partitions(): backoff_sec=2, err_msg="Expected partition did not materialize") + if with_restart: + # Do a restart to encourage writes and flushes, especially to + # the kvstore. + self.redpanda.restart_nodes(self.redpanda.nodes) + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + self.kafka_tools.delete_topic(self.topic) try: @@ -294,17 +390,12 @@ def check_compaction(): pass producer.free() - def topic_storage_purged(): - storage = self.redpanda.storage() - return all( - map(lambda n: topic_name not in n.ns["kafka"].topics, - storage.nodes)) - try: - wait_until(lambda: topic_storage_purged(), - timeout_sec=60, - backoff_sec=2, - err_msg="Topic storage was not removed") + wait_until( + lambda: topic_storage_purged(self.redpanda, topic_name), + timeout_sec=60, + backoff_sec=2, + err_msg="Topic storage was not removed") except: # On errors, dump listing of the storage location diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py index 5204a8166eed..2208b5feab29 100644 --- a/tests/rptest/tests/upgrade_test.py +++ b/tests/rptest/tests/upgrade_test.py @@ -17,6 +17,7 @@ from rptest.services.admin import Admin from rptest.clients.rpk import RpkTool from 
rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.prealloc_nodes import PreallocNodesTest from rptest.tests.redpanda_test import RedpandaTest from rptest.tests.end_to_end import EndToEndTest @@ -253,6 +254,15 @@ def _consumer_offsets_present(): assert self._rand_consumer.consumer_status.validator.total_reads >= self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL assert self._cg_consumer.consumer_status.validator.valid_reads >= wrote_at_least + # Validate that the data structures written by a mixture of historical + # versions remain readable by our current debug tools + log_viewer = OfflineLogViewer(self.redpanda) + for node in self.redpanda.nodes: + controller_records = log_viewer.read_controller(node=node) + self.logger.info( + f"Read {len(controller_records)} controller records from node {node.name} successfully" + ) + class UpgradeWithWorkloadTest(EndToEndTest): """ diff --git a/tools/consumer_offsets_recovery/config.properties b/tools/consumer_offsets_recovery/config.properties new file mode 100644 index 000000000000..bef3cbb97058 --- /dev/null +++ b/tools/consumer_offsets_recovery/config.properties @@ -0,0 +1,6 @@ +bootstrap_servers=127.0.0.1 +# security +# security_protocol=SASL_PLAINTEXT +# sasl_mechanism=SCRAM-SHA-256 +# sasl_plain_username=admin +# sasl_plain_password=admin diff --git a/tools/consumer_offsets_recovery/main.py b/tools/consumer_offsets_recovery/main.py new file mode 100755 index 000000000000..0a17f7334d20 --- /dev/null +++ b/tools/consumer_offsets_recovery/main.py @@ -0,0 +1,163 @@ +import os +from pathlib import Path + +import subprocess +from kafka import KafkaAdminClient +from kafka import KafkaConsumer +from kafka.admin.new_topic import NewTopic +from kafka import TopicPartition +from kafka import OffsetAndMetadata +from jproperties import Properties +import logging + +logger = logging.getLogger('cg-recovery-tool') + + +def read_offsets(admin: KafkaAdminClient): + + 
groups_dict = {} + groups = admin.list_consumer_groups() + for g, _ in groups: + logger.info(f"reading group '{g}' offsets") + offsets = admin.list_consumer_group_offsets(group_id=g) + groups_dict[g] = offsets + + return groups_dict + + +def read_config(path): + logger.debug(f"reading configuration from {path}") + cfg = Properties() + with open(path, 'rb') as f: + cfg.load(f) + cfg_dict = {} + for k, v in cfg.items(): + cfg_dict[k] = v.data + + return cfg_dict + + +def seek_to_file(path, cfg, dry_run): + + logger.debug(f"reading offsets file: {path}") + offsets = {} + with open(path, 'r') as f: + group = "" + consumer = None + for i, l in enumerate(f): + if i == 0: + group = l.strip() + if not dry_run: + consumer = KafkaConsumer(group_id=group, **cfg) + logger.info(f"seeking group {group} to file {path}") + continue + if l == "": + continue + + topic, partition, offset = tuple([p.strip() for p in l.split(',')]) + logger.debug( + f"group: {group} partition: {topic}/{partition} offset: {offset}" + ) + tp = TopicPartition(topic=topic, partition=int(partition)) + + offsets[tp] = OffsetAndMetadata(offset=int(offset), metadata='') + if not dry_run: + consumer.commit(offsets=offsets) + + +def offset_file(group): + return f"{group}.offsets" + + +def seek_all(config, path, dry_run): + logger.info(f"reading offsets from: {path}") + for _, _, files in os.walk(path): + for f in files: + seek_to_file(Path(path).joinpath(f), config, dry_run) + + +def query_offsets(offsets_path, cfg): + admin = KafkaAdminClient(**cfg) + + current_offsets = read_offsets(admin) + path = Path(offsets_path) + logger.info(f"storing offsets in {offsets_path}") + path.mkdir(exist_ok=True) + for group_id, offsets in current_offsets.items(): + with open(path.joinpath(offset_file(group_id)), 'w') as f: + f.write(f"{group_id}\n") + for tp, md in offsets.items(): + topic, partition = tp + offset, _ = md + f.write(f"{topic},{partition},{offset}\n") + + +def recreate_topic(partitions, replication_factor, cfg): 
+ admin = KafkaAdminClient(**cfg) + logger.info('removing __consumer_offsets topic') + + admin.delete_topics(["__consumer_offsets"]) + + new_topic = NewTopic(name="__consumer_offsets", + num_partitions=partitions, + topic_configs={ + "cleanup.policy": "compact", + "retention.bytes": -1, + "retention.ms": -1, + }, + replication_factor=replication_factor) + logger.info('recreating __consumer_offsets topic') + admin.create_topics(new_topics=[new_topic]) + + +def main(): + import argparse + + def generate_options(): + parser = argparse.ArgumentParser( + description='Redpanda group recovery tool') + parser.add_argument('--cfg', + type=str, + help='config file path', + required=True) + parser.add_argument('-o', + "--offsets-path", + type=str, + default="./offsets") + parser.add_argument('-v', "--verbose", action="store_true") + parser.add_argument('-s', "--skip-query", action="store_true") + parser.add_argument('-p', "--target-partitions", type=int, default=16) + parser.add_argument('-e', "--execute", action="store_true") + parser.add_argument('-r', "--replication-factor", type=int, default=3) + + return parser + + parser = generate_options() + options, _ = parser.parse_known_args() + logging.basicConfig( + format="%(asctime)s %(name)-20s %(levelname)-8s %(message)s") + + if options.verbose: + logging.basicConfig(level="DEBUG") + logger.setLevel(level="DEBUG") + else: + logging.basicConfig(level="ERROR") + logger.setLevel(level="INFO") + + logger.info(f"starting group recovery tool with options: {options}") + cfg = read_config(options.cfg) + + logger.info(f"configuration: {cfg}") + + if not options.skip_query: + query_offsets(options.offsets_path, cfg) + + if options.execute: + recreate_topic(options.target_partitions, options.replication_factor, + cfg) + + seek_all(cfg, options.offsets_path, dry_run=not options.execute) + + +if __name__ == '__main__': + main() diff --git a/tools/consumer_offsets_recovery/requirements.txt 
b/tools/consumer_offsets_recovery/requirements.txt new file mode 100644 index 000000000000..ecc258802d08 --- /dev/null +++ b/tools/consumer_offsets_recovery/requirements.txt @@ -0,0 +1,2 @@ +kafka-python +jproperties diff --git a/tools/offline_log_viewer/consumer_offsets.py b/tools/offline_log_viewer/consumer_offsets.py new file mode 100644 index 000000000000..7e1666f909bf --- /dev/null +++ b/tools/offline_log_viewer/consumer_offsets.py @@ -0,0 +1,123 @@ +import base64 +from io import BytesIO +from model import * +from reader import Endianness, Reader +from storage import Segment +import datetime + + +def decode_key_type(v): + if v == 0 or v == 1: + return "offset_commit" + elif v == 2: + return "group_metadata" + + return "unknown" + + +def decode_member_proto(rdr): + ret = {} + ret['name'] = rdr.read_string() + ret['metadata'] = rdr.read_iobuf().hex() + return ret + + +def decode_member(rdr): + ret = {} + ret['v'] = rdr.read_int16() + ret['member_id'] = rdr.read_kafka_string() + ret['instance_id'] = rdr.read_kafka_optional_string() + ret['client_id'] = rdr.read_kafka_string() + ret['client_host'] = rdr.read_kafka_string() + ret['rebalance_timeout'] = rdr.read_int32() + ret['session_timeout'] = rdr.read_int32() + ret['subscription'] = base64.b64encode( + rdr.read_kafka_bytes()).decode('utf-8') + ret['assignment'] = base64.b64encode( + rdr.read_kafka_bytes()).decode('utf-8') + + return ret + + +def decode_metadata(rdr): + ret = {} + ret['version'] = rdr.read_int16() + ret['protocol_type'] = rdr.read_kafka_string() + ret['generation_id'] = rdr.read_int32() + ret['protocol_name'] = rdr.read_kafka_optional_string() + ret['leader'] = rdr.read_kafka_optional_string() + ret['state_timestamp'] = rdr.read_int64() + ret['member_state'] = rdr.read_vector(decode_member) + return ret + + +def decode_key(key_rdr): + ret = {} + v = key_rdr.read_int16() + ret['type'] = decode_key_type(v) + ret['group_id'] = key_rdr.read_kafka_string() + if ret['type'] == 'offset_commit': + 
ret['topic'] = key_rdr.read_kafka_string() + ret['partition'] = key_rdr.read_int32() + + return ret + + +def decode_offset_commit(v_rdr): + ret = {} + ret['version'] = v_rdr.read_int16() + ret['committed_offset'] = v_rdr.read_int64() + if ret['version'] >= 3: + ret['leader_epoch'] = v_rdr.read_int32() + + ret['committed_metadata'] = v_rdr.read_kafka_string() + ret['commit_timestamp'] = v_rdr.read_int64() + if ret['version'] == 1: + ret['expiry_timestamp'] = v_rdr.read_int64() + + return ret + + +def decode_record(hdr, r): + v = {} + v['epoch'] = hdr.first_ts + v['offset'] = hdr.base_offset + r.offset_delta + v['ts'] = datetime.datetime.utcfromtimestamp( + hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S') + k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN) + + v['key'] = decode_key(k_rdr) + + if v['key']['type'] == "group_metadata": + if r.value: + rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN) + v['value'] = decode_metadata(rdr) + else: + v['value'] = 'tombstone' + elif v['key']['type'] == "offset_commit": + if r.value: + rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN) + v['value'] = decode_offset_commit(rdr) + else: + v['value'] = 'tombstone' + + return v + + +class OffsetsLog: + def __init__(self, ntp): + self.ntp = ntp + self.records = [] + + def decode(self): + paths = [] + for path in self.ntp.segments: + paths.append(path) + paths.sort() + for path in paths: + s = Segment(path) + for b in s: + if b.header.type != 1: + continue + for r in b: + self.records.append(decode_record(b.header, r)) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index d290106dd982..59016dd01dc8 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,14 +1,223 @@ -from io import BufferedReader, BytesIO +import logging +from io import BytesIO from model import * from reader import Reader -from storage import Batch, Segment +from storage import 
Segment from storage import BatchType import datetime +logger = logging.getLogger('controller') + + +def read_remote_topic_properties_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + "remote_revision": rdr.read_int64(), + "remote_partition_count": rdr.read_int32(), + }) + + +def read_topic_properties_serde(rdr: Reader, version): + + topic_properties = { + 'compression': rdr.read_optional(Reader.read_serde_enum), + 'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum), + 'compaction_strategy': rdr.read_optional(Reader.read_serde_enum), + 'timestamp_type': rdr.read_optional(Reader.read_serde_enum), + 'segment_size': rdr.read_optional(Reader.read_uint64), + 'retention_bytes': rdr.read_tristate(Reader.read_uint64), + 'retention_duration': rdr.read_tristate(Reader.read_uint64), + 'recovery': rdr.read_optional(Reader.read_bool), + 'shadow_indexing': rdr.read_optional(Reader.read_serde_enum), + } + + # introduced for remote read replicas + if version >= 1: + topic_properties |= { + 'read_replica': + rdr.read_optional(Reader.read_bool), + 'read_replica_bucket': + rdr.read_optional(Reader.read_string), + 'remote_topic_properties': + rdr.read_optional(read_remote_topic_properties_serde) + } + if version >= 2: + topic_properties |= { + 'batch_max_bytes': rdr.read_optional(Reader.read_uint32), + } + + if version >= 3: + topic_properties |= { + 'retention_local_target_bytes': + rdr.read_tristate(Reader.read_uint64), + 'retention_local_target_ms': rdr.read_tristate(Reader.read_uint64), + 'remote_delete': rdr.read_bool() + } + return topic_properties + + +def read_topic_assignment_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'partitions': + rdr.read_int32(), + 'replication_factor': + rdr.read_int16(), + 'properties': + rdr.read_envelope(read_topic_properties_serde, + max_version=3), + }, 1), + 
'assignments': + rdr.read_serde_vector(lambda r: r.read_envelope( + lambda ir, _: { + 'group': + ir.read_int64(), + 'id': + ir.read_int32(), + 'replicas': + ir.read_serde_vector(lambda iir: { + 'node_id': iir.read_int32(), + 'shard': iir.read_uint32(), + }), + })), + }, 1) + + +def read_inc_update_op_serde(rdr: Reader): + v = rdr.read_serde_enum() + if -1 < v < 3: + return ['none', 'set', 'remove'][v] + return 'error' + + +def read_property_update_serde(rdr: Reader, type_reader): + return rdr.read_envelope(lambda rdr, _: { + 'value': type_reader(rdr), + 'op': read_inc_update_op_serde(rdr), + }) + + +def read_incremental_topic_update_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'compression': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'cleanup_policy_bitflags': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'compaction_strategy': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'timestamp_type': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'segment_size': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_uint64)), + 'retention_bytes': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_uint64)), + 'retention_duration': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_int64)), + 'shadow_indexing': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + }) + + +def read_create_partitions_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'new_total_partition_count': + rdr.read_int32(), + 'custom_assignments': + rdr.read_serde_vector(lambda r: r.read_serde_vector( + Reader.read_int32)), + }), + 'assignments': + 
rdr.read_serde_vector(lambda r: r.read_envelope( + lambda rdr, _: { + 'group': rdr.read_int64(), + 'id': rdr.read_int32(), + 'replicas': rdr.read_serde_vector(read_broker_shard), + })), + }) + + +def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_string'] = 'create_topic' + cmd['key'] = {} + cmd['key']['namespace'] = k_rdr.read_string() + cmd['key']['topic'] = k_rdr.read_string() + cmd |= read_topic_assignment_serde(rdr) + elif cmd['type'] == 1: + cmd['type_string'] = 'delete_topic' + cmd['namespace'] = rdr.read_string() + cmd['topic'] = rdr.read_string() + k_rdr.read_string() + k_rdr.read_string() + elif cmd['type'] == 2: + cmd['type_string'] = 'update_partitions' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 3: + cmd['type_string'] = 'finish_partitions_update' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 4: + cmd['type_string'] = 'update_topic_properties' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['update'] = read_incremental_topic_update_serde(rdr) + elif cmd['type'] == 5: + cmd['type_string'] = 'create_partitions' + cmd |= read_create_partitions_serde(rdr) + elif cmd['type'] == 6: + cmd['type_string'] = 'create_non_replicable_topic' + cmd['topic'] = k_rdr.read_envelope( + lambda k_rdr, _: { + 'source': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + 'name': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + }) + elif cmd['type'] == 7: + cmd['type_string'] = 'cancel_moving_partition_replicas' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + 
cmd['partition'] = k_rdr.read_int32() + cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()}) + return cmd -def decode_topic_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) + +def decode_topic_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -68,14 +277,44 @@ def decode_topic_command(record): return cmd -def decode_config(record): +def decode_topic_command(record): rdr = Reader(BytesIO(record.value)) + k_rdr = Reader(BytesIO(record.key)) + either_ald_or_serde = rdr.peek_int8() + assert either_ald_or_serde >= -1, "unsupported serialization format" + if either_ald_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return decode_topic_command_serde(k_rdr, rdr) + else: + return decode_topic_command_adl(k_rdr, rdr) + + +def decode_config(k_rdr: Reader, rdr: Reader): return read_raft_config(rdr) -def decode_user_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_user_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + cmd['str_type'] = decode_user_cmd_type(cmd['type']) + + if cmd['type'] == 5 or cmd['type'] == 7: + cmd['user'] = k_rdr.read_string() + cmd['cred'] = rdr.read_envelope( + lambda rdr, _: { + # obfuscate secrets + 'salt': obfuscate_secret(rdr.read_iobuf().hex()), + 'server_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'stored_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'iterations': rdr.read_int32(), + }) + elif cmd['type'] == 6: + cmd['user'] = k_rdr.read_string() + + return cmd + + +def decode_user_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_user_cmd_type(cmd['type']) @@ -98,9 +337,115 @@ def decode_user_command(record): return cmd -def decode_acl_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def 
read_acl_binding_serde(k_rdr: Reader): + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'resource': decode_acl_resource(k_rdr.read_serde_enum()), + 'name': k_rdr.read_string(), + 'pattern': decode_acl_pattern_type(k_rdr.read_serde_enum()) + }), + 'entry': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + k_rdr.read_string() + }), + 'host': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + }), + 'operation': + decode_acl_operation(k_rdr.read_serde_enum()), + 'permission': + decode_acl_permission(k_rdr.read_serde_enum()), + }), + }) + + +def decode_serialized_pattern_type(v): + if 0 <= v <= 2: + return ['literal', 'prefixed', 'match'][v] + return 'error' + + +def read_acl_binding_filter_serde(k_rdr: Reader): + # pattern class does not really use serde + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': { + 'resource': + k_rdr.read_optional(lambda k_rdr: decode_acl_resource( + k_rdr.read_serde_enum())), + 'name': + k_rdr.read_envelope(Reader.read_string), + 'pattern': + k_rdr.read_optional(lambda k_rdr: + decode_serialized_pattern_type( + k_rdr.read_serde_enum())), + }, + 'acl': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + k_rdr.read_string() + })), + 'host': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + })), + 'operation': + k_rdr.read_optional(lambda k_rdr: decode_acl_operation( + k_rdr.read_serde_enum())), + 'permission': + 
k_rdr.read_optional(lambda k_rdr: decode_acl_permission( + k_rdr.read_serde_enum())), + }), + }) + + +def decode_acl_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + cmd['str_type'] = decode_acls_cmd_type(cmd['type']) + if cmd['type'] == 8: + cmd['acls'] = k_rdr.read_envelope( + lambda k_rdr, _: + {'bindings': k_rdr.read_serde_vector(read_acl_binding_serde)}) + elif cmd['type'] == 9: + cmd |= k_rdr.read_envelope(lambda k_rdr, _: { + 'filters': + k_rdr.read_serde_vector(read_acl_binding_filter_serde) + }) + + return cmd + + +def decode_acl_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_acls_cmd_type(cmd['type']) @@ -114,15 +459,48 @@ def decode_acl_command(record): def read_config_kv(reader): k = reader.read_string() v = reader.read_string() return (k, v) -def decode_config_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_config_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_name'] = 'config_delta' + cmd['version'] = k_rdr.read_int64() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'upsert': + rdr.read_serde_vector(lambda rdr: rdr.read_envelope( + lambda rdr, _: { + 'k': rdr.read_string(), + 'v': rdr.read_string() + })), + 'remove': + rdr.read_serde_vector(Reader.read_string), + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'config_status' + cmd['node_id'] = k_rdr.read_int32() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'status': + rdr.read_envelope( + lambda rdr, _: { + 'node': rdr.read_int32(), + 'version': rdr.read_int64(), + 'restart': rdr.read_bool(), + 'unknown': rdr.read_serde_vector(Reader.read_string), + 'invalid': rdr.read_serde_vector(Reader.read_string), + }), + }) + return cmd + + +def decode_config_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -145,63 +522,139
@@ def decode_config_command(record): return cmd -def decode_feature_command(record): +def decode_action_t(v): + if 1 <= v <= 3: + return ['', 'complete_preparing', 'activate', 'deactivate'][v] + return v + + +def decode_feature_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + rdr.read_int8() + if cmd['type'] == 0: + cmd['type_name'] = 'feature_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'cluster_version': + k_rdr.read_int64(), + 'actions': + k_rdr.read_serde_vector(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'feature_name': k_rdr.read_string(), + 'action': decode_action_t(k_rdr.read_serde_enum()), + })) + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'license_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'redpanda_license': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'format_version': k_rdr.read_uint8(), + 'type': k_rdr.read_serde_enum(), + 'organization': k_rdr.read_string(), + 'expiry': k_rdr.read_int64(), + }) + }) + return cmd + + +def decode_feature_command_adl(k_rdr: Reader, rdr: Reader): def decode_feature_update_action(r): action = {} action['v'] = r.read_int8() action['feature_name'] = r.read_string() - action['action'] = r.read_int16() + action['action'] = decode_action_t(r.read_int16()) return action - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) cmd = {} cmd['type'] = rdr.read_int8() + rdr.read_int8() if cmd['type'] == 0: cmd['type_name'] = 'feature_update' cmd['v'] = k_rdr.read_int8() cmd['cluster_version'] = k_rdr.read_int64() cmd['actions'] = k_rdr.read_vector(decode_feature_update_action) + return cmd + + +def decode_node_management_command(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + if cmd['type'] == 0: + cmd |= { + 'type_string': 'decommission_node', + 'node_id': k_rdr.read_int32() + } + rdr.read_int8() elif cmd['type'] == 1: - cmd['type_name'] = 'license_update' - k_rdr.read_envelope() - cmd['format_v'] = 
k_rdr.read_uint8() - cmd['license_type'] = k_rdr.read_uint8() - cmd['org'] = k_rdr.read_string() - cmd['expiry'] = k_rdr.read_string() - else: - cmd['type_name'] = 'unknown' + cmd |= { + 'type_string': 'recommission_node', + 'node_id': k_rdr.read_int32() + } + rdr.read_int8() + elif cmd['type'] == 2: + cmd |= { + 'type_string': 'finish_reallocations', + 'node_id': k_rdr.read_int32() + } + rdr.read_int8() + elif cmd['type'] == 3: + cmd |= { + 'type_string': 'maintenance_mode', + 'node_id': k_rdr.read_int32(), + 'enabled': rdr.read_bool() + } + elif cmd['type'] == 4: + cmd |= { + 'type_string': 'register_node_uuid', + 'uuid': k_rdr.read_uuid(), + 'id': rdr.read_optional(lambda r: r.read_int32()) + } return cmd -def decode_cluster_bootstrap_command(record): - def decode_user_and_credential(r): - user_cred = {} - user_cred['username'] = r.read_string() - cmd['salt'] = obfuscate_secret(r.read_iobuf().hex()) - cmd['server_key'] = obfuscate_secret(r.read_iobuf().hex()) - cmd['stored_key'] = obfuscate_secret(r.read_iobuf().hex()) +def decode_user_and_credential(rdr: Reader): + return rdr.read_envelope( + lambda r, _: { + 'username': r.read_string(), + 'salt': obfuscate_secret(r.read_iobuf().hex()), + 'server_key': obfuscate_secret(r.read_iobuf().hex()), + 'stored_key': obfuscate_secret(r.read_iobuf().hex()), + }) - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) + +def decode_cluster_bootstrap_command(k_rdr, rdr): cmd = {} - rdr.read_envelope() + rdr.skip(1) + k_rdr.read_int8() cmd['type'] = rdr.read_int8() - if cmd['type'] == 0 or cmd['type'] == 5: # TODO: remove 5 + if cmd['type'] == 0: cmd['type_name'] = 'bootstrap_cluster' - cmd['key'] = k_rdr.read_int8() - cmd['v'] = rdr.read_int8() - if cmd['v'] == 0: - cmd['cluster_uuid'] = ''.join([ - f'{rdr.read_uint8():02x}' + ('-' if k in [3, 5, 7, 9] else '') - for k in range(16) - ]) - cmd['bootstrap_user_cred'] = rdr.read_optional( - decode_user_and_credential) + cmd |= rdr.read_envelope( + lambda r, 
_: { + 'cluster_uuid': + r.read_uuid(), + 'bootstrap_user_cred': + r.read_optional(decode_user_and_credential), + 'nodes_by_uuid': + r.read_serde_map(Reader.read_uuid, Reader.read_int32) + }) return cmd +def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn): + either_adl_or_serde = rdr.peek_int8() + assert either_adl_or_serde >= -1, "unsupported serialization format" + if either_adl_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return serde_fn(k_rdr, rdr) + else: + return adl_fn(k_rdr, rdr) + + def decode_record(batch, record, bin_dump: bool): ret = {} header = batch.header @@ -215,20 +668,40 @@ def decode_record(batch, record, bin_dump: bool): ret['value_dump'] = record.value.__str__() ret['data'] = None + rdr = Reader(BytesIO(record.value)) + k_rdr = Reader(BytesIO(record.key)) + if batch.type == BatchType.raft_configuration: - ret['data'] = decode_config(record) + ret['data'] = decode_config(k_rdr, rdr) if batch.type == BatchType.topic_management_cmd: - ret['data'] = decode_topic_command(record) + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_topic_command_adl, + decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: - ret['data'] = decode_user_command(record) + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_user_command_adl, + decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: - ret['data'] = decode_acl_command(record) - if header.type == BatchType.cluster_config_cmd: - ret['data'] = decode_config_command(record) - if header.type == BatchType.feature_update: - ret['data'] = decode_feature_command(record) + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_acl_command_adl, + decode_acl_command_serde) + if batch.type == BatchType.cluster_config_cmd: + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_config_command_adl, + decode_config_command_serde) + if batch.type == BatchType.feature_update: + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + 
decode_feature_command_adl, + decode_feature_command_serde) + if batch.type == BatchType.node_management_cmd: + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_node_management_command, + decode_node_management_command) if batch.type == BatchType.cluster_bootstrap_cmd: - ret['data'] = decode_cluster_bootstrap_command(record) + ret['data'] = decode_cluster_bootstrap_command(k_rdr, rdr) + + k_unread = k_rdr.remaining() + v_unread = rdr.remaining() + if k_unread != 0 or v_unread != 0: + ret['unread'] = {'key': k_unread, 'value': v_unread} + logger.error(f"@{ret['type']} unread bytes. k:{k_unread} v:{v_unread}") return ret @@ -243,4 +716,5 @@ def decode(self, bin_dump: bool): s = Segment(path) for b in s: for r in b: - self.records.append(decode_record(b, r, bin_dump)) + dec = decode_record(b, r, bin_dump) + self.records.append(dec) diff --git a/tools/offline_log_viewer/kvstore.py index c9cd3417e85b..3295f1487df2 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -97,7 +97,7 @@ def decode(self): keyspace = k_rdr.read_int8() - key_buf = self.k_stream.read() + key_buf = k_rdr.stream.read() ret['key_space'] = self._decode_ks(keyspace) ret['key_buf'] = key_buf @@ -216,6 +216,8 @@ def decode_offset_translator_key(k): def decode_storage_key_name(key_type): if key_type == 0: return "start offset" + elif key_type == 1: + return "clean segment" return "unknown" @@ -329,12 +331,20 @@ def _apply(self, entry): if entry['data'] is not None: self.kv[key] = entry['data'] + else: + try: + del self.kv[key] + except KeyError: + # Missing key, that's okay for a deletion + pass def decode(self): + snapshot_offset = None if os.path.exists(f"{self.ntp.path}/snapshot"): snap = KvSnapshot(f"{self.ntp.path}/snapshot") snap.decode() logger.info(f"snapshot last offset: {snap.last_offset}") + snapshot_offset = snap.last_offset for r in snap.data_batch: d = KvStoreRecordDecoder(r, snap.data_batch, @@ -347,6 +357,10 @@ def decode(self): s =
Segment(path) for batch in s: for r in batch: + offset = batch.header.base_offset + r.offset_delta + if snapshot_offset is not None and offset <= snapshot_offset: + continue + d = KvStoreRecordDecoder(r, batch, value_is_optional_type=True) diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py index 11d7cecb16da..996be4b3d7fa 100644 --- a/tools/offline_log_viewer/model.py +++ b/tools/offline_log_viewer/model.py @@ -74,13 +74,26 @@ def read_broker(rdr): return br +def read_configuration_update(rdr): + return { + 'replicas_to_add': rdr.read_vector(read_vnode), + 'replicas_to_remove': rdr.read_vector(read_vnode) + } + + def read_raft_config(rdr): cfg = {} + cfg['version'] = rdr.read_int8() cfg['brokers'] = rdr.read_vector(read_broker) cfg['current_config'] = read_group_nodes(rdr) cfg['prev_config'] = rdr.read_optional(read_group_nodes) cfg['revision'] = rdr.read_int64() + + if cfg['version'] >= 4: + cfg['configuration_update'] = rdr.read_optional( + lambda ordr: read_configuration_update(ordr)) + return cfg diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index 0447f4d29ace..95a8c304206d 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -1,5 +1,7 @@ +from enum import Enum import struct import collections +from io import BufferedReader, BytesIO SERDE_ENVELOPE_FORMAT = "' + return f"{ch}{str}" + def read_int8(self): - return struct.unpack("