diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index debf1434a34d..2f342000abce 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -179,4 +179,7 @@ RUN mkdir -p /opt/scripts && \
     curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \
     chmod +x /opt/scripts/seastar-addr2line
 
+RUN mkdir -p /opt/scripts/offline_log_viewer
+COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer
+
 CMD service ssh start && tail -f /dev/null
diff --git a/tests/docker/Dockerfile.dockerignore b/tests/docker/Dockerfile.dockerignore
index 9be2c986be8f..8d0cc79a40a1 100644
--- a/tests/docker/Dockerfile.dockerignore
+++ b/tests/docker/Dockerfile.dockerignore
@@ -3,4 +3,5 @@
 !tests/docker/ssh
 !tests/setup.py
 !tests/python
-!tests/go
\ No newline at end of file
+!tests/go
+!tools/offline_log_viewer
diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py
new file mode 100644
index 000000000000..71ba113ccccf
--- /dev/null
+++ b/tests/rptest/clients/offline_log_viewer.py
@@ -0,0 +1,39 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import json
+
+
+class OfflineLogViewer:
+    """
+    Wrap tools/offline_log_viewer for use in tests: this is for tests that
+    want to peek at the structures, but also for validating the tool itself.
+    """
+    def __init__(self, redpanda):
+        self._redpanda = redpanda
+
+    def _cmd(self, suffix):
+        viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
+        return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}"
+
+    def read_kvstore(self, node):
+        cmd = self._cmd("--type kvstore")
+        kvstore_json = node.account.ssh_output(cmd, combine_stderr=False)
+        return json.loads(kvstore_json)
+
+    def read_controller(self, node):
+        cmd = self._cmd("--type controller")
+        controller_json = node.account.ssh_output(cmd, combine_stderr=False)
+        try:
+            return json.loads(controller_json)
+        except json.decoder.JSONDecodeError:
+            # Log the bad output before re-raising
+            self._redpanda.logger.error(
+                f"Invalid JSON output: {controller_json}")
+            raise
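For orientation, this is roughly how the new wrapper is driven from a ducktape test (a minimal sketch, assuming a started `redpanda` service object; `check_tool_output` is a hypothetical name, not part of this patch):

```python
# Minimal sketch, assuming `redpanda` is a started RedpandaService.
# `check_tool_output` is a hypothetical helper, not part of this patch.
from rptest.clients.offline_log_viewer import OfflineLogViewer


def check_tool_output(redpanda):
    viewer = OfflineLogViewer(redpanda)
    for node in redpanda.nodes:
        # Runs viewer.py on the node over SSH and parses its JSON stdout;
        # the kvstore dump is an object keyed by shard ID.
        kvstore_data = viewer.read_kvstore(node=node)
        assert isinstance(kvstore_data, dict)
```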
diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py
index 7b5a99433427..b5222639d380 100644
--- a/tests/rptest/tests/topic_delete_test.py
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -7,24 +7,108 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
 
-from rptest.services.cluster import cluster
-from rptest.services.admin import Admin
-from ducktape.utils.util import wait_until
 import time
+import json
 
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+from requests.exceptions import HTTPError
+
+from rptest.services.cluster import cluster
 from rptest.clients.types import TopicSpec
+from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.clients.kafka_cli_tools import KafkaCliTools
 from rptest.services.rpk_producer import RpkProducer
 from rptest.services.metrics_check import MetricCheck
 from rptest.services.redpanda import SISettings
 from rptest.util import wait_for_segments_removal
-from ducktape.mark import parametrize
+from rptest.services.admin import Admin
 
 
-def topic_storage_purged(redpanda, topic: str):
+def get_kvstore_topic_key_counts(redpanda):
+    """
+    Count the keys in KVStore that relate to Kafka topics: this excludes all
+    internal topic items: if no Kafka topics exist, this should be zero for
+    all nodes.
+
+    :returns: dict of Node to integer
+    """
+
+    viewer = OfflineLogViewer(redpanda)
+
+    # Find the raft group IDs of internal topics
+    admin = Admin(redpanda)
+    internal_group_ids = set()
+    for ntp in [
+        ('redpanda', 'controller', 0),
+        ('kafka_internal', 'id_allocator', 0),
+    ]:
+        namespace, topic, partition = ntp
+        try:
+            p = admin.get_partitions(topic=topic,
+                                     namespace=namespace,
+                                     partition=partition)
+        except HTTPError as e:
+            # OK if internal topic doesn't exist (e.g. id_allocator
+            # doesn't have to exist)
+            if e.response.status_code != 404:
+                raise
+        else:
+            internal_group_ids.add(p['raft_group_id'])
+
+    result = {}
+    for n in redpanda.nodes:
+        kvstore_data = viewer.read_kvstore(node=n)
+
+        excess_keys = []
+        for shard, items in kvstore_data.items():
+            keys = [i['key'] for i in items]
+
+            for k in keys:
+                if k['keyspace'] == "cluster":
+                    # Not a per-partition key
+                    continue
+
+                if k['data'].get('group', None) in internal_group_ids:
+                    # One of the internal topics
+                    continue
+
+                if k['data'].get('ntp', {}).get('topic', None) == 'controller':
+                    # Controller storage item
+                    continue
+
+                excess_keys.append(k)
+
+            redpanda.logger.info(
+                f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys, indent=2)}"
+            )
+
+        key_count = len(excess_keys)
+        result[n] = key_count
+
+    return result
+
+
+def topic_storage_purged(redpanda, topic_name):
     storage = redpanda.storage()
-    return all(map(lambda n: topic not in n.ns["kafka"].topics, storage.nodes))
+    logs_removed = all(
+        map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes))
+
+    if not logs_removed:
+        return False
+
+    # Once logs are removed, also do more expensive inspection of
+    # kvstore to check that per-partition kvstore contents are
+    # gone. The user doesn't care about this, but it is important
+    # to avoid bugs that would cause kvstore to bloat through
+    # topic creation/destruction cycles.
+
+    topic_key_counts = get_kvstore_topic_key_counts(redpanda)
+    if any([v > 0 for v in topic_key_counts.values()]):
+        return False
+
+    return True
 
 
 class TopicDeleteTest(RedpandaTest):
@@ -44,7 +128,9 @@ def __init__(self, test_context):
         self.kafka_tools = KafkaCliTools(self.redpanda)
 
     @cluster(num_nodes=3)
-    def topic_delete_test(self):
+    @parametrize(with_restart=False)
+    @parametrize(with_restart=True)
+    def topic_delete_test(self, with_restart):
         def produce_until_partitions():
             self.kafka_tools.produce(self.topic, 1024, 1024)
             storage = self.redpanda.storage()
@@ -55,6 +141,16 @@ def produce_until_partitions():
                    backoff_sec=2,
                    err_msg="Expected partition did not materialize")
 
+        if with_restart:
+            # Do a restart to encourage writes and flushes, especially to
+            # the kvstore.
+            self.redpanda.restart_nodes(self.redpanda.nodes)
+
+        # Sanity check the kvstore checks: there should be at least one
+        # kvstore entry per partition while the topic exists.
+        assert sum(get_kvstore_topic_key_counts(
+            self.redpanda).values()) >= self.topics[0].partition_count
+
         self.kafka_tools.delete_topic(self.topic)
 
         try:
@@ -294,17 +390,12 @@ def check_compaction():
                 pass
             producer.free()
 
-        def topic_storage_purged():
-            storage = self.redpanda.storage()
-            return all(
-                map(lambda n: topic_name not in n.ns["kafka"].topics,
-                    storage.nodes))
-
         try:
-            wait_until(lambda: topic_storage_purged(),
-                       timeout_sec=60,
-                       backoff_sec=2,
-                       err_msg="Topic storage was not removed")
+            wait_until(
+                lambda: topic_storage_purged(self.redpanda, topic_name),
+                timeout_sec=60,
+                backoff_sec=2,
+                err_msg="Topic storage was not removed")
 
         except:
             # On errors, dump listing of the storage location
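For readers following the filtering in `get_kvstore_topic_key_counts`: the accessors above imply a key shape roughly like the following (an illustrative sketch inferred from this diff, not canonical viewer output):

```python
# Illustrative only: the shape implied by the k['keyspace'] and k['data']
# accessors above; the field values here are made up.
example_key = {
    'keyspace': 'consensus',  # "cluster"-keyspace keys are not per-partition
    'data': {
        'group': 42,  # raft group ID; internal topics' groups are excluded
        'ntp': {
            'namespace': 'kafka',
            'topic': 'my-topic',  # 'controller' entries are excluded too
            'partition': 0,
        },
    },
}
```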
diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py
index c90795d3f970..204f2cce078e 100644
--- a/tests/rptest/tests/upgrade_test.py
+++ b/tests/rptest/tests/upgrade_test.py
@@ -17,6 +17,7 @@
 from rptest.services.admin import Admin
 from rptest.clients.rpk import RpkTool
 from rptest.clients.types import TopicSpec
+from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.tests.end_to_end import EndToEndTest
@@ -259,6 +260,15 @@ def _consumer_offsets_present():
         assert self._rand_consumer.consumer_status.validator.total_reads >= self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL
         assert self._cg_consumer.consumer_status.validator.valid_reads >= wrote_at_least
 
+        # Validate that the data structures written by a mixture of historical
+        # versions remain readable by our current debug tools
+        log_viewer = OfflineLogViewer(self.redpanda)
+        for node in self.redpanda.nodes:
+            controller_records = log_viewer.read_controller(node=node)
+            self.logger.info(
+                f"Read {len(controller_records)} controller records from node {node.name} successfully"
+            )
+
 
 class UpgradeWithWorkloadTest(EndToEndTest):
     """
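The remaining files teach the viewer to decode serde-framed records alongside the legacy ADL encoding. As background: a serde value begins with an envelope header carrying a version, a compatibility version, and a payload size, which `Reader.read_envelope` consumes before passing the version on to the per-type lambdas. A standalone sketch of that framing (the `"<BBI"` layout is an assumption here; the authoritative definition is `SERDE_ENVELOPE_FORMAT` in reader.py):

```python
import struct


# Assumed layout: uint8 version, uint8 compat_version, uint32 LE size.
# See SERDE_ENVELOPE_FORMAT in reader.py for the real definition.
def peek_envelope(buf: bytes) -> dict:
    version, compat_version, size = struct.unpack_from("<BBI", buf)
    return {'version': version, 'compat_version': compat_version, 'size': size}
```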
diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py
index d290106dd982..b68d53f89a04 100644
--- a/tools/offline_log_viewer/controller.py
+++ b/tools/offline_log_viewer/controller.py
@@ -1,14 +1,199 @@
-from io import BufferedReader, BytesIO
+import logging
+from io import BytesIO
 from model import *
 from reader import Reader
-from storage import Batch, Segment
+from storage import Segment
 from storage import BatchType
 import datetime
 
+logger = logging.getLogger('controller')
+
+
+def read_topic_properties_serde(rdr: Reader, version):
+    assert 0 <= version <= 1
+    topic_properties = {
+        'compression': rdr.read_optional(Reader.read_serde_enum),
+        'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum),
+        'compaction_strategy': rdr.read_optional(Reader.read_serde_enum),
+        'timestamp_type': rdr.read_optional(Reader.read_serde_enum),
+        'segment_size': rdr.read_optional(Reader.read_uint64),
+        'retention_bytes': rdr.read_tristate(Reader.read_uint64),
+        'retention_duration': rdr.read_tristate(Reader.read_uint64),
+        'recovery': rdr.read_optional(Reader.read_bool),
+        'shadow_indexing': rdr.read_optional(Reader.read_serde_enum),
+    }
+
+    # introduced for remote read replicas
+    if version == 1:
+        topic_properties |= {
+            'read_replica':
+            rdr.read_optional(Reader.read_bool),
+            'read_replica_bucket':
+            rdr.read_optional(Reader.read_string),
+            'remote_topic_properties':
+            rdr.read_optional(
+                lambda r: {
+                    'remote_revision': r.read_int64(),
+                    'remote_partition_count': r.read_int32()
+                }),
+        }
+    return topic_properties
+
+
+def read_topic_assignment_serde(rdr: Reader):
+    return rdr.read_envelope(
+        lambda rdr, _: {
+            'cfg':
+            rdr.read_envelope(
+                lambda rdr, _: {
+                    'namespace':
+                    rdr.read_string(),
+                    'topic':
+                    rdr.read_string(),
+                    'partitions':
+                    rdr.read_int32(),
+                    'replication_factor':
+                    rdr.read_int16(),
+                    'properties':
+                    rdr.read_envelope(read_topic_properties_serde,
+                                      max_version=1),
+                }),
+            'assignments':
+            rdr.read_serde_vector(lambda r: r.read_envelope(
+                lambda r, _: {
+                    'group':
+                    r.read_int64(),
+                    'id':
+                    r.read_int32(),
+                    'replicas':
+                    r.read_serde_vector(lambda r: {
+                        'node_id': r.read_int32(),
+                        'shard': r.read_uint32(),
+                    }),
+                })),
+        })
+
+
+def read_inc_update_op_serde(rdr: Reader):
+    v = rdr.read_serde_enum()
+    if -1 < v < 3:
+        return ['none', 'set', 'remove'][v]
+    return 'error'
+
+
+def read_property_update_serde(rdr: Reader, type_reader):
+    return rdr.read_envelope(lambda rdr, _: {
+        'value': type_reader(rdr),
+        'op': read_inc_update_op_serde(rdr),
+    })
+
+
+def read_incremental_topic_update_serde(rdr: Reader):
+    return rdr.read_envelope(
+        lambda rdr, _: {
+            'compression':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_serde_enum)),
+            'cleanup_policy_bitflags':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_serde_enum)),
+            'compaction_strategy':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_serde_enum)),
+            'timestamp_type':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_serde_enum)),
+            'segment_size':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_uint64)),
+            'retention_bytes':
+            read_property_update_serde(
+                rdr, lambda r: r.read_tristate(Reader.read_uint64)),
+            'retention_duration':
+            read_property_update_serde(
+                rdr, lambda r: r.read_tristate(Reader.read_int64)),
+            'shadow_indexing':
+            read_property_update_serde(
+                rdr, lambda r: r.read_optional(Reader.read_serde_enum)),
+        })
+
+
+def read_create_partitions_serde(rdr: Reader):
+    return rdr.read_envelope(
+        lambda rdr, _: {
+            'cfg':
+            rdr.read_envelope(
+                lambda rdr, _: {
+                    'namespace':
+                    rdr.read_string(),
+                    'topic':
+                    rdr.read_string(),
+                    'new_total_partition_count':
+                    rdr.read_int32(),
+                    'custom_assignments':
+                    rdr.read_serde_vector(lambda r: r.read_serde_vector(
+                        Reader.read_int32)),
+                }),
+            'assignments':
+            rdr.read_serde_vector(lambda r: r.read_envelope(
+                lambda rdr, _: {
+                    'group': rdr.read_int64(),
+                    'id': rdr.read_int32(),
+                    'replicas': rdr.read_serde_vector(read_broker_shard),
+                })),
+        })
+
+
+def decode_topic_command_serde(k_rdr: Reader, rdr: Reader):
+    cmd = {}
+    cmd['type'] = rdr.read_int8()
+    if cmd['type'] == 0:
+        cmd['type_string'] = 'create_topic'
+        # FIXME: this fails to read messages written on tip of dev ead54dda
+        #cmd |= read_topic_assignment_serde(rdr)
+    elif cmd['type'] == 1:
+        cmd['type_string'] = 'delete_topic'
+        cmd['namespace'] = rdr.read_string()
+        cmd['topic'] = rdr.read_string()
+    elif cmd['type'] == 2:
+        cmd['type_string'] = 'update_partitions'
+        cmd['namespace'] = k_rdr.read_string()
+        cmd['topic'] = k_rdr.read_string()
+        cmd['partition'] = k_rdr.read_int32()
+        cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
+    elif cmd['type'] == 3:
+        cmd['type_string'] = 'finish_partitions_update'
+        cmd['topic'] = k_rdr.read_string()
+        cmd['partition'] = k_rdr.read_int32()
+        cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
+    elif cmd['type'] == 4:
+        cmd['type_string'] = 'update_topic_properties'
+        cmd['namespace'] = k_rdr.read_string()
+        cmd['topic'] = k_rdr.read_string()
+        cmd['update'] = read_incremental_topic_update_serde(rdr)
+    elif cmd['type'] == 5:
+        cmd['type_string'] = 'create_partitions'
+        cmd |= read_create_partitions_serde(rdr)
+    elif cmd['type'] == 6:
+        cmd['type_string'] = 'create_non_replicable_topic'
+        cmd['topic'] = k_rdr.read_envelope(
+            lambda k_rdr, _: {
+                'source': {
+                    'namespace': k_rdr.read_string(),
+                    'topic': k_rdr.read_string(),
+                },
+                'name': {
+                    'namespace': k_rdr.read_string(),
+                    'topic': k_rdr.read_string(),
+                },
+            })
+    elif cmd['type'] == 7:
+        cmd['type_string'] = 'cancel_moving_partition_replicas'
+        cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()})
+    return cmd
 
-def decode_topic_command(record):
-    rdr = Reader(BufferedReader(BytesIO(record.value)))
-    k_rdr = Reader(BytesIO(record.key))
+
+def decode_topic_command_adl(k_rdr: Reader, rdr: Reader):
     cmd = {}
     cmd['type'] = rdr.read_int8()
     if cmd['type'] == 0:
@@ -68,14 +253,44 @@ def decode_topic_command(record):
     return cmd
 
 
-def decode_config(record):
+def decode_topic_command(record):
     rdr = Reader(BytesIO(record.value))
+    k_rdr = Reader(BytesIO(record.key))
+    either_adl_or_serde = rdr.peek_int8()
+    assert either_adl_or_serde >= -1, "unsupported serialization format"
+    if either_adl_or_serde == -1:
+        # serde encoding flag, consume it and proceed
+        rdr.skip(1)
+        return decode_topic_command_serde(k_rdr, rdr)
+    else:
+        return decode_topic_command_adl(k_rdr, rdr)
+
+
+def decode_config(k_rdr: Reader, rdr: Reader):
     return read_raft_config(rdr)
 
 
-def decode_user_command(record):
-    rdr = Reader(BytesIO(record.value))
-    k_rdr = Reader(BytesIO(record.key))
+def decode_user_command_serde(k_rdr: Reader, rdr: Reader):
+    cmd = {'type': rdr.read_int8()}
+    cmd['str_type'] = decode_user_cmd_type(cmd['type'])
+
+    if cmd['type'] == 5 or cmd['type'] == 7:
+        cmd['user'] = k_rdr.read_string()
+        cmd['cred'] = rdr.read_envelope(
+            lambda rdr, _: {
+                # obfuscate secrets
+                'salt': obfuscate_secret(rdr.read_iobuf().hex()),
+                'server_key': obfuscate_secret(rdr.read_iobuf().hex()),
+                'stored_key': obfuscate_secret(rdr.read_iobuf().hex()),
+                'iterations': rdr.read_int32(),
+            })
+    elif cmd['type'] == 6:
+        cmd['user'] = k_rdr.read_string()
+
+    return cmd
+
+
+def decode_user_command_adl(k_rdr: Reader, rdr: Reader):
     cmd = {}
     cmd['type'] = rdr.read_int8()
     cmd['str_type'] = decode_user_cmd_type(cmd['type'])
@@ -98,9 +313,115 @@ def decode_user_command(record):
     return cmd
 
 
-def decode_acl_command(record):
-    rdr = Reader(BytesIO(record.value))
-    k_rdr = Reader(BytesIO(record.key))
+def read_acl_binding_serde(k_rdr: Reader):
+    return k_rdr.read_envelope(
+        lambda k_rdr, _: {
+            'pattern':
+            k_rdr.read_envelope(
+                lambda k_rdr, _: {
+                    'resource': decode_acl_resource(k_rdr.read_serde_enum()),
+                    'name': k_rdr.read_string(),
+                    'pattern': decode_acl_pattern_type(k_rdr.read_serde_enum())
+                }),
+            'entry':
+            k_rdr.read_envelope(
+                lambda k_rdr, _: {
+                    'principal':
+                    k_rdr.read_envelope(
+                        lambda k_rdr, _: {
+                            'type':
+                            decode_acl_principal_type(k_rdr.read_serde_enum()),
+                            'name':
+                            k_rdr.read_string()
+                        }),
+                    'host':
+                    k_rdr.read_envelope(
+                        lambda k_rdr, _: {
+                            'addr':
+                            k_rdr.read_optional(
+                                lambda k_rdr: {
+                                    'ipv4': k_rdr.read_bool(),
+                                    'data': k_rdr.read_iobuf().hex()
+                                })
+                        }),
+                    'operation':
+                    decode_acl_operation(k_rdr.read_serde_enum()),
+                    'permission':
+                    decode_acl_permission(k_rdr.read_serde_enum()),
+                }),
+        })
+
+
+def decode_serialized_pattern_type(v):
+    if 0 <= v <= 2:
+        return ['literal', 'prefixed', 'match'][v]
+    return 'error'
+
+
+def read_acl_binding_filter_serde(k_rdr: Reader):
+    # pattern class does not really use serde
+    return k_rdr.read_envelope(
+        lambda k_rdr, _: {
+            'pattern': {
+                'resource':
+                k_rdr.read_optional(lambda k_rdr: decode_acl_resource(
+                    k_rdr.read_serde_enum())),
+                'name':
+                k_rdr.read_envelope(Reader.read_string),
+                'pattern':
+                k_rdr.read_optional(lambda k_rdr:
+                                    decode_serialized_pattern_type(
+                                        k_rdr.read_serde_enum())),
+            },
+            'acl':
+            k_rdr.read_envelope(
+                lambda k_rdr, _: {
+                    'principal':
+                    k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope(
+                        lambda k_rdr, _: {
+                            'type':
+                            decode_acl_principal_type(k_rdr.read_serde_enum()),
+                            'name':
+                            k_rdr.read_string()
+                        })),
+                    'host':
+                    k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope(
+                        lambda k_rdr, _: {
+                            'addr':
+                            k_rdr.read_optional(
+                                lambda k_rdr: {
+                                    'ipv4': k_rdr.read_bool(),
+                                    'data': k_rdr.read_iobuf().hex()
+                                })
+                        })),
+                    'operation':
+                    k_rdr.read_optional(lambda k_rdr: decode_acl_operation(
+                        k_rdr.read_serde_enum())),
+                    'permission':
+                    k_rdr.read_optional(lambda k_rdr: decode_acl_permission(
+                        k_rdr.read_serde_enum())),
+                }),
+        })
+
+
+def decode_acl_command_serde(k_rdr: Reader, rdr: Reader):
+    cmd = {}
+    cmd['type'] = rdr.read_int8()
+    cmd['str_type'] = decode_acls_cmd_type(cmd['type'])
+    if cmd['type'] == 8:
+        cmd['acls'] = k_rdr.read_envelope(
+            lambda k_rdr, _:
+            {'bindings': k_rdr.read_serde_vector(read_acl_binding_serde)})
+    elif cmd['type'] == 9:
+        cmd |= k_rdr.read_envelope(lambda k_rdr, _: {
+            'filters':
+            k_rdr.read_serde_vector(read_acl_binding_filter_serde)
+        })
+
+    return cmd
+
+
+def decode_acl_command_adl(k_rdr: Reader, rdr: Reader):
     cmd = {}
     cmd['type'] = rdr.read_int8()
     cmd['str_type'] = decode_acls_cmd_type(cmd['type'])
@@ -114,15 +435,47 @@ def decode_acl_command(record):
 
 
 def read_config_kv(reader):
-
     k = reader.read_string()
     v = reader.read_string()
     return (k, v)
 
 
-def decode_config_command(record):
-    rdr = Reader(BytesIO(record.value))
-    k_rdr = Reader(BytesIO(record.key))
+def decode_config_command_serde(k_rdr: Reader, rdr: Reader):
+    cmd = {}
+    cmd['type'] = rdr.read_int8()
+    if cmd['type'] == 0:
+        cmd['type_name'] = 'config_delta'
+        cmd['version'] = k_rdr.read_int64()
+        cmd |= rdr.read_envelope(
+            lambda rdr, _: {
+                'upsert':
+                rdr.read_serde_vector(lambda rdr: rdr.read_envelope(
+                    lambda rdr, _: {
+                        'k': rdr.read_string(),
+                        'v': rdr.read_string()
+                    })),
+                'remove':
+                rdr.read_serde_vector(Reader.read_string),
+            })
+    elif cmd['type'] == 1:
+        cmd['type_name'] = 'config_status'
+        cmd['node_id'] = k_rdr.read_int32()
+        cmd |= rdr.read_envelope(
+            lambda rdr, _: {
+                'status':
+                rdr.read_envelope(
+                    lambda rdr, _: {
+                        'node': rdr.read_int32(),
+                        'version': rdr.read_int64(),
+                        'restart': rdr.read_bool(),
+                        'unknown': rdr.read_serde_vector(Reader.read_string),
+                        'invalid': rdr.read_serde_vector(Reader.read_string),
+                    }),
+            })
+    return cmd
+
+
+def decode_config_command_adl(k_rdr: Reader, rdr: Reader):
     cmd = {}
     cmd['type'] = rdr.read_int8()
     if cmd['type'] == 0:
@@ -145,16 +498,51 @@
 
 
-def decode_feature_command(record):
+def decode_action_t(v):
+    if 1 <= v <= 3:
+        return ['', 'complete_preparing', 'activate', 'deactivate'][v]
+    return v
+
+
+def decode_feature_command_serde(k_rdr: Reader, rdr: Reader):
+    cmd = {'type': rdr.read_int8()}
+    if cmd['type'] == 0:
+        cmd['type_name'] = 'feature_update'
+        cmd |= k_rdr.read_envelope(
+            lambda k_rdr, _: {
+                'cluster_version':
+                k_rdr.read_int64(),
+                'actions':
+                k_rdr.read_serde_vector(lambda k_rdr: k_rdr.read_envelope(
+                    lambda k_rdr, _: {
+                        'feature_name': k_rdr.read_string(),
+                        'action': decode_action_t(k_rdr.read_serde_enum()),
+                    }))
+            })
+    elif cmd['type'] == 1:
+        cmd['type_name'] = 'license_update'
+        cmd |= k_rdr.read_envelope(
+            lambda k_rdr, _: {
+                'redpanda_license':
+                k_rdr.read_envelope(
+                    lambda k_rdr, _: {
+                        'format_version': k_rdr.read_uint8(),
+                        'type': k_rdr.read_serde_enum(),
+                        'organization': k_rdr.read_string(),
+                        'expiry': k_rdr.read_int64(),
+                    })
+            })
+    return cmd
+
+
+def decode_feature_command_adl(k_rdr: Reader, rdr: Reader):
     def decode_feature_update_action(r):
         action = {}
         action['v'] = r.read_int8()
         action['feature_name'] = r.read_string()
-        action['action'] = r.read_int16()
+        action['action'] = decode_action_t(r.read_int16())
         return action
 
-    rdr = Reader(BytesIO(record.value))
-    k_rdr = Reader(BytesIO(record.key))
     cmd = {}
     cmd['type'] = rdr.read_int8()
     if cmd['type'] == 0:
@@ -162,15 +550,32 @@ def decode_feature_update_action(r):
         cmd['v'] = k_rdr.read_int8()
         cmd['cluster_version'] = k_rdr.read_int64()
         cmd['actions'] = k_rdr.read_vector(decode_feature_update_action)
+    return cmd
+
+
+def decode_node_management_command(k_rdr: Reader, rdr: Reader):
+    cmd = {'type': rdr.read_int8()}
+    if cmd['type'] == 0:
+        cmd |= {
+            'type_string': 'decommission_node',
+            'node_id': k_rdr.read_int32()
+        }
     elif cmd['type'] == 1:
-        cmd['type_name'] = 'license_update'
-        k_rdr.read_envelope()
-        cmd['format_v'] = k_rdr.read_uint8()
-        cmd['license_type'] = k_rdr.read_uint8()
-        cmd['org'] = k_rdr.read_string()
-        cmd['expiry'] = k_rdr.read_string()
-    else:
-        cmd['type_name'] = 'unknown'
+        cmd |= {
+            'type_string': 'recommission_node',
+            'node_id': k_rdr.read_int32()
+        }
+    elif cmd['type'] == 2:
+        cmd |= {
+            'type_string': 'finish_reallocations',
+            'node_id': k_rdr.read_int32()
+        }
+    elif cmd['type'] == 3:
+        cmd |= {
+            'type_string': 'maintenance_mode',
+            'node_id': k_rdr.read_int32(),
+            'enabled': rdr.read_bool()
+        }
     return cmd
@@ -202,6 +607,17 @@ def decode_user_and_credential(r):
     return cmd
 
 
+def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn):
+    either_adl_or_serde = rdr.peek_int8()
+    assert either_adl_or_serde >= -1, "unsupported serialization format"
+    if either_adl_or_serde == -1:
+        # serde encoding flag, consume it and proceed
+        rdr.skip(1)
+        return serde_fn(k_rdr, rdr)
+    else:
+        return adl_fn(k_rdr, rdr)
+
+
 def decode_record(batch, record, bin_dump: bool):
     ret = {}
     header = batch.header
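`decode_adl_or_serde` above is the heart of the dual-format support: ADL-encoded commands begin with a non-negative version byte, while serde-encoded ones are prefixed with a -1 flag byte that is consumed before dispatch. The same peek-and-dispatch rule as a self-contained toy (hypothetical names, not viewer code):

```python
# Toy illustration of the dispatch rule used above.
def classify(value: bytes) -> str:
    first = int.from_bytes(value[:1], 'little', signed=True)
    return 'serde' if first == -1 else 'adl'


assert classify(b'\xff\x01\x02') == 'serde'  # 0xff is -1 as an int8
assert classify(b'\x00\x01\x02') == 'adl'  # ADL version bytes are >= 0
```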
@@ -215,21 +631,42 @@ def decode_record(batch, record, bin_dump: bool):
         ret['value_dump'] = record.value.__str__()
     ret['data'] = None
 
+    rdr = Reader(BytesIO(record.value))
+    k_rdr = Reader(BytesIO(record.key))
+
     if batch.type == BatchType.raft_configuration:
-        ret['data'] = decode_config(record)
+        ret['data'] = decode_config(k_rdr, rdr)
     if batch.type == BatchType.topic_management_cmd:
-        ret['data'] = decode_topic_command(record)
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_topic_command_adl,
+                                          decode_topic_command_serde)
     if batch.type == BatchType.user_management_cmd:
-        ret['data'] = decode_user_command(record)
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_user_command_adl,
+                                          decode_user_command_serde)
     if batch.type == BatchType.acl_management_cmd:
-        ret['data'] = decode_acl_command(record)
-    if header.type == BatchType.cluster_config_cmd:
-        ret['data'] = decode_config_command(record)
-    if header.type == BatchType.feature_update:
-        ret['data'] = decode_feature_command(record)
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_acl_command_adl,
+                                          decode_acl_command_serde)
+    if batch.type == BatchType.cluster_config_cmd:
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr,
+                                          decode_config_command_adl,
+                                          decode_config_command_serde)
+    if batch.type == BatchType.feature_update:
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr,
+                                          decode_feature_command_adl,
+                                          decode_feature_command_serde)
+    if batch.type == BatchType.node_management_cmd:
+        ret['data'] = decode_adl_or_serde(k_rdr, rdr,
+                                          decode_node_management_command,
+                                          decode_node_management_command)
+    # TODO: serde for cluster_bootstrap_cmd
     if batch.type == BatchType.cluster_bootstrap_cmd:
         ret['data'] = decode_cluster_bootstrap_command(record)
 
+    k_unread = k_rdr.remaining()
+    v_unread = rdr.remaining()
+    if k_unread != 0 or v_unread != 0:
+        ret['unread'] = {'key': k_unread, 'value': v_unread}
+        logger.error(f"@{ret['type']} unread bytes. k:{k_unread} v:{v_unread}")
+
     return ret
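The trailing `remaining()` tally turns `decode_record` into a self-test for the viewer: if any key or value bytes are left unconsumed, the decoder's model of the on-disk schema is stale, and the record is flagged instead of being silently mis-read. A tiny standalone sketch of the invariant (not the viewer's Reader class):

```python
from io import BytesIO

# A correct decoder consumes its buffers exactly; leftovers mean the
# decoder and the writer disagree about the schema.
buf = BytesIO(b'\x01\x02\x03\x04')
buf.read(4)  # stand-in for a decode pass
leftover = len(buf.getbuffer()) - buf.tell()
assert leftover == 0, f"{leftover} unread bytes: decoder out of sync"
```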
diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py
index c9cd3417e85b..3295f1487df2 100644
--- a/tools/offline_log_viewer/kvstore.py
+++ b/tools/offline_log_viewer/kvstore.py
@@ -97,7 +97,7 @@ def decode(self):
 
         keyspace = k_rdr.read_int8()
 
-        key_buf = self.k_stream.read()
+        key_buf = k_rdr.stream.read()
 
         ret['key_space'] = self._decode_ks(keyspace)
         ret['key_buf'] = key_buf
@@ -216,6 +216,8 @@ def decode_offset_translator_key(k):
 def decode_storage_key_name(key_type):
     if key_type == 0:
         return "start offset"
+    elif key_type == 1:
+        return "clean segment"
     return "unknown"
 
 
@@ -329,12 +331,20 @@ def _apply(self, entry):
 
         if entry['data'] is not None:
             self.kv[key] = entry['data']
+        else:
+            try:
+                del self.kv[key]
+            except KeyError:
+                # Missing key, that's okay for a deletion
+                pass
 
     def decode(self):
+        snapshot_offset = None
        if os.path.exists(f"{self.ntp.path}/snapshot"):
             snap = KvSnapshot(f"{self.ntp.path}/snapshot")
             snap.decode()
             logger.info(f"snapshot last offset: {snap.last_offset}")
+            snapshot_offset = snap.last_offset
             for r in snap.data_batch:
                 d = KvStoreRecordDecoder(r,
                                          snap.data_batch,
@@ -347,6 +357,10 @@ def decode(self):
             s = Segment(path)
             for batch in s:
                 for r in batch:
+                    offset = batch.header.base_offset + r.offset_delta
+                    if snapshot_offset is not None and offset <= snapshot_offset:
+                        continue
+
                     d = KvStoreRecordDecoder(r,
                                              batch,
                                              value_is_optional_type=True)
diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py
index 11d7cecb16da..996be4b3d7fa 100644
--- a/tools/offline_log_viewer/model.py
+++ b/tools/offline_log_viewer/model.py
@@ -74,13 +74,26 @@ def read_broker(rdr):
     return br
 
 
+def read_configuration_update(rdr):
+    return {
+        'replicas_to_add': rdr.read_vector(read_vnode),
+        'replicas_to_remove': rdr.read_vector(read_vnode)
+    }
+
+
 def read_raft_config(rdr):
     cfg = {}
+    cfg['version'] = rdr.read_int8()
     cfg['brokers'] = rdr.read_vector(read_broker)
     cfg['current_config'] = read_group_nodes(rdr)
     cfg['prev_config'] = rdr.read_optional(read_group_nodes)
     cfg['revision'] = rdr.read_int64()
+
+    if cfg['version'] >= 4:
+        cfg['configuration_update'] = rdr.read_optional(
+            lambda ordr: read_configuration_update(ordr))
+
     return cfg
diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py
index 0447f4d29ace..70cbb62342b9 100644
--- a/tools/offline_log_viewer/reader.py
+++ b/tools/offline_log_viewer/reader.py
@@ -1,5 +1,6 @@
 import struct
 import collections
+from io import BufferedReader, BytesIO
 
 SERDE_ENVELOPE_FORMAT = "