From 08136e0678a1538e3a68d6a5b8eddbd5d59a52db Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:23:23 +0100 Subject: [PATCH 01/33] tools: write offline_log_viewer kvstore/controller output to stdout On stderr it gets mixed with logs. On stdout it is straightforward to capture and decode json for use in tests. (cherry picked from commit e1681649fdfd026357e0c2be8ef9d47b29313d93) --- tools/offline_log_viewer/viewer.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index 6011eca6723e..725c0adc8092 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -17,13 +17,21 @@ def print_kv_store(store): + # Map of partition ID to list of kvstore items + result = {} + for ntp in store.ntps: if ntp.nspace == "redpanda" and ntp.topic == "kvstore": logger.info(f"inspecting {ntp}") kv = KvStore(ntp) kv.decode() items = kv.items() - logger.info(json.dumps(items, indent=2)) + + result[ntp.partition] = items + + # Send JSON output to stdout in case caller wants to parse it, other + # CLI output goes to stderr via logger + print(json.dumps(result, indent=2)) def print_controller(store, bin_dump: bool): @@ -31,7 +39,10 @@ def print_controller(store, bin_dump: bool): if ntp.nspace == "redpanda" and ntp.topic == "controller": ctrl = ControllerLog(ntp) ctrl.decode(bin_dump) - logger.info(json.dumps(ctrl.records, indent=2)) + + # Send JSON output to stdout in case caller wants to parse it, other + # CLI output goes to stderr via logger + print(json.dumps(ctrl.records, indent=2)) def print_kafka(store, topic, headers_only): From 7767288c08a1dc4ef57fb02622c790e2d87dba22 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:24:55 +0100 Subject: [PATCH 02/33] tools: make offline_log_viewer aware of clean_segment kvstore key (cherry picked from commit 9f00c0229f112a50a2d0b9ddd5e217b367c725fb) --- tools/offline_log_viewer/kvstore.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index c9cd3417e85b..98f7cf93023d 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -216,6 +216,8 @@ def decode_offset_translator_key(k): def decode_storage_key_name(key_type): if key_type == 0: return "start offset" + elif key_type == 1: + return "clean segment" return "unknown" From c64ca07370ab4f74179c309e59b1caff06ea5e4a Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:34:58 +0100 Subject: [PATCH 03/33] tools: offline_log_viewer don't replay pre-snapshot kvstore messages This was wasteful: all messages from before the snapshot should already be accounted for in the state we loaded from the snapshot. 
(cherry picked from commit 963fc3d128dce5a9d689586996817ca8b94436e2) --- tools/offline_log_viewer/kvstore.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 98f7cf93023d..744e8d597bff 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -333,10 +333,12 @@ def _apply(self, entry): self.kv[key] = entry['data'] def decode(self): + snapshot_offset = None if os.path.exists(f"{self.ntp.path}/snapshot"): snap = KvSnapshot(f"{self.ntp.path}/snapshot") snap.decode() logger.info(f"snapshot last offset: {snap.last_offset}") + snapshot_offset = snap.last_offset for r in snap.data_batch: d = KvStoreRecordDecoder(r, snap.data_batch, @@ -349,6 +351,10 @@ def decode(self): s = Segment(path) for batch in s: for r in batch: + offset = batch.header.base_offset + r.offset_delta + if snapshot_offset is not None and offset <= snapshot_offset: + continue + d = KvStoreRecordDecoder(r, batch, value_is_optional_type=True) From 50947da4d816a8a3f221aea02e9e0b33f9f0a171 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 15:38:04 +0100 Subject: [PATCH 04/33] tools/metadata_viewer: handle kvstore deletions A None value is a deletion. Previously, we ignored these and thereby the kvstore dump would include all keys ever created, even if they had since been deleted. (cherry picked from commit 7f666f7d0a438afdce059c75aa666a4f4cda1e92) --- tools/offline_log_viewer/kvstore.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 744e8d597bff..7c3a7ed5e74b 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -331,6 +331,12 @@ def _apply(self, entry): if entry['data'] is not None: self.kv[key] = entry['data'] + else: + try: + del self.kv[key] + except KeyError: + # Missing key, that's okay for a deletion + pass def decode(self): snapshot_offset = None From d1bc4cc4e4b4a9c404ccd9f087c339fc24b56b13 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:17:40 +0100 Subject: [PATCH 05/33] tools: handle short header read in offline_log_viewer (cherry picked from commit b492385d565eb0bd9d371da0a2c96fdf2ec9a9bb) --- tools/offline_log_viewer/storage.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/storage.py b/tools/offline_log_viewer/storage.py index 4fc4f5c5bb89..51ad230a724c 100644 --- a/tools/offline_log_viewer/storage.py +++ b/tools/offline_log_viewer/storage.py @@ -218,7 +218,10 @@ def from_stream(f, index): return None assert len(data) == records_size return Batch(index, header, data) - assert len(data) == 0 + + if len(data) < HEADER_SIZE: + # Short read, probably log being actively written or unclean shutdown + return None def __len__(self): return self.header.record_count From 8e391128ce1f1829a33f5aac3a5d6e138f772ff7 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:39:08 +0200 Subject: [PATCH 06/33] tools/offline_log_viewer: added decode_adl_or_serde to dispatch decoding functions (cherry picked from commit 37b76a1678b18667cf94d96a66174afe971325da) --- tools/offline_log_viewer/controller.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index d290106dd982..b074f20b7993 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ 
-173,7 +173,6 @@ def decode_feature_update_action(r): cmd['type_name'] = 'unknown' return cmd - def decode_cluster_bootstrap_command(record): def decode_user_and_credential(r): user_cred = {} @@ -201,6 +200,18 @@ def decode_user_and_credential(r): return cmd +def decode_adl_or_serde(record, adl_fn, serde_fn): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_adl_or_serde = rdr.peek_int8() + assert either_adl_or_serde >= -1, "unsupported serialization format" + if either_adl_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return serde_fn(k_rdr, rdr) + else: + return adl_fn(k_rdr, rdr) + def decode_record(batch, record, bin_dump: bool): ret = {} From 274ad4f94b9767da6501e3a4dff403c2f23223d0 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:31:42 +0200 Subject: [PATCH 07/33] tools/offline_log_viewer: fixed variable used to read BatchType (was batch.header.type instead of batch.type) (cherry picked from commit 18328b1a333de8fb1ed1b4ed10213d17e93238e2) --- tools/offline_log_viewer/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index b074f20b7993..3ac561640f55 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -234,9 +234,9 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_user_command(record) if batch.type == BatchType.acl_management_cmd: ret['data'] = decode_acl_command(record) - if header.type == BatchType.cluster_config_cmd: + if batch.type == BatchType.cluster_config_cmd: ret['data'] = decode_config_command(record) - if header.type == BatchType.feature_update: + if batch.type == BatchType.feature_update: ret['data'] = decode_feature_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) From 9efbdfa57dd521b8c16bd67c71a67e1508c9ea7c Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:35:20 +0200 Subject: [PATCH 08/33] tools/offline_log_viewer: added peek_int8, read_serde_enum ... and augmented Reader.read_envelope to accept a type_read Callable[rdr: Reader, version: Int]: Dict and a max_version parameter. If a type_read is used, it is invoked with reader and envelope if envelope.version <= max_version parameter. the result is joined with a dict {'envelope': envelope} and returned, otherwise an error dict is returned. If no type_read is used, envelope is returned immediately (cherry picked from commit 4bbd0c5eb32795076ccbcea953df53695d2bbb10) --- tools/offline_log_viewer/reader.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index 0447f4d29ace..d3f2c2f0bb25 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -54,6 +54,9 @@ def read_int64(self): def read_uint64(self): return struct.unpack(" Date: Tue, 30 Aug 2022 20:50:10 +0200 Subject: [PATCH 09/33] tools/offline_log_viewer: added serde decoding for topic_commands the new code tries to mimic the structure of the adl decoding functions. the new code is aware of the version number embedded in the binary stream and will skip the field if the version is not supported, while reporting the error. 
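The version handling relies on the read_envelope helper extended in the previous patch (a type_read callable plus a max_version limit). Its contract amounts to roughly the following standalone sketch; read_envelope, type_read and max_version are the names used in that commit message, the envelope field names match reader.py, while the "<BBI" struct layout and the plain-stream signature are simplifying assumptions (the real helper is a method on Reader):

    import struct
    from collections import namedtuple
    from io import BytesIO

    # assumed envelope layout: version (uint8), compat_version (uint8), payload size (uint32)
    SERDE_ENVELOPE_FORMAT = "<BBI"
    SERDE_ENVELOPE_SIZE = struct.calcsize(SERDE_ENVELOPE_FORMAT)
    SerdeEnvelope = namedtuple('SerdeEnvelope', ('version', 'compat_version', 'size'))

    def read_envelope(stream, type_read=None, max_version=0):
        # every serde value starts with a small fixed-size envelope header
        envelope = SerdeEnvelope(*struct.unpack(SERDE_ENVELOPE_FORMAT,
                                                stream.read(SERDE_ENVELOPE_SIZE)))
        if type_read is None:
            # no payload decoder supplied: return just the envelope header
            return envelope
        if envelope.version <= max_version:
            # delegate payload decoding and keep the envelope alongside the decoded fields
            return {'envelope': envelope} | type_read(stream, envelope.version)
        # version newer than this decoder understands: report it instead of mis-decoding
        return {'envelope': envelope,
                'error': f'unsupported version {envelope.version} > {max_version}'}

    # example: a v0 payload holding a single little-endian int32
    buf = BytesIO(struct.pack("<BBI", 0, 0, 4) + struct.pack("<i", 42))
    print(read_envelope(buf, lambda s, v: {'value': struct.unpack("<i", s.read(4))[0]}))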
(cherry picked from commit 021317ad3315d891041ee38b24bf880ba7faf14c) --- tools/offline_log_viewer/controller.py | 205 ++++++++++++++++++++++++- 1 file changed, 200 insertions(+), 5 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 3ac561640f55..3aaf2123f340 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,3 +1,4 @@ +import logging from io import BufferedReader, BytesIO from model import * from reader import Reader @@ -6,9 +7,190 @@ import datetime -def decode_topic_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) +def read_topic_properties_serde(rdr: Reader, version): + assert 0 <= version <= 1 + topic_properties = { + 'compression': rdr.read_optional(Reader.read_serde_enum), + 'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum), + 'compaction_strategy': rdr.read_optional(Reader.read_serde_enum), + 'timestamp_type': rdr.read_optional(Reader.read_serde_enum), + 'segment_size': rdr.read_optional(Reader.read_uint64), + 'retention_bytes': rdr.read_tristate(Reader.read_uint64), + 'retention_duration': rdr.read_tristate(Reader.read_uint64), + 'recovery': rdr.read_optional(Reader.read_bool), + 'shadow_indexing': rdr.read_optional(Reader.read_serde_enum), + } + + # introduced for remote read replicas + if version == 1: + topic_properties |= { + 'read_replica': + rdr.read_optional(Reader.read_bool), + 'read_replica_bucket': + rdr.read_optional(Reader.read_string), + 'remote_topic_properties': + rdr.read_optional( + lambda r: { + 'remote_revision': r.read_int64(), + 'remote_partition_count': r.read_int32() + }), + } + return topic_properties + + +def read_topic_assignment_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'partitions': + rdr.read_int32(), + 'replication_factor': + rdr.read_int16(), + 'properties': + rdr.read_envelope(read_topic_properties_serde, + max_version=1), + }), + 'assignments': + rdr.read_serde_vector(lambda r: r.read_envelope( + lambda r, _: { + 'group': + r.read_int64(), + 'id': + r.read_int32(), + 'replicas': + r.read_serde_vector(lambda r: { + 'node_id': r.read_int32(), + 'shard': r.read_uint32(), + }), + })), + }) + + +def read_inc_update_op_serde(rdr: Reader): + v = rdr.read_serde_enum() + if -1 < v < 3: + return ['none', 'set', 'remove'][v] + return 'error' + + +def read_property_update_serde(rdr: Reader, type_reader): + return rdr.read_envelope(lambda rdr, _: { + 'value': type_reader(rdr), + 'op': read_inc_update_op_serde(rdr), + }) + + +def read_incremental_topic_update_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'compression': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'cleanup_policy_bitflags': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'compaction_strategy': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'timestamp_type': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + 'segment_size': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_uint64)), + 'retention_bytes': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_uint64)), + 'retention_duration': + read_property_update_serde( + rdr, 
lambda r: r.read_tristate(Reader.read_int64)), + 'shadow_indexing': + read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum)), + }) + + +def read_create_partitions_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + 'cfg': + rdr.read_envelope( + lambda rdr, _: { + 'namespace': + rdr.read_string(), + 'topic': + rdr.read_string(), + 'new_total_partition_count': + rdr.read_int32(), + 'custom_assignments': + rdr.read_serde_vector(lambda r: r.read_serde_vector( + Reader.read_int32)), + }), + 'assignments': + rdr.read_serde_vector(lambda r: r.read_envelope( + lambda rdr, _: { + 'group': rdr.read_int64(), + 'id': rdr.read_int32(), + 'replicas': rdr.read_serde_vector(read_broker_shard), + })), + }) + + +def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_string'] = 'create_topic' + cmd |= read_topic_assignment_serde(rdr) + elif cmd['type'] == 1: + cmd['type_string'] = 'delete_topic' + cmd['namespace'] = rdr.read_string() + cmd['topic'] = rdr.read_string() + elif cmd['type'] == 2: + cmd['type_string'] = 'update_partitions' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 3: + cmd['type_string'] = 'finish_partitions_update' + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() + cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) + elif cmd['type'] == 4: + cmd['type_string'] = 'update_topic_properties' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['update'] = read_incremental_topic_update_serde(rdr) + elif cmd['type'] == 5: + cmd['type_string'] = 'create_partitions' + cmd |= read_create_partitions_serde(rdr) + elif cmd['type'] == 6: + cmd['type_string'] = 'create_non_replicable_topic' + cmd['topic'] = k_rdr.read_envelope( + lambda k_rdr, _: { + 'source': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + 'name': { + 'namespace': k_rdr.read_string(), + 'topic': k_rdr.read_string(), + }, + }) + elif cmd['type'] == 7: + cmd['type_string'] = 'cancel_moving_partition_replicas' + cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()}) + return cmd + + +def decode_topic_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -68,6 +250,19 @@ def decode_topic_command(record): return cmd +def decode_topic_command(record): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_ald_or_serde = rdr.peek_int8() + assert either_ald_or_serde >= -1, "unsupported serialization format" + if either_ald_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + return decode_topic_command_serde(k_rdr, rdr) + else: + return decode_topic_command_adl(k_rdr, rdr) + + def decode_config(record): rdr = Reader(BytesIO(record.value)) return read_raft_config(rdr) @@ -114,7 +309,6 @@ def decode_acl_command(record): def read_config_kv(reader): - k = reader.read_string() v = reader.read_string() return (k, v) @@ -229,7 +423,8 @@ def decode_record(batch, record, bin_dump: bool): if batch.type == BatchType.raft_configuration: ret['data'] = decode_config(record) if batch.type == BatchType.topic_management_cmd: - ret['data'] = decode_topic_command(record) + ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, + 
decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: ret['data'] = decode_user_command(record) if batch.type == BatchType.acl_management_cmd: From 5f37e92e5e260246f59ae9056948250367e80cf8 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:52:57 +0200 Subject: [PATCH 10/33] tools/offline_log_viewer: added decode_user_command for serde (cherry picked from commit ce43d16640b8d4e84788218d1f7c9d99ee31d6aa) --- tools/offline_log_viewer/controller.py | 27 ++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 3aaf2123f340..70d9c2818c02 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -268,9 +268,27 @@ def decode_config(record): return read_raft_config(rdr) -def decode_user_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_user_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + cmd['str_type'] = decode_user_cmd_type(cmd['type']) + + if cmd['type'] == 5 or cmd['type'] == 7: + cmd['user'] = k_rdr.read_string() + cmd['cred'] = rdr.read_envelope( + lambda rdr, _: { + # obfuscate secrets + 'salt': obfuscate_secret(rdr.read_iobuf().hex()), + 'server_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'stored_key': obfuscate_secret(rdr.read_iobuf().hex()), + 'iterations': rdr.read_int32(), + }) + elif cmd['type'] == 6: + cmd['user'] = k_rdr.read_string() + + return cmd + + +def decode_user_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_user_cmd_type(cmd['type']) @@ -426,7 +444,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: - ret['data'] = decode_user_command(record) + ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, + decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: ret['data'] = decode_acl_command(record) if batch.type == BatchType.cluster_config_cmd: From 8883e8735734181cc0a9796f41e86d66055e1b1e Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 20:58:34 +0200 Subject: [PATCH 11/33] tools/offline_log_viewer: added decode_acl_command for serde (cherry picked from commit 8b924defb565e0c129ae9fe728176f81b62259c5) --- tools/offline_log_viewer/controller.py | 115 ++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 70d9c2818c02..c509eb3ba6c6 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -311,9 +311,115 @@ def decode_user_command_adl(k_rdr: Reader, rdr: Reader): return cmd -def decode_acl_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def read_acl_binding_serde(k_rdr: Reader): + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'resource': decode_acl_resource(k_rdr.read_serde_enum()), + 'name': k_rdr.read_string(), + 'pattern': decode_acl_pattern_type(k_rdr.read_serde_enum()) + }), + 'entry': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + 
k_rdr.read_string() + }), + 'host': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + }), + 'operation': + decode_acl_operation(k_rdr.read_serde_enum()), + 'permission': + decode_acl_permission(k_rdr.read_serde_enum()), + }), + }) + + +def decode_serialized_pattern_type(v): + if 0 <= v <= 2: + return ['literal', 'prefixed', 'match'][v] + return 'error' + + +def read_acl_binding_filter_serde(k_rdr: Reader): + # pattern class does not really use serde + return k_rdr.read_envelope( + lambda k_rdr, _: { + 'pattern': { + 'resource': + k_rdr.read_optional(lambda k_rdr: decode_acl_resource( + k_rdr.read_serde_enum())), + 'name': + k_rdr.read_envelope(Reader.read_string), + 'pattern': + k_rdr.read_optional(lambda k_rdr: + decode_serialized_pattern_type( + k_rdr.read_serde_enum())), + }, + 'acl': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'principal': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'type': + decode_acl_principal_type(k_rdr.read_serde_enum()), + 'name': + k_rdr.read_string() + })), + 'host': + k_rdr.read_optional(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'addr': + k_rdr.read_optional( + lambda k_rdr: { + 'ipv4': k_rdr.read_bool(), + 'data': k_rdr.read_iobuf().hex() + }) + })), + 'operation': + k_rdr.read_optional(lambda k_rdr: decode_acl_operation( + k_rdr.read_serde_enum())), + 'permission': + k_rdr.read_optional(lambda k_rdr: decode_acl_permission( + k_rdr.read_serde_enum())), + }), + }) + + +def decode_acl_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + cmd['str_type'] = decode_acls_cmd_type(cmd['type']) + if cmd['type'] == 8: + cmd['acls'] = k_rdr.read_envelope( + lambda k_rdr, _: + {'bindings': k_rdr.read_serde_vector(read_acl_binding_serde)}) + elif cmd['type'] == 9: + cmd |= k_rdr.read_envelope(lambda k_rdr, _: { + 'filters': + k_rdr.read_serde_vector(read_acl_binding_filter_serde) + }) + + return cmd + + +def decode_acl_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() cmd['str_type'] = decode_acls_cmd_type(cmd['type']) @@ -447,7 +553,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: - ret['data'] = decode_acl_command(record) + ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, + decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: ret['data'] = decode_config_command(record) if batch.type == BatchType.feature_update: From b3ef720a377f536f42cd275840fde2f76eaaeaa5 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 21:03:20 +0200 Subject: [PATCH 12/33] tools/offline_log_viewer: added decode_config_command for serde for cluster_config_command (cherry picked from commit 29f9e7e8ec7c3cf7e23a901761bfe3b9ff2d039e) --- tools/offline_log_viewer/controller.py | 42 +++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index c509eb3ba6c6..7a1d0910ee83 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -438,9 +438,42 @@ def read_config_kv(reader): return (k, v) -def decode_config_command(record): - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def 
decode_config_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {} + cmd['type'] = rdr.read_int8() + if cmd['type'] == 0: + cmd['type_name'] = 'config_delta' + cmd['version'] = k_rdr.read_int64() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'upsert': + rdr.read_serde_vector(lambda rdr: rdr.read_envelope( + lambda rdr, _: { + 'k': rdr.read_string(), + 'v': rdr.read_string() + })), + 'remove': + rdr.read_serde_vector(Reader.read_string), + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'config_status' + cmd['node_id'] = k_rdr.read_int32() + cmd |= rdr.read_envelope( + lambda rdr, _: { + 'status': + rdr.read_envelope( + lambda rdr, _: { + 'node': rdr.read_int32(), + 'version': rdr.read_int64(), + 'restart': rdr.read_bool(), + 'unknown': rdr.read_serde_vector(Reader.read_string), + 'invalid': rdr.read_serde_vector(Reader.read_string), + }), + }) + return cmd + + +def decode_config_command_adl(k_rdr: Reader, rdr: Reader): cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -556,7 +589,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: - ret['data'] = decode_config_command(record) + ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, + decode_config_command_serde) if batch.type == BatchType.feature_update: ret['data'] = decode_feature_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: From b949c1e54d056616847e9436b7bedd3ef8894cfe Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 30 Aug 2022 21:05:23 +0200 Subject: [PATCH 13/33] tools/offline_log_viewer: decode_feature_command, decode_node_management_command (cherry picked from commit 98e4df6870f355ade9e86e61b64f9c0cb995bddf) --- tools/offline_log_viewer/controller.py | 89 ++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 7a1d0910ee83..93529cae18b6 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -496,16 +496,51 @@ def decode_config_command_adl(k_rdr: Reader, rdr: Reader): return cmd -def decode_feature_command(record): +def decode_action_t(v): + if 1 <= v <= 3: + return ['', 'complete_preparing', 'activate', 'deactivate'][v] + return v + + +def decode_feature_command_serde(k_rdr: Reader, rdr: Reader): + cmd = {'type': rdr.read_int8()} + if cmd['type'] == 0: + cmd['type_name'] = 'feature_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'cluster_version': + k_rdr.read_int64(), + 'actions': + k_rdr.read_serde_vector(lambda k_rdr: k_rdr.read_envelope( + lambda k_rdr, _: { + 'feature_name': k_rdr.read_string(), + 'action': decode_action_t(k_rdr.read_serde_enum()), + })) + }) + elif cmd['type'] == 1: + cmd['type_name'] = 'license_update' + cmd |= k_rdr.read_envelope( + lambda k_rdr, _: { + 'redpanda_license': + k_rdr.read_envelope( + lambda k_rdr, _: { + 'format_version': k_rdr.read_uint8(), + 'type': k_rdr.read_serde_enum(), + 'organization': k_rdr.read_string(), + 'expiry': k_rdr.read_int64(), + }) + }) + return cmd + + +def decode_feature_command_adl(k_rdr: Reader, rdr: Reader): def decode_feature_update_action(r): action = {} action['v'] = r.read_int8() action['feature_name'] = r.read_string() - action['action'] = r.read_int16() + action['action'] = decode_action_t(r.read_int16()) return action - rdr = Reader(BytesIO(record.value)) - k_rdr = 
Reader(BytesIO(record.key)) cmd = {} cmd['type'] = rdr.read_int8() if cmd['type'] == 0: @@ -513,15 +548,39 @@ def decode_feature_update_action(r): cmd['v'] = k_rdr.read_int8() cmd['cluster_version'] = k_rdr.read_int64() cmd['actions'] = k_rdr.read_vector(decode_feature_update_action) + return cmd + + +def decode_node_management_command(record): + rdr = Reader(BufferedReader(BytesIO(record.value))) + k_rdr = Reader(BytesIO(record.key)) + either_adl_or_serde = rdr.peek_int8() + assert either_adl_or_serde >= -1, "unsupported serialization format" + if either_adl_or_serde == -1: + # serde encoding flag, consume it and proceed + rdr.skip(1) + cmd = {'type': rdr.read_int8()} + if cmd['type'] == 0: + cmd |= { + 'type_string': 'decommission_node', + 'node_id': k_rdr.read_int32() + } elif cmd['type'] == 1: - cmd['type_name'] = 'license_update' - k_rdr.read_envelope() - cmd['format_v'] = k_rdr.read_uint8() - cmd['license_type'] = k_rdr.read_uint8() - cmd['org'] = k_rdr.read_string() - cmd['expiry'] = k_rdr.read_string() - else: - cmd['type_name'] = 'unknown' + cmd |= { + 'type_string': 'recommission_node', + 'node_id': k_rdr.read_int32() + } + elif cmd['type'] == 2: + cmd |= { + 'type_string': 'finish_reallocations', + 'node_id': k_rdr.read_int32() + } + elif cmd['type'] == 3: + cmd |= { + 'type_string': 'maintenance_mode', + 'node_id': k_rdr.read_int32(), + 'enabled': rdr.read_bool() + } return cmd def decode_cluster_bootstrap_command(record): @@ -592,7 +651,11 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, decode_config_command_serde) if batch.type == BatchType.feature_update: - ret['data'] = decode_feature_command(record) + ret['data'] = decode_adl_or_serde(record, decode_feature_command_adl, + decode_feature_command_serde) + # TODO: serde for node_management_cmd, cluster_bootstrap_cmd + if batch.type == BatchType.node_management_cmd: + ret['data'] = decode_node_management_command(record) if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) From 6783ce85a0ef85f5a1862c44915cb3784ec3086c Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 6 Sep 2022 16:13:33 +0200 Subject: [PATCH 14/33] tools/offline_log_viewer: added method to get number of unprocessed bytes (cherry picked from commit 8fe06549b02f29cfd81fb357221f16ef26bc6766) --- tools/offline_log_viewer/controller.py | 2 +- tools/offline_log_viewer/reader.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 93529cae18b6..8d9642d88c15 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -251,7 +251,7 @@ def decode_topic_command_adl(k_rdr: Reader, rdr: Reader): def decode_topic_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) + rdr = Reader(BytesIO(record.value)) k_rdr = Reader(BytesIO(record.key)) either_ald_or_serde = rdr.peek_int8() assert either_ald_or_serde >= -1, "unsupported serialization format" diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index d3f2c2f0bb25..70cbb62342b9 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -1,5 +1,6 @@ import struct import collections +from io import BufferedReader, BytesIO SERDE_ENVELOPE_FORMAT = " Date: Tue, 6 Sep 2022 17:34:14 +0200 Subject: [PATCH 15/33] tools/offline_log_viewer: added check that all the bytes of a 
record are processed in case of unread byte, an 'unread': {'k': ..., 'v': ...} field is added and an error is logged this check is not exact but is a good proxy to know if the decoding code is correct (cherry picked from commit ba26cd7c8d4b3a1cfce7ca42a03e941242159a6a) --- tools/offline_log_viewer/controller.py | 51 ++++++++++++++------------ 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 8d9642d88c15..6d7860aa57ee 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -1,11 +1,13 @@ import logging -from io import BufferedReader, BytesIO +from io import BytesIO from model import * from reader import Reader -from storage import Batch, Segment +from storage import Segment from storage import BatchType import datetime +logger = logging.getLogger('controller') + def read_topic_properties_serde(rdr: Reader, version): assert 0 <= version <= 1 @@ -263,8 +265,7 @@ def decode_topic_command(record): return decode_topic_command_adl(k_rdr, rdr) -def decode_config(record): - rdr = Reader(BytesIO(record.value)) +def decode_config(k_rdr: Reader, rdr: Reader): return read_raft_config(rdr) @@ -551,14 +552,7 @@ def decode_feature_update_action(r): return cmd -def decode_node_management_command(record): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) - either_adl_or_serde = rdr.peek_int8() - assert either_adl_or_serde >= -1, "unsupported serialization format" - if either_adl_or_serde == -1: - # serde encoding flag, consume it and proceed - rdr.skip(1) +def decode_node_management_command(k_rdr: Reader, rdr: Reader): cmd = {'type': rdr.read_int8()} if cmd['type'] == 0: cmd |= { @@ -610,9 +604,7 @@ def decode_user_and_credential(r): return cmd -def decode_adl_or_serde(record, adl_fn, serde_fn): - rdr = Reader(BufferedReader(BytesIO(record.value))) - k_rdr = Reader(BytesIO(record.key)) +def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn): either_adl_or_serde = rdr.peek_int8() assert either_adl_or_serde >= -1, "unsupported serialization format" if either_adl_or_serde == -1: @@ -636,29 +628,42 @@ def decode_record(batch, record, bin_dump: bool): ret['value_dump'] = record.value.__str__() ret['data'] = None + rdr = Reader(BytesIO(record.value)) + k_rdr = Reader(BytesIO(record.key)) + if batch.type == BatchType.raft_configuration: - ret['data'] = decode_config(record) + ret['data'] = decode_config(k_rdr, rdr) if batch.type == BatchType.topic_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_topic_command_adl, decode_topic_command_serde) if batch.type == BatchType.user_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_user_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_user_command_adl, decode_user_command_serde) if batch.type == BatchType.acl_management_cmd: - ret['data'] = decode_adl_or_serde(record, decode_acl_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_acl_command_adl, decode_acl_command_serde) if batch.type == BatchType.cluster_config_cmd: - ret['data'] = decode_adl_or_serde(record, decode_config_command_adl, + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_config_command_adl, decode_config_command_serde) if batch.type == BatchType.feature_update: - ret['data'] = decode_adl_or_serde(record, decode_feature_command_adl, + ret['data'] = 
decode_adl_or_serde(k_rdr, rdr, + decode_feature_command_adl, decode_feature_command_serde) - # TODO: serde for node_management_cmd, cluster_bootstrap_cmd if batch.type == BatchType.node_management_cmd: - ret['data'] = decode_node_management_command(record) + ret['data'] = decode_adl_or_serde(k_rdr, rdr, + decode_node_management_command, + decode_node_management_command) + # TODO: serde for cluster_bootstrap_cmd if batch.type == BatchType.cluster_bootstrap_cmd: ret['data'] = decode_cluster_bootstrap_command(record) + k_unread = k_rdr.remaining() + v_unread = rdr.remaining() + if k_unread != 0 or v_unread != 0: + ret['unread'] = {'key': k_unread, 'value': v_unread} + logger.error(f"@{ret['type']} unread bytes. k:{k_unread} v:{v_unread}") + return ret From 2b152d62a167d82fc438804deb723d5850bed914 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 14:01:40 +0000 Subject: [PATCH 16/33] tools/offline_log_viewer: support v=4 group_configuration (cherry picked from commit d8d85c4cdffeeefc495c1d4bef907114dac990bf) --- tools/offline_log_viewer/model.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py index 11d7cecb16da..7e602fa824d5 100644 --- a/tools/offline_log_viewer/model.py +++ b/tools/offline_log_viewer/model.py @@ -74,6 +74,13 @@ def read_broker(rdr): return br +def read_configuration_update(rdr): + return { + 'replicas_to_add': rdr.read_vector(read_vnode), + 'replicas_to_remove': rdr.read_vector(read_vnode) + } + + def read_raft_config(rdr): cfg = {} cfg['version'] = rdr.read_int8() @@ -81,6 +88,11 @@ def read_raft_config(rdr): cfg['current_config'] = read_group_nodes(rdr) cfg['prev_config'] = rdr.read_optional(read_group_nodes) cfg['revision'] = rdr.read_int64() + + if cfg['version'] >= 4: + cfg['configuration_update'] = rdr.read_optional( + lambda ordr: read_configuration_update(ordr)) + return cfg From 052c4f8860884a12854dd83026b127c46d74b75a Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 11:38:56 +0000 Subject: [PATCH 17/33] tools: fix kvstore reads in offline_log_viewer (cherry picked from commit a9ab9fa7f80222db71ceeb10317cfe4b16116485) --- tools/offline_log_viewer/kvstore.py | 2 +- tools/offline_log_viewer/model.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 7c3a7ed5e74b..3295f1487df2 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -97,7 +97,7 @@ def decode(self): keyspace = k_rdr.read_int8() - key_buf = self.k_stream.read() + key_buf = k_rdr.stream.read() ret['key_space'] = self._decode_ks(keyspace) ret['key_buf'] = key_buf diff --git a/tools/offline_log_viewer/model.py b/tools/offline_log_viewer/model.py index 7e602fa824d5..996be4b3d7fa 100644 --- a/tools/offline_log_viewer/model.py +++ b/tools/offline_log_viewer/model.py @@ -83,6 +83,7 @@ def read_configuration_update(rdr): def read_raft_config(rdr): cfg = {} + cfg['version'] = rdr.read_int8() cfg['brokers'] = rdr.read_vector(read_broker) cfg['current_config'] = read_group_nodes(rdr) From 8642bcf31899c29acda0554c8ec506a1dac8afd8 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:12:45 +0100 Subject: [PATCH 18/33] tests: include offline_log_viewer in Dockerfile (cherry picked from commit 8557ac42ae0e5ea5e1faea7d9c141b29ff31b0f9) --- tests/docker/Dockerfile | 3 +++ tests/docker/Dockerfile.dockerignore | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) 
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index b8f32a09fae0..cac65675c029 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -179,4 +179,7 @@ RUN mkdir -p /opt/scripts && \ curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \ chmod +x /opt/scripts/seastar-addr2line +RUN mkdir -p /opt/scripts/offline_log_viewer +COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer + CMD service ssh start && tail -f /dev/null diff --git a/tests/docker/Dockerfile.dockerignore b/tests/docker/Dockerfile.dockerignore index 9be2c986be8f..8d0cc79a40a1 100644 --- a/tests/docker/Dockerfile.dockerignore +++ b/tests/docker/Dockerfile.dockerignore @@ -3,4 +3,5 @@ !tests/docker/ssh !tests/setup.py !tests/python -!tests/go \ No newline at end of file +!tests/go +!tools/offline_log_viewer From 4701853525b00b84040d5c33d31d6668f62ee0c1 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 13:47:49 +0000 Subject: [PATCH 19/33] tests: add clients.offline_log_viewer (cherry picked from commit a7e6983de44983fdb990296e664bb9079b0efc8f) --- tests/rptest/clients/offline_log_viewer.py | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/rptest/clients/offline_log_viewer.py diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py new file mode 100644 index 000000000000..71ba113ccccf --- /dev/null +++ b/tests/rptest/clients/offline_log_viewer.py @@ -0,0 +1,41 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import json + + +class OfflineLogViewer: + """ + Wrap tools/offline_log_viewer for use in tests: this is for tests that + want to peek at the structures, but also for validating the tool itself. + """ + def __init__(self, redpanda): + self._redpanda = redpanda + + def _cmd(self, suffix): + viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py" + return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}" + + def read_kvstore(self, node): + cmd = self._cmd("--type kvstore") + kvstore_json = node.account.ssh_output(cmd, combine_stderr=False) + return json.loads(kvstore_json) + + def read_controller(self, node): + cmd = self._cmd("--type controller") + controller_json = node.account.ssh_output(cmd, combine_stderr=False) + try: + return json.loads(controller_json) + except json.decoder.JSONDecodeError: + # Log the bad output before re-raising + self._redpanda.logger.error( + f"Invalid JSON output: {controller_json}") + import time + time.sleep(3600) + raise From c16eff6ac3e4425bc968ec7a137cb4a181c2c719 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 1 Jul 2022 17:18:30 +0100 Subject: [PATCH 20/33] tests: validate kvstore cleanup in topic_delete_test Partitions have various kvstore items, stored by the raft + storage layers. When the topic is deleted, they should be deleted too. Use our offline log decoder tool to inspect the kvstore out-of-band and validate the keys go away when the topic is deleted. 
(cherry picked from commit fac7d517fcb3a7c1dc6c89ae98eff7a0bc872705) --- tests/rptest/tests/topic_delete_test.py | 125 ++++++++++++++++++++---- 1 file changed, 108 insertions(+), 17 deletions(-) diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index 7b5a99433427..b5222639d380 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -7,24 +7,108 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.services.cluster import cluster -from rptest.services.admin import Admin -from ducktape.utils.util import wait_until import time +import json +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from requests.exceptions import HTTPError + +from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.redpanda_test import RedpandaTest from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.services.rpk_producer import RpkProducer from rptest.services.metrics_check import MetricCheck from rptest.services.redpanda import SISettings from rptest.util import wait_for_segments_removal -from ducktape.mark import parametrize +from rptest.services.admin import Admin -def topic_storage_purged(redpanda, topic: str): +def get_kvstore_topic_key_counts(redpanda): + """ + Count the keys in KVStore that relate to Kafka topics: this excludes all + internal topic items: if no Kafka topics exist, this should be zero for + all nodes. + + :returns: dict of Node to integer + """ + + viewer = OfflineLogViewer(redpanda) + + # Find the raft group IDs of internal topics + admin = Admin(redpanda) + internal_group_ids = set() + for ntp in [ + ('redpanda', 'controller', 0), + ('kafka_internal', 'id_allocator', 0), + ]: + namespace, topic, partition = ntp + try: + p = admin.get_partitions(topic=topic, + namespace=namespace, + partition=partition) + except HTTPError as e: + # OK if internal topic doesn't exist (e.g. id_allocator + # doesn't have to exist) + if e.response.status_code != 404: + raise + else: + internal_group_ids.add(p['raft_group_id']) + + result = {} + for n in redpanda.nodes: + kvstore_data = viewer.read_kvstore(node=n) + + excess_keys = [] + for shard, items in kvstore_data.items(): + keys = [i['key'] for i in items] + + for k in keys: + if k['keyspace'] == "cluster": + # Not a per-partition key + continue + + if k['data'].get('group', None) in internal_group_ids: + # One of the internal topics + continue + + if k['data'].get('ntp', {}).get('topic', None) == 'controller': + # Controller storage item + continue + + excess_keys.append(k) + + redpanda.logger.info( + f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}" + ) + + key_count = len(excess_keys) + result[n] = key_count + + return result + + +def topic_storage_purged(redpanda, topic_name): storage = redpanda.storage() - return all(map(lambda n: topic not in n.ns["kafka"].topics, storage.nodes)) + logs_removed = all( + map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes)) + + if not logs_removed: + return False + + # Once logs are removed, also do more expensive inspection of + # kvstore to check that per-partition kvstore contents are + # gone. The user doesn't care about this, but it is important + # to avoid bugs that would cause kvstore to bloat through + # topic creation/destruction cycles. 
+ + topic_key_counts = get_kvstore_topic_key_counts(redpanda) + if any([v > 0 for v in topic_key_counts.values()]): + return False + + return True class TopicDeleteTest(RedpandaTest): @@ -44,7 +128,9 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) @cluster(num_nodes=3) - def topic_delete_test(self): + @parametrize(with_restart=False) + @parametrize(with_restart=True) + def topic_delete_test(self, with_restart): def produce_until_partitions(): self.kafka_tools.produce(self.topic, 1024, 1024) storage = self.redpanda.storage() @@ -55,6 +141,16 @@ def produce_until_partitions(): backoff_sec=2, err_msg="Expected partition did not materialize") + if with_restart: + # Do a restart to encourage writes and flushes, especially to + # the kvstore. + self.redpanda.restart_nodes(self.redpanda.nodes) + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + self.kafka_tools.delete_topic(self.topic) try: @@ -294,17 +390,12 @@ def check_compaction(): pass producer.free() - def topic_storage_purged(): - storage = self.redpanda.storage() - return all( - map(lambda n: topic_name not in n.ns["kafka"].topics, - storage.nodes)) - try: - wait_until(lambda: topic_storage_purged(), - timeout_sec=60, - backoff_sec=2, - err_msg="Topic storage was not removed") + wait_until( + lambda: topic_storage_purged(self.redpanda, topic_name), + timeout_sec=60, + backoff_sec=2, + err_msg="Topic storage was not removed") except: # On errors, dump listing of the storage location From 54233af39b6f1f654a407d2a2e44242db0a7c93d Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 13:48:03 +0000 Subject: [PATCH 21/33] tests: check controller log decode after multi-version upgrade (cherry picked from commit 18131351cbc86b4c64068751bd36d8674773e179) --- tests/rptest/tests/upgrade_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py index 5204a8166eed..2208b5feab29 100644 --- a/tests/rptest/tests/upgrade_test.py +++ b/tests/rptest/tests/upgrade_test.py @@ -17,6 +17,7 @@ from rptest.services.admin import Admin from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.prealloc_nodes import PreallocNodesTest from rptest.tests.redpanda_test import RedpandaTest from rptest.tests.end_to_end import EndToEndTest @@ -253,6 +254,15 @@ def _consumer_offsets_present(): assert self._rand_consumer.consumer_status.validator.total_reads >= self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL assert self._cg_consumer.consumer_status.validator.valid_reads >= wrote_at_least + # Validate that the data structures written by a mixture of historical + # versions remain readable by our current debug tools + log_viewer = OfflineLogViewer(self.redpanda) + for node in self.redpanda.nodes: + controller_records = log_viewer.read_controller(node=node) + self.logger.info( + f"Read {len(controller_records)} controller records from node {node.name} successfully" + ) + class UpgradeWithWorkloadTest(EndToEndTest): """ From 3e4eb8414e6a71a4635c7d27ea3a6bec3d17a9fb Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Dec 2022 12:19:08 +0000 Subject: [PATCH 22/33] tools: disable broken controller command decode This needs fixing, but better to comment it out and have 
the rest of the command work. (cherry picked from commit 189e4753461eba7c020ddd31f8e9d9417ebea244) --- tools/offline_log_viewer/controller.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 6d7860aa57ee..b68d53f89a04 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -149,7 +149,8 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): cmd['type'] = rdr.read_int8() if cmd['type'] == 0: cmd['type_string'] = 'create_topic' - cmd |= read_topic_assignment_serde(rdr) + # FIXME: this fails to read messages written on tip of dev ead54dda + #cmd |= read_topic_assignment_serde(rdr) elif cmd['type'] == 1: cmd['type_string'] = 'delete_topic' cmd['namespace'] = rdr.read_string() @@ -577,6 +578,7 @@ def decode_node_management_command(k_rdr: Reader, rdr: Reader): } return cmd + def decode_cluster_bootstrap_command(record): def decode_user_and_credential(r): user_cred = {} @@ -604,6 +606,7 @@ def decode_user_and_credential(r): return cmd + def decode_adl_or_serde(k_rdr: Reader, rdr: Reader, adl_fn, serde_fn): either_adl_or_serde = rdr.peek_int8() assert either_adl_or_serde >= -1, "unsupported serialization format" From 6d0592ca7cdae38975402b3f610769f84f4d7696 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 2 Dec 2022 07:54:00 +0100 Subject: [PATCH 23/33] tools: make reader capable of reading big endian encoding Signed-off-by: Michal Maslanka (cherry picked from commit 065d1f47759ccd14ba703d7bf422db3274910dcc) --- tools/offline_log_viewer/reader.py | 43 +++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index 70cbb62342b9..46bc7762119a 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -1,3 +1,4 @@ +from enum import Enum import struct import collections from io import BufferedReader, BytesIO @@ -9,10 +10,16 @@ ('version', 'compat_version', 'size')) +class Endianness(Enum): + BIG_ENDIAN = 0 + LITTLE_ENDIAN = 1 + + class Reader: - def __init__(self, stream: BytesIO): + def __init__(self, stream, endianness=Endianness.LITTLE_ENDIAN): # BytesIO provides .getBuffer(), BufferedReader peek() self.stream = BufferedReader(stream) + self.endianness = endianness @staticmethod def _decode_zig_zag(v): @@ -32,29 +39,33 @@ def read_varint(self): return Reader._decode_zig_zag(result) + def with_endianness(self, str): + ch = '<' if self.endianness == Endianness.LITTLE_ENDIAN else '>' + return f"{ch}{str}" + def read_int8(self): - return struct.unpack(" Date: Fri, 2 Dec 2022 08:03:48 +0100 Subject: [PATCH 24/33] tools/viewer: added ability to read consumer offsets topic Signed-off-by: Michal Maslanka (cherry picked from commit 1f6da2234fbe0c4e9061c09488bb617068421980) --- tools/offline_log_viewer/consumer_offsets.py | 123 +++++++++++++++++++ tools/offline_log_viewer/viewer.py | 24 +++- 2 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 tools/offline_log_viewer/consumer_offsets.py diff --git a/tools/offline_log_viewer/consumer_offsets.py b/tools/offline_log_viewer/consumer_offsets.py new file mode 100644 index 000000000000..7e1666f909bf --- /dev/null +++ b/tools/offline_log_viewer/consumer_offsets.py @@ -0,0 +1,123 @@ +import base64 +from io import BytesIO +from model import * +from reader import Endianness, Reader +from storage import Segment +import datetime + + +def 
decode_key_type(v): + if v == 0 or v == 1: + return "offset_commit" + elif v == 2: + return "group_metadata" + + return "unknown" + + +def decode_member_proto(rdr): + ret = {} + ret['name'] = rdr.read_string() + ret['metadata'] = rdr.read_iobuf().hex() + return ret + + +def decode_member(rdr): + ret = {} + ret['v'] = rdr.read_int16() + ret['member_id'] = rdr.read_kafka_string() + ret['instance_id'] = rdr.read_kafka_optional_string() + ret['client_id'] = rdr.read_kafka_string() + ret['client_host'] = rdr.read_kafka_string() + ret['rebalance_timeout'] = rdr.read_int32() + ret['session_timeout'] = rdr.read_int32() + ret['subscription'] = base64.b64encode( + rdr.read_kafka_bytes()).decode('utf-8') + ret['assignment'] = base64.b64encode( + rdr.read_kafka_bytes()).decode('utf-8') + + return ret + + +def decode_metadata(rdr): + ret = {} + ret['version'] = rdr.read_int16() + ret['protocol_type'] = rdr.read_kafka_string() + ret['generation_id'] = rdr.read_int32() + ret['protocol_name'] = rdr.read_kafka_optional_string() + ret['leader'] = rdr.read_kafka_optional_string() + ret['state_timestamp'] = rdr.read_int64() + ret['member_state'] = rdr.read_vector(decode_member) + return ret + + +def decode_key(key_rdr): + ret = {} + v = key_rdr.read_int16() + ret['type'] = decode_key_type(v) + ret['group_id'] = key_rdr.read_kafka_string() + if ret['type'] == 'offset_commit': + ret['topic'] = key_rdr.read_kafka_string() + ret['partition'] = key_rdr.read_int32() + + return ret + + +def decode_offset_commit(v_rdr): + ret = {} + ret['version'] = v_rdr.read_int16() + ret['committed_offset'] = v_rdr.read_int64() + if ret['version'] >= 3: + ret['leader_epoch'] = v_rdr.read_int32() + + ret['committed_metadata'] = v_rdr.read_kafka_string() + ret['commit_timestamp'] = v_rdr.read_int64() + if ret['version'] == 1: + ret['expiry_timestamp'] = v_rdr.read_int64() + + return ret + + +def decode_record(hdr, r): + v = {} + v['epoch'] = hdr.first_ts + v['offset'] = hdr.base_offset + r.offset_delta + v['ts'] = datetime.datetime.utcfromtimestamp( + hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S') + k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN) + + v['key'] = decode_key(k_rdr) + + if v['key']['type'] == "group_metadata": + if r.value: + rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN) + v['value'] = decode_metadata(rdr) + else: + v['value'] = 'tombstone' + elif v['key']['type'] == "offset_commit": + if r.value: + rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN) + v['value'] = decode_offset_commit(rdr) + else: + v['value'] = 'tombstone' + + return v + + +class OffsetsLog: + def __init__(self, ntp): + self.ntp = ntp + self.records = [] + + def decode(self): + paths = [] + for path in self.ntp.segments: + paths.append(path) + paths.sort() + for path in paths: + s = Segment(path) + for b in s: + if b.header.type != 1: + continue + for r in b: + self.records.append(decode_record(b.header, r)) diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py index 725c0adc8092..6da1ef34b7a3 100644 --- a/tools/offline_log_viewer/viewer.py +++ b/tools/offline_log_viewer/viewer.py @@ -5,6 +5,7 @@ from controller import ControllerLog from consumer_groups import GroupsLog +from consumer_offsets import OffsetsLog from tx_coordinator import TxLog from storage import Store @@ -66,6 +67,22 @@ def print_groups(store): logger.info("") +def print_consumer_offsets(store): + records = [] + for ntp in store.ntps: + if ntp.nspace == "kafka" and ntp.topic == "__consumer_offsets": 
+ l = OffsetsLog(ntp) + l.decode() + records.append({ + "partition_id": ntp.partition, + "records": l.records + }) + + # Send JSON output to stdout in case caller wants to parse it, other + # CLI output goes to stderr via logger + print(json.dumps(records, indent=2)) + + def print_tx_coordinator(store): for ntp in store.ntps: if ntp.nspace == "kafka_internal" and ntp.topic == "tx": @@ -114,7 +131,8 @@ def generate_options(): parser.add_argument('--type', type=str, choices=[ - 'controller', 'kvstore', 'kafka', 'group', + 'controller', 'kvstore', 'kafka', + 'consumer_offsets', 'legacy-group', 'kafka_records', 'tx_coordinator' ], required=True, @@ -152,8 +170,10 @@ def generate_options(): elif options.type == "kafka_records": validate_topic(options.path, options.topic) print_kafka(store, options.topic, headers_only=False) - elif options.type == "group": + elif options.type == "legacy-group": print_groups(store) + elif options.type == "consumer_offsets": + print_consumer_offsets(store) elif options.type == "tx_coordinator": validate_tx_coordinator(options.path) print_tx_coordinator(store) From a09ac0f2eb2a152550604ed7f8a8dae92272c14e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 20 Dec 2022 16:42:14 +0100 Subject: [PATCH 25/33] tests/rpk: do not match whitespaces when parsing group describe Signed-off-by: Michal Maslanka (cherry picked from commit 8db3de7bcda9993ea650a228868e90a12037e54e) --- tests/rptest/clients/rpk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index c2f56892a7d0..0d34aed7e2ab 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -371,10 +371,10 @@ def parse_field(field_name, string): static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$") partition_pattern_static_member = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" + "(?P[^\s]+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*) *(?P[^\s]*)? *(?P[^\s]*)?" ) partition_pattern_dynamic_member = re.compile( - "(?P.+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" + "(?P[^\s]+) +(?P\d+) +(?P\d+|-) +(?P\d+|-) +(?P-?\d+|-) *(?P[^\s]*)? *(?P[^\s]*)? *(?P[^\s]*)?" 
) def check_lines(lines): From fce3a38e373ff902dd1d6e126ba55bd28b7ff0c7 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 20 Dec 2022 11:05:01 +0100 Subject: [PATCH 26/33] tests: added parsing of consumer offsets topic to one of the test cases Signed-off-by: Michal Maslanka (cherry picked from commit d7d49aaf2601eed7cf0a769baa61b8056d102501) --- tests/rptest/clients/offline_log_viewer.py | 17 ++++++++---- tests/rptest/tests/consumer_group_test.py | 32 +++++++++++++++++++++- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py index 71ba113ccccf..c2e1938869d5 100644 --- a/tests/rptest/clients/offline_log_viewer.py +++ b/tests/rptest/clients/offline_log_viewer.py @@ -27,15 +27,20 @@ def read_kvstore(self, node): kvstore_json = node.account.ssh_output(cmd, combine_stderr=False) return json.loads(kvstore_json) - def read_controller(self, node): - cmd = self._cmd("--type controller") - controller_json = node.account.ssh_output(cmd, combine_stderr=False) + def _json_cmd(self, node, suffix): + cmd = self._cmd(suffix=suffix) + json_out = node.account.ssh_output(cmd, combine_stderr=False) try: - return json.loads(controller_json) + return json.loads(json_out) except json.decoder.JSONDecodeError: # Log the bad output before re-raising - self._redpanda.logger.error( - f"Invalid JSON output: {controller_json}") + self._redpanda.logger.error(f"Invalid JSON output: {json_out}") import time time.sleep(3600) raise + + def read_controller(self, node): + return self._json_cmd(node, "--type controller") + + def read_consumer_offsets(self, node): + return self._json_cmd(node, "--type consumer_offsets") diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index fc96b7b10b05..87efe3cc994f 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -8,6 +8,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.services.cluster import cluster from rptest.clients.rpk import RpkException, RpkTool @@ -29,7 +30,10 @@ def __init__(self, test_ctx, *args, **kwargs): num_brokers=3, *args, # disable leader balancer to make sure that group will not be realoaded because of leadership changes - extra_rp_conf={"enable_leader_balancer": False}, + extra_rp_conf={ + "enable_leader_balancer": False, + "default_topic_replications": 3 + }, **kwargs) def make_consumer_properties(base_properties, instance_id=None): @@ -151,6 +155,32 @@ def test_basic_group_join(self, static_members): c.wait() c.free() + gd = RpkTool(self.redpanda).group_describe(group=group) + viewer = OfflineLogViewer(self.redpanda) + for node in self.redpanda.nodes: + consumer_offsets_partitions = viewer.read_consumer_offsets( + node=node) + offsets = {} + groups = set() + for partition in consumer_offsets_partitions: + for r in partition['records']: + self.logger.info(f"{r}") + if r['key']['type'] == 'group_metadata': + groups.add(r['key']['group_id']) + elif r['key']['type'] == 'offset_commit': + tp = f"{r['key']['topic']}/{r['key']['partition']}" + if tp not in offsets: + offsets[tp] = -1 + offsets[tp] = max(r['value']['committed_offset'], + offsets[tp]) + + assert len(groups) == 1 and group in groups + assert all([ + f"{p.topic}/{p.partition}" in offsets + and offsets[f"{p.topic}/{p.partition}"] == p.current_offset + for p in 
gd.partitions + ]) + @cluster(num_nodes=6) def test_mixed_consumers_join(self): """ From 64217049324e755ecafb7e17d9caf134fa418d69 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 22 Dec 2022 13:14:15 +0100 Subject: [PATCH 27/33] k/group_metadata: added support for v1 offset metadata value Redpanda maintains on disk format compatibility with Kafka for `__consumer_offsets` topic messages. Added support for `expiry_timestamp` field that was part of `OffsetCommitValue` version 1. Previously redpanda implementation defaulted to the most recent version 3. Expiry timestamp may be set by clients using older versions of `OffsetCommit` requests. To maintain compatibility we must persist the value for older messages. Signed-off-by: Michal Maslanka (cherry picked from commit 913a5f7c42c36417ea301885715d70a6cbe69b28) --- src/v/kafka/server/group_metadata.cc | 21 +++++++++++++------ src/v/kafka/server/group_metadata.h | 6 +++++- .../group_metadata_serialization_test.cc | 11 ++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index 95dfe3eaf408..6971a3350e9b 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -201,26 +201,32 @@ void offset_metadata_value::encode( response_writer& writer, const offset_metadata_value& v) { writer.write(v.version); writer.write(v.offset); - writer.write(v.leader_epoch); + if (v.version >= group_metadata_version{3}) { + writer.write(v.leader_epoch); + } writer.write(v.metadata); writer.write(v.commit_timestamp); + if (v.version == group_metadata_version{1}) { + writer.write(v.expiry_timestamp); + } } offset_metadata_value offset_metadata_value::decode(request_reader& reader) { offset_metadata_value ret; auto version = read_metadata_version(reader); validate_version_range( - version, "offset_metadata_value", offset_metadata_value::version); + version, "offset_metadata_value", offset_metadata_value::latest_version); + ret.version = version; ret.offset = model::offset(reader.read_int64()); if (version >= group_metadata_version{3}) { ret.leader_epoch = kafka::leader_epoch(reader.read_int32()); } ret.metadata = reader.read_string(); ret.commit_timestamp = model::timestamp(reader.read_int64()); - // read and ignore expiry_timestamp only present in version 1 + // read expiry_timestamp only present in version 1 if (version == group_metadata_version{1}) { - reader.read_int64(); + ret.expiry_timestamp = model::timestamp(reader.read_int64()); } return ret; @@ -568,11 +574,14 @@ std::ostream& operator<<(std::ostream& o, const offset_metadata_key& v) { std::ostream& operator<<(std::ostream& o, const offset_metadata_value& v) { fmt::print( o, - "{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestap: {}}}", + "{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestamp: {}, " + "expiry_timestamp: {}, version: {}}}", v.offset, v.leader_epoch, v.metadata, - v.commit_timestamp); + v.commit_timestamp, + v.expiry_timestamp, + v.version); return o; } namespace old { diff --git a/src/v/kafka/server/group_metadata.h b/src/v/kafka/server/group_metadata.h index f238fb6f34fe..bb0b7dab3300 100644 --- a/src/v/kafka/server/group_metadata.h +++ b/src/v/kafka/server/group_metadata.h @@ -229,11 +229,15 @@ struct offset_metadata_key { * The value type for offset commit records, consistent with Kafka format */ struct offset_metadata_value { - static constexpr group_metadata_version version{3}; + static constexpr group_metadata_version 
latest_version{3};
+    group_metadata_version version = latest_version;
     model::offset offset;
+    // present only in version >= 3
     kafka::leader_epoch leader_epoch = invalid_leader_epoch;
     ss::sstring metadata;
     model::timestamp commit_timestamp;
+    // present only in version 1
+    model::timestamp expiry_timestamp{-1};
     friend std::ostream& operator<<(std::ostream&, const offset_metadata_value&);
diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
index c121d4383aa0..ebdd0d19424a 100644
--- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc
+++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc
@@ -112,7 +112,18 @@ FIXTURE_TEST(metadata_rt_test, fixture) {
     offset_key.partition = random_named_int();
     roundtrip_test(offset_key);
+    // version 1
+    kafka::offset_metadata_value offset_md_v1;
+    offset_md_v1.version = kafka::group_metadata_version(1);
+    offset_md_v1.offset = random_named_int();
+    offset_md_v1.metadata = random_named_string();
+    offset_md_v1.commit_timestamp = model::timestamp::now();
+    offset_md_v1.expiry_timestamp = model::timestamp::now();
+
+    roundtrip_test(offset_md_v1);
+
+    // version 3
     kafka::offset_metadata_value offset_md;
     offset_md.offset = random_named_int();

From 8480ac8a17e2ab8031fe5591d97643cada8bc8dd Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Fri, 2 Dec 2022 11:09:55 +0100
Subject: [PATCH 28/33] tools: introduced simple consumer offsets recovery tool

Introduced a tool that may be helpful when dealing with consumer
offsets topic issues. The tool is designed to make it possible to
change the `__consumer_offsets` topic partition count. It can also be
used to restore consumer group state.

Signed-off-by: Michal Maslanka
(cherry picked from commit 82d136ad1f2a887381d1d2fef25701385461d400)
---
 .../config.properties                   |   6 +
 tools/consumer_offsets_recovery/main.py | 163 ++++++++++++++++++
 .../requirements.txt                    |   2 +
 3 files changed, 171 insertions(+)
 create mode 100644 tools/consumer_offsets_recovery/config.properties
 create mode 100755 tools/consumer_offsets_recovery/main.py
 create mode 100644 tools/consumer_offsets_recovery/requirements.txt

diff --git a/tools/consumer_offsets_recovery/config.properties b/tools/consumer_offsets_recovery/config.properties
new file mode 100644
index 000000000000..bef3cbb97058
--- /dev/null
+++ b/tools/consumer_offsets_recovery/config.properties
@@ -0,0 +1,6 @@
+bootstrap_servers=127.0.0.1
+# security
+# security_protocol=SASL_PLAINTEXT
+# sasl_mechanism=SCRAM-SHA-256
+# sasl_plain_username=admin
+# sasl_plain_password=admin
diff --git a/tools/consumer_offsets_recovery/main.py b/tools/consumer_offsets_recovery/main.py
new file mode 100755
index 000000000000..0a17f7334d20
--- /dev/null
+++ b/tools/consumer_offsets_recovery/main.py
@@ -0,0 +1,163 @@
+import os
+from pathlib import Path
+
+import subprocess
+from kafka import KafkaAdminClient
+from kafka import KafkaConsumer
+from kafka.admin.new_topic import NewTopic
+from kafka import TopicPartition
+from kafka import OffsetAndMetadata
+from jproperties import Properties
+import logging
+
+logger = logging.getLogger('cg-recovery-tool')
+
+
+def read_offsets(admin: KafkaAdminClient):
+
+    groups_dict = {}
+    groups = admin.list_consumer_groups()
+    for g, _ in groups:
+        logger.info(f"reading group '{g}' offsets")
+        offsets = admin.list_consumer_group_offsets(group_id=g)
+        groups_dict[g] = offsets
+
+    return groups_dict
+
+
+def read_config(path):
+    logger.debug(f"reading configuration from {path}")
+    cfg =
Properties() + with open(path, 'rb') as f: + cfg.load(f) + cfg_dict = {} + for k, v in cfg.items(): + cfg_dict[k] = v.data + + return cfg_dict + + +def seek_to_file(path, cfg, dry_run): + + logger.debug(f"reading offsets file: {path}") + offsets = {} + with open(path, 'r') as f: + group = "" + consumer = None + for i, l in enumerate(f): + if i == 0: + group = l.strip() + if not dry_run: + consumer = KafkaConsumer(group_id=group, **cfg) + logger.info(f"seeking group {group} to file {path}") + continue + if l == "": + continue + + topic, partition, offset = tuple([p.strip() for p in l.split(',')]) + logger.debug( + f"group: {group} partition: {topic}/{partition} offset: {offset}" + ) + tp = TopicPartition(topic=topic, partition=int(partition)) + + offsets[tp] = OffsetAndMetadata(offset=int(offset), metadata='') + if not dry_run: + consumer.commit(offsets=offsets) + + +def offset_file(group): + return f"{group}.offsets" + + +def seek_all(config, path, dry_run): + logger.info(f"reading offsets from: {path}") + for _, _, files in os.walk(path): + for f in files: + seek_to_file(Path(path).joinpath(f), config, dry_run) + + +def query_offsets(offsets_path, cfg): + admin = KafkaAdminClient(**cfg) + + current_offsets = read_offsets(admin) + path = Path(offsets_path) + logger.info(f"storing offsets in {offsets_path}") + path.mkdir(exist_ok=True) + for group_id, offsets in current_offsets.items(): + with open(path.joinpath(offset_file(group_id)), 'w') as f: + f.write(f"{group_id}\n") + for tp, md in offsets.items(): + topic, partition = tp + offset, _ = md + f.write(f"{topic},{partition},{offset}\n") + + +def recreate_topic(partitions, replication_factor, cfg): + admin = KafkaAdminClient(**cfg) + logger.info('removing __consumer_offsets topic') + + admin.delete_topics(["__consumer_offsets"]) + + new_topic = NewTopic(name="__consumer_offsets", + num_partitions=partitions, + topic_configs={ + "cleanup.policy": "compact", + "retention.bytes": -1, + "retention.ms": -1, + }, + replication_factor=replication_factor) + logger.info('recreating __consumer_offsets topic') + admin.create_topics(new_topics=[new_topic]) + + +def main(): + import argparse + + def generate_options(): + parser = argparse.ArgumentParser( + description='Redpanda group recovery tool') + parser.add_argument('--cfg', + type=str, + help='config file path', + required=True) + parser.add_argument('-o', + "--offsets-path", + type=str, + default="./offsets") + parser.add_argument('-v', "--verbose", action="store_true") + parser.add_argument('-s', "--skip-query", action="store_true") + parser.add_argument('-p', "--target-partitions", type=int, default=16) + parser.add_argument('-e', "--execute", action="store_true") + parser.add_argument('-r', "--replication-factor", type=int, default=3) + + return parser + + parser = generate_options() + options, _ = parser.parse_known_args() + logging.basicConfig( + format="%(asctime)s %(name)-20s %(levelname)-8s %(message)s") + + if options.verbose: + logging.basicConfig(level="DEBUG") + logger.setLevel(level="DEBUG") + else: + logging.basicConfig(level="ERROR") + logger.setLevel(level="INFO") + + logger.info(f"starting group recovery tool with options: {options}") + cfg = read_config(options.cfg) + + logger.info(f"configuration: {cfg}") + + if not options.skip_query: + query_offsets(options.offsets_path, cfg) + + if options.execute: + recreate_topic(options.target_partitions, options.replication_factor, + cfg) + + seek_all(cfg, options.offsets_path, dry_run=not options.execute) + + +if __name__ == 
'__main__': + main() diff --git a/tools/consumer_offsets_recovery/requirements.txt b/tools/consumer_offsets_recovery/requirements.txt new file mode 100644 index 000000000000..ecc258802d08 --- /dev/null +++ b/tools/consumer_offsets_recovery/requirements.txt @@ -0,0 +1,2 @@ +kafka-python +jproperties From a6321520c99c79316ba1ab4c8102fd653eb73d2f Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 21 Dec 2022 13:56:29 +0100 Subject: [PATCH 29/33] tests/dockerfile: added consumer group recovery tool to docker image Signed-off-by: Michal Maslanka (cherry picked from commit 28c5bae43e36bd56e1c7548f5ace13f973707be0) --- tests/docker/Dockerfile | 4 ++++ tests/docker/Dockerfile.dockerignore | 1 + 2 files changed, 5 insertions(+) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index cac65675c029..54ec9f2fcc88 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -182,4 +182,8 @@ RUN mkdir -p /opt/scripts && \ RUN mkdir -p /opt/scripts/offline_log_viewer COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer +RUN mkdir -p /opt/scripts/consumer_offsets_recovery +COPY --chown=0:0 tools/consumer_offsets_recovery /opt/scripts/consumer_offsets_recovery +RUN python3 -m pip install --force --no-cache-dir -r /opt/scripts/consumer_offsets_recovery/requirements.txt + CMD service ssh start && tail -f /dev/null diff --git a/tests/docker/Dockerfile.dockerignore b/tests/docker/Dockerfile.dockerignore index 8d0cc79a40a1..6690f25515e9 100644 --- a/tests/docker/Dockerfile.dockerignore +++ b/tests/docker/Dockerfile.dockerignore @@ -5,3 +5,4 @@ !tests/python !tests/go !tools/offline_log_viewer +!tools/consumer_offsets_recovery From 31ed4b6d5c1e2373f39dde75cd7978a34e7ea4ab Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 21 Dec 2022 13:56:55 +0100 Subject: [PATCH 30/33] tests: added test for consumer offsets partition change tool Signed-off-by: Michal Maslanka (cherry picked from commit 34e3ff21aeef72a85557269141063b7ab717d5a1) --- .../clients/consumer_offsets_recovery.py | 46 ++++++++ tests/rptest/test_suite_ec2.yml | 1 + .../consumer_group_recovery_tool_test.py | 111 ++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 tests/rptest/clients/consumer_offsets_recovery.py create mode 100644 tests/rptest/tests/consumer_group_recovery_tool_test.py diff --git a/tests/rptest/clients/consumer_offsets_recovery.py b/tests/rptest/clients/consumer_offsets_recovery.py new file mode 100644 index 000000000000..4ed80afbe219 --- /dev/null +++ b/tests/rptest/clients/consumer_offsets_recovery.py @@ -0,0 +1,46 @@ +# Copyright 2022 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.redpanda import RedpandaService + + +class ConsumerOffsetsRecovery: + """ + Wrap tools/consumer_offsets_recovery for use in tests + """ + def __init__(self, redpanda: RedpandaService): + self._redpanda = redpanda + self._cfg_path = '/tmp/cgr.properties' + self._offsets_cache_path = '/tmp/offsets' + + def _cmd(self, partition_count, dry_run): + cmd = "python3 /opt/scripts/consumer_offsets_recovery/main.py" + + cmd += f" --cfg {self._cfg_path}" + cmd += f" --offsets {self._offsets_cache_path}" + cmd += f" -p {partition_count}" + if not dry_run: + cmd += " --execute" + return cmd + + def _render_config(self, node): + properties = {"bootstrap_servers": self._redpanda.brokers()} + content = "" + for k, v in properties.items(): + content += f"{k}={v}\n" + + node.account.create_file(self._cfg_path, content) + + def change_partition_count(self, partition_count, node, dry_run): + node.account.remove(self._offsets_cache_path, allow_fail=True) + node.account.mkdir(self._offsets_cache_path) + self._render_config(node) + cmd = self._cmd(partition_count, dry_run) + output = node.account.ssh_output(cmd, combine_stderr=True) + self._redpanda.logger.info(f"out: {output}") diff --git a/tests/rptest/test_suite_ec2.yml b/tests/rptest/test_suite_ec2.yml index 9001c171d560..e11ad789d86b 100644 --- a/tests/rptest/test_suite_ec2.yml +++ b/tests/rptest/test_suite_ec2.yml @@ -13,3 +13,4 @@ ec2: - tests/wasm_redpanda_failure_recovery_test.py # no wasm - tests/wasm_partition_movement_test.py # no wasm - tests/e2e_iam_role_test.py # use static credentials + - tests/consumer_group_recovery_tool_test.py # use script available in dockerfile diff --git a/tests/rptest/tests/consumer_group_recovery_tool_test.py b/tests/rptest/tests/consumer_group_recovery_tool_test.py new file mode 100644 index 000000000000..b8db226f7d82 --- /dev/null +++ b/tests/rptest/tests/consumer_group_recovery_tool_test.py @@ -0,0 +1,111 @@ +# Copyright 2020 Redpanda Data, Inc. +# Copyright 2020 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.clients.consumer_offsets_recovery import ConsumerOffsetsRecovery +from rptest.services.cluster import cluster + +from rptest.clients.rpk import RpkException, RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.kafka_cli_consumer import KafkaCliConsumer +from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer +from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST +from rptest.services.rpk_producer import RpkProducer +from rptest.tests.prealloc_nodes import PreallocNodesTest +from rptest.tests.redpanda_test import RedpandaTest +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize + + +class ConsumerOffsetsRecoveryToolTest(PreallocNodesTest): + def __init__(self, test_ctx, *args, **kwargs): + self._ctx = test_ctx + self.initial_partition_count = 3 + super(ConsumerOffsetsRecoveryToolTest, self).__init__( + test_ctx, + num_brokers=3, + *args, + # disable leader balancer to make sure that group will not be realoaded because of leadership changes + extra_rp_conf={ + "group_topic_partitions": self.initial_partition_count + }, + node_prealloc_count=1, + **kwargs) + + def describe_all_groups(self): + rpk = RpkTool(self.redpanda) + all_groups = {} + for g in rpk.group_list(): + gd = rpk.group_describe(g) + all_groups[gd.name] = {} + for p in gd.partitions: + all_groups[ + gd.name][f"{p.topic}/{p.partition}"] = p.current_offset + + return all_groups + + @cluster(num_nodes=4) + def test_consumer_offsets_partition_count_change(self): + topic = TopicSpec(partition_count=16, replication_factor=3) + self.client().create_topic([topic]) + msg_size = 1024 + msg_cnt = 10000 + + producer = KgoVerifierProducer(self.test_context, + self.redpanda, + topic.name, + msg_size, + msg_cnt, + custom_node=self.preallocated_nodes) + + producer.start(clean=False) + + wait_until(lambda: producer.produce_status.acked > 10, + timeout_sec=30, + backoff_sec=0.5) + + consumer = KgoVerifierConsumerGroupConsumer( + self.test_context, + self.redpanda, + topic.name, + msg_size, + readers=3, + nodes=self.preallocated_nodes) + consumer.start(clean=False) + + producer.wait() + consumer.wait() + + assert consumer.consumer_status.validator.valid_reads >= producer.produce_status.acked + + groups_pre_migration = self.describe_all_groups() + + cgr = ConsumerOffsetsRecovery(self.redpanda) + + # execute recovery tool, ask for 16 partitions in consumer offsets topic + # + # this is dry run, expect that partition count didn't change + cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=True) + rpk = RpkTool(self.redpanda) + tp_desc = list(rpk.describe_topic("__consumer_offsets")) + + # check if topic partition count changed + assert len(tp_desc) == self.initial_partition_count + + # now allow the tool to execute + cgr.change_partition_count(16, self.redpanda.nodes[0], dry_run=False) + + # check if topic partition count changed + wait_until( + lambda: len(list(rpk.describe_topic("__consumer_offsets"))) == 16, + 20) + + groups_post_migration = self.describe_all_groups() + + assert groups_pre_migration == groups_post_migration From 37ff0bd251c72071e1e4c333ee5970e03e948655 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 4 Jan 
2023 08:43:08 +0100 Subject: [PATCH 31/33] tests: removed debugging sleep from offline_log_viewer.py Signed-off-by: Michal Maslanka (cherry picked from commit 2f476fa4de0270b50f979a0180c08300554b699c) --- tests/rptest/clients/offline_log_viewer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py index c2e1938869d5..75df420b9b01 100644 --- a/tests/rptest/clients/offline_log_viewer.py +++ b/tests/rptest/clients/offline_log_viewer.py @@ -35,8 +35,6 @@ def _json_cmd(self, node, suffix): except json.decoder.JSONDecodeError: # Log the bad output before re-raising self._redpanda.logger.error(f"Invalid JSON output: {json_out}") - import time - time.sleep(3600) raise def read_controller(self, node): From 2a8b6de3ca2718ea612887dca5e209a3730f3437 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 16 Dec 2022 13:14:09 +0100 Subject: [PATCH 32/33] tools: fixed incorrect parsing of some controller commands Signed-off-by: Michal Maslanka (cherry picked from commit 428a4884b15171f10e00bf7a8cbfb1cdbd2fdee7) --- tools/offline_log_viewer/controller.py | 117 ++++++++++++++++--------- tools/offline_log_viewer/reader.py | 12 +++ 2 files changed, 89 insertions(+), 40 deletions(-) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index b68d53f89a04..59016dd01dc8 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -9,8 +9,16 @@ logger = logging.getLogger('controller') +def read_remote_topic_properties_serde(rdr: Reader): + return rdr.read_envelope( + lambda rdr, _: { + "remote_revision": rdr.read_int64(), + "remote_partition_count": rdr.read_int32(), + }) + + def read_topic_properties_serde(rdr: Reader, version): - assert 0 <= version <= 1 + topic_properties = { 'compression': rdr.read_optional(Reader.read_serde_enum), 'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum), @@ -24,18 +32,26 @@ def read_topic_properties_serde(rdr: Reader, version): } # introduced for remote read replicas - if version == 1: + if version >= 1: topic_properties |= { 'read_replica': rdr.read_optional(Reader.read_bool), 'read_replica_bucket': rdr.read_optional(Reader.read_string), 'remote_topic_properties': - rdr.read_optional( - lambda r: { - 'remote_revision': r.read_int64(), - 'remote_partition_count': r.read_int32() - }), + rdr.read_optional(read_remote_topic_properties_serde) + } + if version >= 2: + topic_properties |= { + 'batch_max_bytes': rdr.read_optional(Reader.read_uint32), + } + + if version >= 3: + topic_properties |= { + 'retention_local_target_bytes': + rdr.read_tristate(Reader.read_uint64), + 'retention_local_target_ms': rdr.read_tristate(Reader.read_uint64), + 'remote_delete': rdr.read_bool() } return topic_properties @@ -56,22 +72,22 @@ def read_topic_assignment_serde(rdr: Reader): rdr.read_int16(), 'properties': rdr.read_envelope(read_topic_properties_serde, - max_version=1), - }), + max_version=3), + }, 1), 'assignments': rdr.read_serde_vector(lambda r: r.read_envelope( - lambda r, _: { + lambda ir, _: { 'group': - r.read_int64(), + ir.read_int64(), 'id': - r.read_int32(), + ir.read_int32(), 'replicas': - r.read_serde_vector(lambda r: { - 'node_id': r.read_int32(), - 'shard': r.read_uint32(), + ir.read_serde_vector(lambda iir: { + 'node_id': iir.read_int32(), + 'shard': iir.read_uint32(), }), })), - }) + }, 1) def read_inc_update_op_serde(rdr: Reader): @@ -149,12 +165,16 @@ def decode_topic_command_serde(k_rdr: 
Reader, rdr: Reader): cmd['type'] = rdr.read_int8() if cmd['type'] == 0: cmd['type_string'] = 'create_topic' - # FIXME: this fails to read messages written on tip of dev ead54dda - #cmd |= read_topic_assignment_serde(rdr) + cmd['key'] = {} + cmd['key']['namespace'] = k_rdr.read_string() + cmd['key']['topic'] = k_rdr.read_string() + cmd |= read_topic_assignment_serde(rdr) elif cmd['type'] == 1: cmd['type_string'] = 'delete_topic' cmd['namespace'] = rdr.read_string() cmd['topic'] = rdr.read_string() + k_rdr.read_string() + k_rdr.read_string() elif cmd['type'] == 2: cmd['type_string'] = 'update_partitions' cmd['namespace'] = k_rdr.read_string() @@ -163,6 +183,7 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) elif cmd['type'] == 3: cmd['type_string'] = 'finish_partitions_update' + cmd['namespace'] = k_rdr.read_string() cmd['topic'] = k_rdr.read_string() cmd['partition'] = k_rdr.read_int32() cmd['replicas'] = rdr.read_serde_vector(read_broker_shard) @@ -189,6 +210,9 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader): }) elif cmd['type'] == 7: cmd['type_string'] = 'cancel_moving_partition_replicas' + cmd['namespace'] = k_rdr.read_string() + cmd['topic'] = k_rdr.read_string() + cmd['partition'] = k_rdr.read_int32() cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()}) return cmd @@ -506,6 +530,7 @@ def decode_action_t(v): def decode_feature_command_serde(k_rdr: Reader, rdr: Reader): cmd = {'type': rdr.read_int8()} + rdr.read_int8() if cmd['type'] == 0: cmd['type_name'] = 'feature_update' cmd |= k_rdr.read_envelope( @@ -545,6 +570,7 @@ def decode_feature_update_action(r): cmd = {} cmd['type'] = rdr.read_int8() + rdr.read_int8() if cmd['type'] == 0: cmd['type_name'] = 'feature_update' cmd['v'] = k_rdr.read_int8() @@ -560,49 +586,60 @@ def decode_node_management_command(k_rdr: Reader, rdr: Reader): 'type_string': 'decommission_node', 'node_id': k_rdr.read_int32() } + rdr.read_int8() elif cmd['type'] == 1: cmd |= { 'type_string': 'recommission_node', 'node_id': k_rdr.read_int32() } + rdr.read_int8() elif cmd['type'] == 2: cmd |= { 'type_string': 'finish_reallocations', 'node_id': k_rdr.read_int32() } + rdr.read_int8() elif cmd['type'] == 3: cmd |= { 'type_string': 'maintenance_mode', 'node_id': k_rdr.read_int32(), 'enabled': rdr.read_bool() } + elif cmd['type'] == 4: + cmd |= { + 'type_string': 'register_node_uuid', + 'uuid': k_rdr.read_uuid(), + 'id': rdr.read_optional(lambda r: r.read_int32()) + } return cmd -def decode_cluster_bootstrap_command(record): - def decode_user_and_credential(r): - user_cred = {} - user_cred['username'] = r.read_string() - cmd['salt'] = obfuscate_secret(r.read_iobuf().hex()) - cmd['server_key'] = obfuscate_secret(r.read_iobuf().hex()) - cmd['stored_key'] = obfuscate_secret(r.read_iobuf().hex()) +def decode_user_and_credential(rdr: Reader): + return rdr.read_envelope( + lambda r, _: { + 'username': r.read_string(), + 'salt': obfuscate_secret(r.read_iobuf().hex()), + 'server_key': obfuscate_secret(r.read_iobuf().hex()), + 'stored_key': obfuscate_secret(r.read_iobuf().hex()), + }) + - rdr = Reader(BytesIO(record.value)) - k_rdr = Reader(BytesIO(record.key)) +def decode_cluster_bootstrap_command(k_rdr, rdr): cmd = {} - rdr.read_envelope() + rdr.skip(1) + k_rdr.read_int8() cmd['type'] = rdr.read_int8() - if cmd['type'] == 0 or cmd['type'] == 5: # TODO: remove 5 + if cmd['type'] == 0: cmd['type_name'] = 'bootstrap_cluster' - cmd['key'] = k_rdr.read_int8() - cmd['v'] = 
rdr.read_int8() - if cmd['v'] == 0: - cmd['cluster_uuid'] = ''.join([ - f'{rdr.read_uint8():02x}' + ('-' if k in [3, 5, 7, 9] else '') - for k in range(16) - ]) - cmd['bootstrap_user_cred'] = rdr.read_optional( - decode_user_and_credential) + cmd |= rdr.read_envelope( + lambda r, _: { + 'cluster_uuid': + r.read_uuid(), + 'bootstrap_user_cred': + r.read_optional(decode_user_and_credential), + 'nodes_by_uuid': + r.read_serde_map(Reader.read_uuid, Reader.read_int32) + }) return cmd @@ -657,9 +694,8 @@ def decode_record(batch, record, bin_dump: bool): ret['data'] = decode_adl_or_serde(k_rdr, rdr, decode_node_management_command, decode_node_management_command) - # TODO: serde for cluster_bootstrap_cmd if batch.type == BatchType.cluster_bootstrap_cmd: - ret['data'] = decode_cluster_bootstrap_command(record) + ret['data'] = decode_cluster_bootstrap_command(k_rdr, rdr) k_unread = k_rdr.remaining() v_unread = rdr.remaining() @@ -680,4 +716,5 @@ def decode(self, bin_dump: bool): s = Segment(path) for b in s: for r in b: + dec = decode_record(b, r, bin_dump) self.records.append(decode_record(b, r, bin_dump)) diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index 46bc7762119a..95a8c304206d 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -149,6 +149,12 @@ def read_bytes(self, length): def peek(self, length): return self.stream.peek(length) + def read_uuid(self): + return ''.join([ + f'{self.read_uint8():02x}' + ('-' if k in [3, 5, 7, 9] else '') + for k in range(16) + ]) + def peek_int8(self): # peek returns the whole memory buffer, slice is needed to conform to struct format string return struct.unpack(' Date: Thu, 5 Jan 2023 17:07:08 -0800 Subject: [PATCH 33/33] Revert "k/group_metadata: added support for v1 offset metadata value" This reverts commit 64217049324e755ecafb7e17d9caf134fa418d69. This is not being backported. 
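For reference, the offline_log_viewer decoder added earlier in this series
(decode_offset_commit in tools/offline_log_viewer/consumer_offsets.py) reads
the two value layouts as: version 1 carries an expiry_timestamp and no
leader_epoch, while version 3 carries a leader_epoch and no expiry_timestamp.
A minimal illustrative sketch of that field order follows; the helper name and
the use of Python's struct module are assumptions for illustration only and
are not part of these patches.

    import struct

    def encode_offset_commit_value(version, offset, metadata, commit_ts,
                                   leader_epoch=-1, expiry_ts=-1):
        # Mirrors the read order of decode_offset_commit() in the offline log
        # viewer: int16 version, int64 offset, [int32 leader_epoch if
        # version >= 3], int16-length-prefixed metadata string, int64
        # commit_timestamp, [int64 expiry_timestamp if version == 1];
        # all fields big-endian.
        buf = struct.pack('>hq', version, offset)
        if version >= 3:
            buf += struct.pack('>i', leader_epoch)
        md = metadata.encode()
        buf += struct.pack('>h', len(md)) + md
        buf += struct.pack('>q', commit_ts)
        if version == 1:
            buf += struct.pack('>q', expiry_ts)
        return buf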
--- src/v/kafka/server/group_metadata.cc | 21 ++++++------------- src/v/kafka/server/group_metadata.h | 6 +----- .../group_metadata_serialization_test.cc | 11 ---------- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index 6971a3350e9b..95dfe3eaf408 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -201,32 +201,26 @@ void offset_metadata_value::encode( response_writer& writer, const offset_metadata_value& v) { writer.write(v.version); writer.write(v.offset); - if (v.version >= group_metadata_version{3}) { - writer.write(v.leader_epoch); - } + writer.write(v.leader_epoch); writer.write(v.metadata); writer.write(v.commit_timestamp); - if (v.version == group_metadata_version{1}) { - writer.write(v.expiry_timestamp); - } } offset_metadata_value offset_metadata_value::decode(request_reader& reader) { offset_metadata_value ret; auto version = read_metadata_version(reader); validate_version_range( - version, "offset_metadata_value", offset_metadata_value::latest_version); + version, "offset_metadata_value", offset_metadata_value::version); - ret.version = version; ret.offset = model::offset(reader.read_int64()); if (version >= group_metadata_version{3}) { ret.leader_epoch = kafka::leader_epoch(reader.read_int32()); } ret.metadata = reader.read_string(); ret.commit_timestamp = model::timestamp(reader.read_int64()); - // read expiry_timestamp only present in version 1 + // read and ignore expiry_timestamp only present in version 1 if (version == group_metadata_version{1}) { - ret.expiry_timestamp = model::timestamp(reader.read_int64()); + reader.read_int64(); } return ret; @@ -574,14 +568,11 @@ std::ostream& operator<<(std::ostream& o, const offset_metadata_key& v) { std::ostream& operator<<(std::ostream& o, const offset_metadata_value& v) { fmt::print( o, - "{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestamp: {}, " - "expiry_timestamp: {}, version: {}}}", + "{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestap: {}}}", v.offset, v.leader_epoch, v.metadata, - v.commit_timestamp, - v.expiry_timestamp, - v.version); + v.commit_timestamp); return o; } namespace old { diff --git a/src/v/kafka/server/group_metadata.h b/src/v/kafka/server/group_metadata.h index bb0b7dab3300..f238fb6f34fe 100644 --- a/src/v/kafka/server/group_metadata.h +++ b/src/v/kafka/server/group_metadata.h @@ -229,15 +229,11 @@ struct offset_metadata_key { * The value type for offset commit records, consistent with Kafka format */ struct offset_metadata_value { - static constexpr group_metadata_version latest_version{3}; - group_metadata_version version = latest_version; + static constexpr group_metadata_version version{3}; model::offset offset; - // present only in version >= 3 kafka::leader_epoch leader_epoch = invalid_leader_epoch; ss::sstring metadata; model::timestamp commit_timestamp; - // present only in version 1 - model::timestamp expiry_timestamp{-1}; friend std::ostream& operator<<(std::ostream&, const offset_metadata_value&); diff --git a/src/v/kafka/server/tests/group_metadata_serialization_test.cc b/src/v/kafka/server/tests/group_metadata_serialization_test.cc index ebdd0d19424a..c121d4383aa0 100644 --- a/src/v/kafka/server/tests/group_metadata_serialization_test.cc +++ b/src/v/kafka/server/tests/group_metadata_serialization_test.cc @@ -112,18 +112,7 @@ FIXTURE_TEST(metadata_rt_test, fixture) { offset_key.partition = random_named_int(); roundtrip_test(offset_key); 
-    // version 1
-    kafka::offset_metadata_value offset_md_v1;
-    offset_md_v1.version = kafka::group_metadata_version(1);
-    offset_md_v1.offset = random_named_int();
-    offset_md_v1.metadata = random_named_string();
-    offset_md_v1.commit_timestamp = model::timestamp::now();
-    offset_md_v1.expiry_timestamp = model::timestamp::now();
-
-    roundtrip_test(offset_md_v1);
-
-    // version 3
     kafka::offset_metadata_value offset_md;
     offset_md.offset = random_named_int();