From e1681649fdfd026357e0c2be8ef9d47b29313d93 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:23:23 +0100 Subject: [PATCH 01/22] tools: write offline_log_viewer kvstore/controller output to stdout On stderr it gets mixed with logs. On stdout it is straightforward to capture and decode json for use in tests. --- tools/offline_log_viewer/viewer.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index 6011eca6723e..725c0adc8092 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -17,13 +17,21 @@ def print_kv_store(store): + # Map of partition ID to list of kvstore items + result = {} + for ntp in store.ntps: if ntp.nspace == "redpanda" and ntp.topic == "kvstore": logger.info(f"inspecting {ntp}") kv = KvStore(ntp) kv.decode() items = kv.items() - logger.info(json.dumps(items, indent=2)) + + result[ntp.partition] = items + + # Send JSON output to stdout in case caller wants to parse it, other + # CLI output goes to stderr via logger + print(json.dumps(result, indent=2)) def print_controller(store, bin_dump: bool): @@ -31,7 +39,10 @@ def print_controller(store, bin_dump: bool): if ntp.nspace == "redpanda" and ntp.topic == "controller": ctrl = ControllerLog(ntp) ctrl.decode(bin_dump) - logger.info(json.dumps(ctrl.records, indent=2)) + + # Send JSON output to stdout in case caller wants to parse it, other + # CLI output goes to stderr via logger + print(json.dumps(ctrl.records, indent=2)) def print_kafka(store, topic, headers_only): From 9f00c0229f112a50a2d0b9ddd5e217b367c725fb Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:24:55 +0100 Subject: [PATCH 02/22] tools: make offline_log_viewer aware of clean_segment kvstore key --- tools/offline_log_viewer/kvstore.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index c9cd3417e85b..98f7cf93023d 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -216,6 +216,8 @@ def decode_offset_translator_key(k): def decode_storage_key_name(key_type): if key_type == 0: return "start offset" + elif key_type == 1: + return "clean segment" return "unknown" From 963fc3d128dce5a9d689586996817ca8b94436e2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:34:58 +0100 Subject: [PATCH 03/22] tools: offline_log_viewer don't replay pre-snapshot kvstore messages This was wasteful: all messages from before the snapshot should already be accounted for in the state we loaded from the snapshot. --- tools/offline_log_viewer/kvstore.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 98f7cf93023d..744e8d597bff 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -333,10 +333,12 @@ def _apply(self, entry): self.kv[key] = entry['data'] def decode(self): + snapshot_offset = None if os.path.exists(f"{self.ntp.path}/snapshot"): snap = KvSnapshot(f"{self.ntp.path}/snapshot") snap.decode() logger.info(f"snapshot last offset: {snap.last_offset}") + snapshot_offset = snap.last_offset for r in snap.data_batch: d = KvStoreRecordDecoder(r, snap.data_batch, @@ -349,6 +351,10 @@ def decode(self): s = Segment(path) for batch in s: for r in batch: + offset = batch.header.base_offset + r.offset_delta + if snapshot_offset is not None and offset <= snapshot_offset: + continue + d = KvStoreRecordDecoder(r, batch, value_is_optional_type=True) From 7f666f7d0a438afdce059c75aa666a4f4cda1e92 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:38:04 +0100 Subject: [PATCH 04/22] tools/metadata_viewer: handle kvstore deletions A None value is a deletion. Previously, we ignored these and thereby the kvstore dump would include all keys ever created, even if they had since been deleted. --- tools/offline_log_viewer/kvstore.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 744e8d597bff..7c3a7ed5e74b 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -331,6 +331,12 @@ def _apply(self, entry): if entry['data'] is not None: self.kv[key] = entry['data'] + else: + try: + del self.kv[key] + except KeyError: + # Missing key, that's okay for a deletion + pass def decode(self): snapshot_offset = None From b492385d565eb0bd9d371da0a2c96fdf2ec9a9bb Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:17:40 +0100 Subject: [PATCH 05/22] tools: handle short header read in offline_log_viewer --- tools/offline_log_viewer/storage.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index 4fc4f5c5bb89..51ad230a724c 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -218,7 +218,10 @@ def from_stream(f, index): return None assert len(data) == records_size return Batch(index, header, data) - assert len(data) == 0 + + if len(data) < HEADER_SIZE: + # Short read, probably log being actively written or unclean shutdown + return None def __len__(self): return self.header.record_count From 37b76a1678b18667cf94d96a66174afe971325da Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:39:08 +0200 Subject: [PATCH 06/22] tools/offline_log_viewer: added decode_adl_or_serde to dispatch decoding functions --- tools/offline_log_viewer/controller.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index d290106dd982..b074f20b7993 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -173,7 +173,6 @@ def decode_feature_update_action(r): cmd['type_name'] = 'unknown' return cmd - def decode_cluster_bootstrap_command(record): def decode_user_and_credential(r): user_cred = {} @@ -201,6 +200,18 @@ def decode_user_and_credential(r): return cmd +def decode_adl_or_serde(record, adl_fn, serde_fn): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_adl_or_serde = rdr.peek_int8() + assert either_adl_or_serde >= -1, "unsupported serialization format" + if either_adl_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return serde_fn(k_rdr, rdr) + else: + return adl_fn(k_rdr, rdr) + def decode_record(batch, record, bin_dump: bool): ret = {} From 18328b1a333de8fb1ed1b4ed10213d17e93238e2 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:31:42 +0200 Subject: [PATCH 07/22] tools/offline_log_viewer: fixed variable used to read BatchType (was batch.header.type instead of batch.type) --- tools/offline_log_viewer/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index b074f20b7993..3ac561640f55 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -234,9 +234,9 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_user_command(record) if batch.type == BatchType.acl_management_cmd: ret['data'] = decode_acl_command(record) - if header.type == BatchType.cluster_config_cmd: + if batch.type == BatchType.cluster_config_cmd: ret['data'] = decode_config_command(record) - if header.type == BatchType.feature_update: + if batch.type == BatchType.feature_update: ret['data'] = decode_feature_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) From 4bbd0c5eb32795076ccbcea953df53695d2bbb10 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:35:20 +0200 Subject: [PATCH 08/22] tools/offline_log_viewer: added peek_int8, read_serde_enum ... and augmented Reader.read_envelope to accept a type_read Callable[rdr: Reader, version: Int]: Dict and a max_version parameter. If a type_read is used, it is invoked with reader and envelope if envelope.version <= max_version parameter. the result is joined with a dict {'envelope': envelope} and returned, otherwise an error dict is returned. If no type_read is used, envelope is returned immediately --- tools/offline_log_viewer/reader.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index 0447f4d29ace..d3f2c2f0bb25 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -54,6 +54,9 @@ def read_int64(self): def read_uint64(self): return struct.unpack(" Date: Tue, 30 Aug 2022 20:50:10 +0200 Subject: [PATCH 09/22] tools/offline_log_viewer: added serde decoding for topic_commands the new code tries to mimic the structure of the adl decoding functions. the new code is aware of the version number embedded in the binary stream and will skip the field if the version is not supported, while reporting the error. --- tools/offline_log_viewer/controller.py | 205 ++++++++++++++++++++++++- 1 file changed, 200 insertions(+), 5 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 3ac561640f55..3aaf2123f340 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,3 +1,4 @@ +import logging from io import BufferedReader, BytesIO from model import * from reader import Reader @@ -6,9 +7,190 @@ import datetime -def decode_topic_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) +def read_topic_properties_serde(rdr: Reader, version): + assert 0 <= version <= 1 + topic_properties = { + 'compression': rdr.read_optional(Reader.read_serde_enum), + 'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum), + 'compaction_strategy': rdr.read_optional(Reader.read_serde_enum), + 'timestamp_type': rdr.read_optional(Reader.read_serde_enum), + 'segment_size': rdr.read_optional(Reader.read_uint64), + 'retention_bytes': rdr.read_tristate(Reader.read_uint64), + 'retention_duration': rdr.read_tristate(Reader.read_uint64), + 'recovery': rdr.read_optional(Reader.read_bool), + 'shadow_indexing': rdr.read_optional(Reader.read_serde_enum), + } + + # introduced for remote read replicas + if version == 1: + topic_properties |= { + 'read_replica': + rdr.read_optional(Reader.read_bool), + 'read_replica_bucket': + rdr.read_optional(Reader.read_string), + 'remote_topic_properties': + rdr.read_optional( + lambda r: { + 'remote_revision': r.read_int64(), + 'remote_partition_count': r.read_int32() + }), + } + return topic_properties + + +def read_topic_assignment_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'partitions': + rdr.read_int32(), + 'replication_factor': + rdr.read_int16(), + 'properties': + rdr.read_envelope(read_topic_properties_serde, + max_version=1), + }), + 'assignments': + rdr.read_serde_vector(lambda r: r.read_envelope( + lambda r, _: { + 'group': + r.read_int64(), + 'id': + r.read_int32(), + 'replicas': + r.read_serde_vector(lambda r: { + 'node_id': r.read_int32(), + 'shard': r.read_uint32(), + }), + })), + }) + + +def read_inc_update_op_serde(rdr: Reader): + v = rdr.read_serde_enum() + if -1 < v < 3: + return ['none', 'set', 'remove'][v] + return 'error' + + +def read_property_update_serde(rdr: Reader, type_reader): + return rdr.read_envelope(lambda rdr, _: { + 'value': type_reader(rdr), + 'op': read_inc_update_op_serde(rdr), + }) + + +def read_incremental_topic_update_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'compression': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'cleanup_policy_bitflags': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'compaction_strategy': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'timestamp_type': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'segment_size': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_uint64)), + 'retention_bytes': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_uint64)), + 'retention_duration': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_int64)), + 'shadow_indexing': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + }) + + +def read_create_partitions_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'new_total_partition_count': + rdr.read_int32(), + 'custom_assignments': + rdr.read_serde_vector(lambda r: r.read_serde_vector( + Reader.read_int32)), + }), + 'assignments': + rdr.read_serde_vector(lambda r: r.read_envelope( + lambda rdr, _: { + 'group': rdr.read_int64(), + 'id': rdr.read_int32(), + 'replicas': rdr.read_serde_vector(read_broker_shard), + })), + }) + + +def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_string'] = 'create_topic' + cmd |= read_topic_assignment_serde(rdr) + elif cmd['type'] == 1: + cmd['type_string'] = 'delete_topic' + cmd['namespace'] = rdr.read_string() + cmd['topic'] = rdr.read_string() + elif cmd['type'] == 2: + cmd['type_string'] = 'update_partitions' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 3: + cmd['type_string'] = 'finish_partitions_update' + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 4: + cmd['type_string'] = 'update_topic_properties' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['update'] = read_incremental_topic_update_serde(rdr) + elif cmd['type'] == 5: + cmd['type_string'] = 'create_partitions' + cmd |= read_create_partitions_serde(rdr) + elif cmd['type'] == 6: + cmd['type_string'] = 'create_non_replicable_topic' + cmd['topic'] = k_rdr.read_envelope( + lambda k_rdr, _: { + 'source': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + 'name': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + }) + elif cmd['type'] == 7: + cmd['type_string'] = 'cancel_moving_partition_replicas' + cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()}) + return cmd + + +def decode_topic_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -68,6 +250,19 @@ def decode_topic_command(record): return cmd +def decode_topic_command(record): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_ald_or_serde = rdr.peek_int8() + assert either_ald_or_serde >= -1, "unsupported serialization format" + if either_ald_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return decode_topic_command_serde(k_rdr, rdr) + else: + return decode_topic_command_adl(k_rdr, rdr) + + def decode_config(record): rdr = Reader(BytesIO(record.value)) return read_raft_config(rdr) @@ -114,7 +309,6 @@ def decode_acl_command(record): def read_config_kv(reader): - k = reader.read_string() v = reader.read_string() return (k, v) @@ -229,7 +423,8 @@ def decode_record(batch, record, bin_dump: bool): if batch.type == BatchType.raft_configuration: ret['data'] = decode_config(record) if batch.type == BatchType.topic_management_cmd: - ret['data'] = decode_topic_command(record) + ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, + decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: ret['data'] = decode_user_command(record) if batch.type == BatchType.acl_management_cmd: From ce43d16640b8d4e84788218d1f7c9d99ee31d6aa Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:52:57 +0200 Subject: [PATCH 10/22] tools/offline_log_viewer: added decode_user_command for serde --- tools/offline_log_viewer/controller.py | 27 ++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 3aaf2123f340..70d9c2818c02 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -268,9 +268,27 @@ def decode_config(record): return read_raft_config(rdr) -def decode_user_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_user_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + cmd['str_type'] = decode_user_cmd_type(cmd['type']) + + if cmd['type'] == 5 or cmd['type'] == 7: + cmd['user'] = k_rdr.read_string() + cmd['cred'] = rdr.read_envelope( + lambda rdr, _: { + # obfuscate secrets + 'salt': obfuscate_secret(rdr.read_iobuf().hex()), + 'server_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'stored_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'iterations': rdr.read_int32(), + }) + elif cmd['type'] == 6: + cmd['user'] = k_rdr.read_string() + + return cmd + + +def decode_user_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_user_cmd_type(cmd['type']) @@ -426,7 +444,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: - ret['data'] = decode_user_command(record) + ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, + decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: ret['data'] = decode_acl_command(record) if batch.type == BatchType.cluster_config_cmd: From 8b924defb565e0c129ae9fe728176f81b62259c5 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:58:34 +0200 Subject: [PATCH 11/22] tools/offline_log_viewer: added decode_acl_command for serde --- tools/offline_log_viewer/controller.py | 115 ++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 70d9c2818c02..c509eb3ba6c6 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -311,9 +311,115 @@ def decode_user_command_adl(k_rdr: Reader, rdr: Reader): return cmd -def decode_acl_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def read_acl_binding_serde(k_rdr: Reader): + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'resource': decode_acl_resource(k_rdr.read_serde_enum()), + 'name': k_rdr.read_string(), + 'pattern': decode_acl_pattern_type(k_rdr.read_serde_enum()) + }), + 'entry': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + k_rdr.read_string() + }), + 'host': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + }), + 'operation': + decode_acl_operation(k_rdr.read_serde_enum()), + 'permission': + decode_acl_permission(k_rdr.read_serde_enum()), + }), + }) + + +def decode_serialized_pattern_type(v): + if 0 <= v <= 2: + return ['literal', 'prefixed', 'match'][v] + return 'error' + + +def read_acl_binding_filter_serde(k_rdr: Reader): + # pattern class does not really use serde + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': { + 'resource': + k_rdr.read_optional(lambda k_rdr: decode_acl_resource( + k_rdr.read_serde_enum())), + 'name': + k_rdr.read_envelope(Reader.read_string), + 'pattern': + k_rdr.read_optional(lambda k_rdr: + decode_serialized_pattern_type( + k_rdr.read_serde_enum())), + }, + 'acl': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + k_rdr.read_string() + })), + 'host': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + })), + 'operation': + k_rdr.read_optional(lambda k_rdr: decode_acl_operation( + k_rdr.read_serde_enum())), + 'permission': + k_rdr.read_optional(lambda k_rdr: decode_acl_permission( + k_rdr.read_serde_enum())), + }), + }) + + +def decode_acl_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + cmd['str_type'] = decode_acls_cmd_type(cmd['type']) + if cmd['type'] == 8: + cmd['acls'] = k_rdr.read_envelope( + lambda k_rdr, _: + {'bindings': k_rdr.read_serde_vector(read_acl_binding_serde)}) + elif cmd['type'] == 9: + cmd |= k_rdr.read_envelope(lambda k_rdr, _: { + 'filters': + k_rdr.read_serde_vector(read_acl_binding_filter_serde) + }) + + return cmd + + +def decode_acl_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_acls_cmd_type(cmd['type']) @@ -447,7 +553,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: - ret['data'] = decode_acl_command(record) + ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, + decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: ret['data'] = decode_config_command(record) if batch.type == BatchType.feature_update: From 29f9e7e8ec7c3cf7e23a901761bfe3b9ff2d039e Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 21:03:20 +0200 Subject: [PATCH 12/22] tools/offline_log_viewer: added decode_config_command for serde for cluster_config_command --- tools/offline_log_viewer/controller.py | 42 +++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index c509eb3ba6c6..7a1d0910ee83 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -438,9 +438,42 @@ def read_config_kv(reader): return (k, v) -def decode_config_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_config_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_name'] = 'config_delta' + cmd['version'] = k_rdr.read_int64() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'upsert': + rdr.read_serde_vector(lambda rdr: rdr.read_envelope( + lambda rdr, _: { + 'k': rdr.read_string(), + 'v': rdr.read_string() + })), + 'remove': + rdr.read_serde_vector(Reader.read_string), + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'config_status' + cmd['node_id'] = k_rdr.read_int32() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'status': + rdr.read_envelope( + lambda rdr, _: { + 'node': rdr.read_int32(), + 'version': rdr.read_int64(), + 'restart': rdr.read_bool(), + 'unknown': rdr.read_serde_vector(Reader.read_string), + 'invalid': rdr.read_serde_vector(Reader.read_string), + }), + }) + return cmd + + +def decode_config_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -556,7 +589,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: - ret['data'] = decode_config_command(record) + ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, + decode_config_command_serde) if batch.type == BatchType.feature_update: ret['data'] = decode_feature_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: From 98e4df6870f355ade9e86e61b64f9c0cb995bddf Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 21:05:23 +0200 Subject: [PATCH 13/22] tools/offline_log_viewer: decode_feature_command, decode_node_management_command --- tools/offline_log_viewer/controller.py | 89 ++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 7a1d0910ee83..93529cae18b6 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -496,16 +496,51 @@ def decode_config_command_adl(k_rdr: Reader, rdr: Reader): return cmd -def decode_feature_command(record): +def decode_action_t(v): + if 1 <= v <= 3: + return ['', 'complete_preparing', 'activate', 'deactivate'][v] + return v + + +def decode_feature_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + if cmd['type'] == 0: + cmd['type_name'] = 'feature_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'cluster_version': + k_rdr.read_int64(), + 'actions': + k_rdr.read_serde_vector(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'feature_name': k_rdr.read_string(), + 'action': decode_action_t(k_rdr.read_serde_enum()), + })) + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'license_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'redpanda_license': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'format_version': k_rdr.read_uint8(), + 'type': k_rdr.read_serde_enum(), + 'organization': k_rdr.read_string(), + 'expiry': k_rdr.read_int64(), + }) + }) + return cmd + + +def decode_feature_command_adl(k_rdr: Reader, rdr: Reader): def decode_feature_update_action(r): action = {} action['v'] = r.read_int8() action['feature_name'] = r.read_string() - action['action'] = r.read_int16() + action['action'] = decode_action_t(r.read_int16()) return action - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -513,15 +548,39 @@ def decode_feature_update_action(r): cmd['v'] = k_rdr.read_int8() cmd['cluster_version'] = k_rdr.read_int64() cmd['actions'] = k_rdr.read_vector(decode_feature_update_action) + return cmd + + +def decode_node_management_command(record): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_adl_or_serde = rdr.peek_int8() + assert either_adl_or_serde >= -1, "unsupported serialization format" + if either_adl_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + cmd = {'type': rdr.read_int8()} + if cmd['type'] == 0: + cmd |= { + 'type_string': 'decommission_node', + 'node_id': k_rdr.read_int32() + } elif cmd['type'] == 1: - cmd['type_name'] = 'license_update' - k_rdr.read_envelope() - cmd['format_v'] = k_rdr.read_uint8() - cmd['license_type'] = k_rdr.read_uint8() - cmd['org'] = k_rdr.read_string() - cmd['expiry'] = k_rdr.read_string() - else: - cmd['type_name'] = 'unknown' + cmd |= { + 'type_string': 'recommission_node', + 'node_id': k_rdr.read_int32() + } + elif cmd['type'] == 2: + cmd |= { + 'type_string': 'finish_reallocations', + 'node_id': k_rdr.read_int32() + } + elif cmd['type'] == 3: + cmd |= { + 'type_string': 'maintenance_mode', + 'node_id': k_rdr.read_int32(), + 'enabled': rdr.read_bool() + } return cmd def decode_cluster_bootstrap_command(record): @@ -592,7 +651,11 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, decode_config_command_serde) if batch.type == BatchType.feature_update: - ret['data'] = decode_feature_command(record) + ret['data'] = decode_adl_or_serde(record, decode_feature_command_adl, + decode_feature_command_serde) + # TODO: serde for node_management_cmd, cluster_bootstrap_cmd + if batch.type == BatchType.node_management_cmd: + ret['data'] = decode_node_management_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) From 8fe06549b02f29cfd81fb357221f16ef26bc6766 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 6 Sep 2022 16:13:33 +0200 Subject: [PATCH 14/22] tools/offline_log_viewer: added method to get number of unprocessed bytes --- tools/offline_log_viewer/controller.py | 2 +- tools/offline_log_viewer/reader.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 93529cae18b6..8d9642d88c15 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -251,7 +251,7 @@ def decode_topic_command_adl(k_rdr: Reader, rdr: Reader): def decode_topic_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) + rdr = Reader(BytesIO(record.value)) k_rdr = Reader(BytesIO(record.key)) either_ald_or_serde = rdr.peek_int8() assert either_ald_or_serde >= -1, "unsupported serialization format" diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index d3f2c2f0bb25..70cbb62342b9 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -1,5 +1,6 @@ import struct import collections +from io import BufferedReader, BytesIO SERDE_ENVELOPE_FORMAT = " Date: Tue, 6 Sep 2022 17:34:14 +0200 Subject: [PATCH 15/22] tools/offline_log_viewer: added check that all the bytes of a record are processed in case of unread byte, an 'unread': {'k': ..., 'v': ...} field is added and an error is logged this check is not exact but is a good proxy to know if the decoding code is correct --- tools/offline_log_viewer/controller.py | 51 ++++++++++++++------------ 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 8d9642d88c15..6d7860aa57ee 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,11 +1,13 @@ import logging -from io import BufferedReader, BytesIO +from io import BytesIO from model import * from reader import Reader -from storage import Batch, Segment +from storage import Segment from storage import BatchType import datetime +logger = logging.getLogger('controller') + def read_topic_properties_serde(rdr: Reader, version): assert 0 <= version <= 1 @@ -263,8 +265,7 @@ def decode_topic_command(record): return decode_topic_command_adl(k_rdr, rdr) -def decode_config(record): - rdr = Reader(BytesIO(record.value)) +def decode_config(k_rdr: Reader, rdr: Reader): return read_raft_config(rdr) @@ -551,14 +552,7 @@ def decode_feature_update_action(r): return cmd -def decode_node_management_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) - either_adl_or_serde = rdr.peek_int8() - assert either_adl_or_serde >= -1, "unsupported serialization format" - if either_adl_or_serde == -1: - # serde encoding flag, consume it and proceed - rdr.skip(1) +def decode_node_management_command(k_rdr: Reader, rdr: Reader): cmd = {'type': rdr.read_int8()} if cmd['type'] == 0: cmd |= { @@ -610,9 +604,7 @@ def decode_user_and_credential(r): return cmd -def decode_adl_or_serde(record, adl_fn, serde_fn): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) +def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn): either_adl_or_serde = rdr.peek_int8() assert either_adl_or_serde >= -1, "unsupported serialization format" if either_adl_or_serde == -1: @@ -636,29 +628,42 @@ def decode_record(batch, record, bin_dump: bool): ret['value_dump'] = record.value.__str__() ret['data'] = None + rdr = Reader(BytesIO(record.value)) + k_rdr = Reader(BytesIO(record.key)) + if batch.type == BatchType.raft_configuration: - ret['data'] = decode_config(record) + ret['data'] = decode_config(k_rdr, rdr) if batch.type == BatchType.topic_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_topic_command_adl, decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_user_command_adl, decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_acl_command_adl, decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: - ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_config_command_adl, decode_config_command_serde) if batch.type == BatchType.feature_update: - ret['data'] = decode_adl_or_serde(record, decode_feature_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_feature_command_adl, decode_feature_command_serde) - # TODO: serde for node_management_cmd, cluster_bootstrap_cmd if batch.type == BatchType.node_management_cmd: - ret['data'] = decode_node_management_command(record) + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_node_management_command, + decode_node_management_command) + # TODO: serde for cluster_bootstrap_cmd if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) + k_unread = k_rdr.remaining() + v_unread = rdr.remaining() + if k_unread != 0 or v_unread != 0: + ret['unread'] = {'key': k_unread, 'value': v_unread} + logger.error(f"@{ret['type']} unread bytes. k:{k_unread} v:{v_unread}") + return ret From d8d85c4cdffeeefc495c1d4bef907114dac990bf Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 14:01:40 +0000 Subject: [PATCH 16/22] tools/offline_log_viewer: support v=4 group_configuration --- tools/offline_log_viewer/model.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py index 11d7cecb16da..7e602fa824d5 100644 --- a/tools/offline_log_viewer/model.py +++ b/tools/offline_log_viewer/model.py @@ -74,6 +74,13 @@ def read_broker(rdr): return br +def read_configuration_update(rdr): + return { + 'replicas_to_add': rdr.read_vector(read_vnode), + 'replicas_to_remove': rdr.read_vector(read_vnode) + } + + def read_raft_config(rdr): cfg = {} cfg['version'] = rdr.read_int8() @@ -81,6 +88,11 @@ def read_raft_config(rdr): cfg['current_config'] = read_group_nodes(rdr) cfg['prev_config'] = rdr.read_optional(read_group_nodes) cfg['revision'] = rdr.read_int64() + + if cfg['version'] >= 4: + cfg['configuration_update'] = rdr.read_optional( + lambda ordr: read_configuration_update(ordr)) + return cfg From a9ab9fa7f80222db71ceeb10317cfe4b16116485 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 11:38:56 +0000 Subject: [PATCH 17/22] tools: fix kvstore reads in offline_log_viewer --- tools/offline_log_viewer/kvstore.py | 2 +- tools/offline_log_viewer/model.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 7c3a7ed5e74b..3295f1487df2 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -97,7 +97,7 @@ def decode(self): keyspace = k_rdr.read_int8() - key_buf = self.k_stream.read() + key_buf = k_rdr.stream.read() ret['key_space'] = self._decode_ks(keyspace) ret['key_buf'] = key_buf diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py index 7e602fa824d5..996be4b3d7fa 100644 --- a/tools/offline_log_viewer/model.py +++ b/tools/offline_log_viewer/model.py @@ -83,6 +83,7 @@ def read_configuration_update(rdr): def read_raft_config(rdr): cfg = {} + cfg['version'] = rdr.read_int8() cfg['brokers'] = rdr.read_vector(read_broker) cfg['current_config'] = read_group_nodes(rdr) From 8557ac42ae0e5ea5e1faea7d9c141b29ff31b0f9 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:12:45 +0100 Subject: [PATCH 18/22] tests: include offline_log_viewer in Dockerfile --- tests/docker/Dockerfile | 3 +++ tests/docker/Dockerfile.dockerignore | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index debf1434a34d..2f342000abce 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -179,4 +179,7 @@ RUN mkdir -p /opt/scripts && \ curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \ chmod +x /opt/scripts/seastar-addr2line +RUN mkdir -p /opt/scripts/offline_log_viewer +COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer + CMD service ssh start && tail -f /dev/null diff --git a/tests/docker/Dockerfile.dockerignore b/tests/docker/Dockerfile.dockerignore index 9be2c986be8f..8d0cc79a40a1 100644 --- a/tests/docker/Dockerfile.dockerignore +++ b/tests/docker/Dockerfile.dockerignore @@ -3,4 +3,5 @@ !tests/docker/ssh !tests/setup.py !tests/python -!tests/go \ No newline at end of file +!tests/go +!tools/offline_log_viewer From a7e6983de44983fdb990296e664bb9079b0efc8f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 13:47:49 +0000 Subject: [PATCH 19/22] tests: add clients.offline_log_viewer --- tests/rptest/clients/offline_log_viewer.py | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/rptest/clients/offline_log_viewer.py diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py new file mode 100644 index 000000000000..71ba113ccccf --- /dev/null +++ b/tests/rptest/clients/offline_log_viewer.py @@ -0,0 +1,41 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import json + + +class OfflineLogViewer: + """ + Wrap tools/offline_log_viewer for use in tests: this is for tests that + want to peek at the structures, but also for validating the tool itself. + """ + def __init__(self, redpanda): + self._redpanda = redpanda + + def _cmd(self, suffix): + viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py" + return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}" + + def read_kvstore(self, node): + cmd = self._cmd("--type kvstore") + kvstore_json = node.account.ssh_output(cmd, combine_stderr=False) + return json.loads(kvstore_json) + + def read_controller(self, node): + cmd = self._cmd("--type controller") + controller_json = node.account.ssh_output(cmd, combine_stderr=False) + try: + return json.loads(controller_json) + except json.decoder.JSONDecodeError: + # Log the bad output before re-raising + self._redpanda.logger.error( + f"Invalid JSON output: {controller_json}") + import time + time.sleep(3600) + raise From fac7d517fcb3a7c1dc6c89ae98eff7a0bc872705 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:18:30 +0100 Subject: [PATCH 20/22] tests: validate kvstore cleanup in topic_delete_test Partitions have various kvstore items, stored by the raft + storage layers. When the topic is deleted, they should be deleted too. Use our offline log decoder tool to inspect the kvstore out-of-band and validate the keys go away when the topic is deleted. --- tests/rptest/tests/topic_delete_test.py | 125 ++++++++++++++++++++---- 1 file changed, 108 insertions(+), 17 deletions(-) diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index 7b5a99433427..b5222639d380 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -7,24 +7,108 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.services.cluster import cluster -from rptest.services.admin import Admin -from ducktape.utils.util import wait_until import time +import json +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from requests.exceptions import HTTPError + +from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.redpanda_test import RedpandaTest from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.services.rpk_producer import RpkProducer from rptest.services.metrics_check import MetricCheck from rptest.services.redpanda import SISettings from rptest.util import wait_for_segments_removal -from ducktape.mark import parametrize +from rptest.services.admin import Admin -def topic_storage_purged(redpanda, topic: str): +def get_kvstore_topic_key_counts(redpanda): + """ + Count the keys in KVStore that relate to Kafka topics: this excludes all + internal topic items: if no Kafka topics exist, this should be zero for + all nodes. + + :returns: dict of Node to integer + """ + + viewer = OfflineLogViewer(redpanda) + + # Find the raft group IDs of internal topics + admin = Admin(redpanda) + internal_group_ids = set() + for ntp in [ + ('redpanda', 'controller', 0), + ('kafka_internal', 'id_allocator', 0), + ]: + namespace, topic, partition = ntp + try: + p = admin.get_partitions(topic=topic, + namespace=namespace, + partition=partition) + except HTTPError as e: + # OK if internal topic doesn't exist (e.g. id_allocator + # doesn't have to exist) + if e.response.status_code != 404: + raise + else: + internal_group_ids.add(p['raft_group_id']) + + result = {} + for n in redpanda.nodes: + kvstore_data = viewer.read_kvstore(node=n) + + excess_keys = [] + for shard, items in kvstore_data.items(): + keys = [i['key'] for i in items] + + for k in keys: + if k['keyspace'] == "cluster": + # Not a per-partition key + continue + + if k['data'].get('group', None) in internal_group_ids: + # One of the internal topics + continue + + if k['data'].get('ntp', {}).get('topic', None) == 'controller': + # Controller storage item + continue + + excess_keys.append(k) + + redpanda.logger.info( + f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}" + ) + + key_count = len(excess_keys) + result[n] = key_count + + return result + + +def topic_storage_purged(redpanda, topic_name): storage = redpanda.storage() - return all(map(lambda n: topic not in n.ns["kafka"].topics, storage.nodes)) + logs_removed = all( + map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes)) + + if not logs_removed: + return False + + # Once logs are removed, also do more expensive inspection of + # kvstore to check that per-partition kvstore contents are + # gone. The user doesn't care about this, but it is important + # to avoid bugs that would cause kvstore to bloat through + # topic creation/destruction cycles. + + topic_key_counts = get_kvstore_topic_key_counts(redpanda) + if any([v > 0 for v in topic_key_counts.values()]): + return False + + return True class TopicDeleteTest(RedpandaTest): @@ -44,7 +128,9 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) @cluster(num_nodes=3) - def topic_delete_test(self): + @parametrize(with_restart=False) + @parametrize(with_restart=True) + def topic_delete_test(self, with_restart): def produce_until_partitions(): self.kafka_tools.produce(self.topic, 1024, 1024) storage = self.redpanda.storage() @@ -55,6 +141,16 @@ def produce_until_partitions(): backoff_sec=2, err_msg="Expected partition did not materialize") + if with_restart: + # Do a restart to encourage writes and flushes, especially to + # the kvstore. + self.redpanda.restart_nodes(self.redpanda.nodes) + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + self.kafka_tools.delete_topic(self.topic) try: @@ -294,17 +390,12 @@ def check_compaction(): pass producer.free() - def topic_storage_purged(): - storage = self.redpanda.storage() - return all( - map(lambda n: topic_name not in n.ns["kafka"].topics, - storage.nodes)) - try: - wait_until(lambda: topic_storage_purged(), - timeout_sec=60, - backoff_sec=2, - err_msg="Topic storage was not removed") + wait_until( + lambda: topic_storage_purged(self.redpanda, topic_name), + timeout_sec=60, + backoff_sec=2, + err_msg="Topic storage was not removed") except: # On errors, dump listing of the storage location From 18131351cbc86b4c64068751bd36d8674773e179 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 13:48:03 +0000 Subject: [PATCH 21/22] tests: check controller log decode after multi-version upgrade --- tests/rptest/tests/upgrade_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py index c90795d3f970..204f2cce078e 100644 --- a/tests/rptest/tests/upgrade_test.py +++ b/tests/rptest/tests/upgrade_test.py @@ -17,6 +17,7 @@ from rptest.services.admin import Admin from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.prealloc_nodes import PreallocNodesTest from rptest.tests.redpanda_test import RedpandaTest from rptest.tests.end_to_end import EndToEndTest @@ -259,6 +260,15 @@ def _consumer_offsets_present(): assert self._rand_consumer.consumer_status.validator.total_reads >= self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL assert self._cg_consumer.consumer_status.validator.valid_reads >= wrote_at_least + # Validate that the data structures written by a mixture of historical + # versions remain readable by our current debug tools + log_viewer = OfflineLogViewer(self.redpanda) + for node in self.redpanda.nodes: + controller_records = log_viewer.read_controller(node=node) + self.logger.info( + f"Read {len(controller_records)} controller records from node {node.name} successfully" + ) + class UpgradeWithWorkloadTest(EndToEndTest): """ From 189e4753461eba7c020ddd31f8e9d9417ebea244 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 12:19:08 +0000 Subject: [PATCH 22/22] tools: disable broken controller command decode This needs fixing, but better to comment it out and have the rest of the command work. --- tools/offline_log_viewer/controller.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 6d7860aa57ee..b68d53f89a04 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -149,7 +149,8 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): cmd['type'] = rdr.read_int8() if cmd['type'] == 0: cmd['type_string'] = 'create_topic' - cmd |= read_topic_assignment_serde(rdr) + # FIXME: this fails to read messages written on tip of dev ead54dda + #cmd |= read_topic_assignment_serde(rdr) elif cmd['type'] == 1: cmd['type_string'] = 'delete_topic' cmd['namespace'] = rdr.read_string() @@ -577,6 +578,7 @@ def decode_node_management_command(k_rdr: Reader, rdr: Reader): } return cmd + def decode_cluster_bootstrap_command(record): def decode_user_and_credential(r): user_cred = {} @@ -604,6 +606,7 @@ def decode_user_and_credential(r): return cmd + def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn): either_adl_or_serde = rdr.peek_int8() assert either_adl_or_serde >= -1, "unsupported serialization format"