Skip to content

Commit

Permalink
tools/offline_log_viewer: added serde decoding for topic_commands
Browse files Browse the repository at this point in the history
The new code mimics the structure of the ADL decoding functions.
It is aware of the version number embedded in the binary stream: if the version is not supported, it skips the field and reports the error.
  • Loading branch information
andijcr authored and jcsp committed Dec 15, 2022
1 parent 4bbd0c5 commit 021317a
Showing 1 changed file with 200 additions and 5 deletions.
205 changes: 200 additions & 5 deletions tools/offline_log_viewer/controller.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from io import BufferedReader, BytesIO
from model import *
from reader import Reader
Expand All @@ -6,9 +7,190 @@
import datetime


def decode_topic_command(record):
rdr = Reader(BufferedReader(BytesIO(record.value)))
k_rdr = Reader(BytesIO(record.key))
def read_topic_properties_serde(rdr: Reader, version):
    """Decode the fields of a serde-encoded topic properties envelope.

    Used as the callback handed to ``Reader.read_envelope``; ``version`` is
    the second argument that callback receives. Fields are consumed from
    ``rdr`` strictly in the order they are assigned below.

    Returns a dict of the decoded properties. Version 1 additionally
    carries the read-replica related fields.
    """
    assert 0 <= version <= 1

    def read_remote_topic_properties(r):
        return {
            'remote_revision': r.read_int64(),
            'remote_partition_count': r.read_int32()
        }

    props = {}
    props['compression'] = rdr.read_optional(Reader.read_serde_enum)
    props['cleanup_policy_bitflags'] = rdr.read_optional(
        Reader.read_serde_enum)
    props['compaction_strategy'] = rdr.read_optional(Reader.read_serde_enum)
    props['timestamp_type'] = rdr.read_optional(Reader.read_serde_enum)
    props['segment_size'] = rdr.read_optional(Reader.read_uint64)
    props['retention_bytes'] = rdr.read_tristate(Reader.read_uint64)
    props['retention_duration'] = rdr.read_tristate(Reader.read_uint64)
    props['recovery'] = rdr.read_optional(Reader.read_bool)
    props['shadow_indexing'] = rdr.read_optional(Reader.read_serde_enum)

    # introduced for remote read replicas
    if version == 1:
        props['read_replica'] = rdr.read_optional(Reader.read_bool)
        props['read_replica_bucket'] = rdr.read_optional(Reader.read_string)
        props['remote_topic_properties'] = rdr.read_optional(
            read_remote_topic_properties)

    return props


def read_topic_assignment_serde(rdr: Reader):
    """Decode a serde-encoded topic configuration with its assignments.

    The outer envelope holds a ``cfg`` envelope (namespace, topic,
    partition count, replication factor and a properties envelope read with
    ``read_topic_properties_serde``) followed by a vector of partition
    assignment envelopes (group, id and a vector of node_id/shard pairs).

    Returns the result of ``rdr.read_envelope`` for that outer envelope.
    """
    def read_replica(r):
        return {
            'node_id': r.read_int32(),
            'shard': r.read_uint32(),
        }

    def read_assignment_fields(r, _version):
        return {
            'group': r.read_int64(),
            'id': r.read_int32(),
            'replicas': r.read_serde_vector(read_replica),
        }

    def read_assignment(r):
        return r.read_envelope(read_assignment_fields)

    def read_cfg_fields(r, _version):
        return {
            'namespace': r.read_string(),
            'topic': r.read_string(),
            'partitions': r.read_int32(),
            'replication_factor': r.read_int16(),
            'properties': r.read_envelope(read_topic_properties_serde,
                                          max_version=1),
        }

    def read_outer_fields(r, _version):
        return {
            'cfg': r.read_envelope(read_cfg_fields),
            'assignments': r.read_serde_vector(read_assignment),
        }

    return rdr.read_envelope(read_outer_fields)


def read_inc_update_op_serde(rdr: Reader):
    """Read a serde enum and map it to an incremental-update operation name.

    Values 0, 1 and 2 map to 'none', 'set' and 'remove' respectively; any
    other value yields 'error'.
    """
    op_names = ('none', 'set', 'remove')
    raw = rdr.read_serde_enum()
    known = -1 < raw < len(op_names)
    return op_names[raw] if known else 'error'


def read_property_update_serde(rdr: Reader, type_reader):
    """Decode one serde property-update envelope.

    ``type_reader`` is called with the envelope's reader to decode the
    value; the operation is decoded right after it with
    ``read_inc_update_op_serde``.

    Returns the result of ``rdr.read_envelope`` for a body made of the
    'value' and 'op' entries, read in that order.
    """
    def read_fields(body_rdr, _version):
        decoded = {}
        decoded['value'] = type_reader(body_rdr)
        decoded['op'] = read_inc_update_op_serde(body_rdr)
        return decoded

    return rdr.read_envelope(read_fields)


def read_incremental_topic_update_serde(rdr: Reader):
    """Decode a serde-encoded incremental topic properties update.

    Every field is a property-update envelope (see
    ``read_property_update_serde``); the fields are read in the order of
    the table below, which is the order they are laid out in the stream.
    """
    def optional_enum(r):
        return r.read_optional(Reader.read_serde_enum)

    def optional_uint64(r):
        return r.read_optional(Reader.read_uint64)

    def tristate_uint64(r):
        return r.read_tristate(Reader.read_uint64)

    def tristate_int64(r):
        return r.read_tristate(Reader.read_int64)

    # (field name, reader for the field's value), in stream order
    field_readers = (
        ('compression', optional_enum),
        ('cleanup_policy_bitflags', optional_enum),
        ('compaction_strategy', optional_enum),
        ('timestamp_type', optional_enum),
        ('segment_size', optional_uint64),
        ('retention_bytes', tristate_uint64),
        ('retention_duration', tristate_int64),
        ('shadow_indexing', optional_enum),
    )

    def read_fields(body_rdr, _version):
        return {
            name: read_property_update_serde(body_rdr, value_reader)
            for name, value_reader in field_readers
        }

    return rdr.read_envelope(read_fields)


def read_create_partitions_serde(rdr: Reader):
    """Decode a serde-encoded create-partitions command body.

    The outer envelope holds a ``cfg`` envelope (namespace, topic, new total
    partition count and a vector of int32 vectors with custom assignments)
    followed by a vector of partition assignment envelopes whose replicas
    are decoded with ``read_broker_shard``.

    Returns the result of ``rdr.read_envelope`` for that outer envelope.
    """
    def read_custom_assignment(r):
        return r.read_serde_vector(Reader.read_int32)

    def read_cfg_fields(r, _version):
        return {
            'namespace': r.read_string(),
            'topic': r.read_string(),
            'new_total_partition_count': r.read_int32(),
            'custom_assignments': r.read_serde_vector(read_custom_assignment),
        }

    def read_assignment_fields(r, _version):
        return {
            'group': r.read_int64(),
            'id': r.read_int32(),
            'replicas': r.read_serde_vector(read_broker_shard),
        }

    def read_assignment(r):
        return r.read_envelope(read_assignment_fields)

    def read_outer_fields(r, _version):
        return {
            'cfg': r.read_envelope(read_cfg_fields),
            'assignments': r.read_serde_vector(read_assignment),
        }

    return rdr.read_envelope(read_outer_fields)


def decode_topic_command_serde(k_rdr: Reader, rdr: Reader):
    """Decode a serde-encoded topic management command.

    Args:
        k_rdr: reader over the record key.
        rdr: reader over the record value, positioned on the int8 command
            type (any serde marker byte must already have been consumed).

    Returns:
        A dict that always holds 'type'. For the known command types (0-7)
        it also holds 'type_string' and the fields specific to that
        command; for any other type only 'type' is present.
    """
    cmd = {}
    cmd['type'] = rdr.read_int8()
    if cmd['type'] == 0:
        cmd['type_string'] = 'create_topic'
        cmd |= read_topic_assignment_serde(rdr)
    elif cmd['type'] == 1:
        cmd['type_string'] = 'delete_topic'
        cmd['namespace'] = rdr.read_string()
        cmd['topic'] = rdr.read_string()
    elif cmd['type'] == 2:
        cmd['type_string'] = 'update_partitions'
        cmd['namespace'] = k_rdr.read_string()
        cmd['topic'] = k_rdr.read_string()
        cmd['partition'] = k_rdr.read_int32()
        cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
    elif cmd['type'] == 3:
        cmd['type_string'] = 'finish_partitions_update'
        # The key has the same layout as for update_partitions: namespace,
        # topic, partition. The namespace has to be consumed first,
        # otherwise it is reported as the topic and the partition is read
        # from the wrong bytes of the key.
        cmd['namespace'] = k_rdr.read_string()
        cmd['topic'] = k_rdr.read_string()
        cmd['partition'] = k_rdr.read_int32()
        cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
    elif cmd['type'] == 4:
        cmd['type_string'] = 'update_topic_properties'
        cmd['namespace'] = k_rdr.read_string()
        cmd['topic'] = k_rdr.read_string()
        cmd['update'] = read_incremental_topic_update_serde(rdr)
    elif cmd['type'] == 5:
        cmd['type_string'] = 'create_partitions'
        cmd |= read_create_partitions_serde(rdr)
    elif cmd['type'] == 6:
        cmd['type_string'] = 'create_non_replicable_topic'
        # Unlike the other commands, this one is decoded entirely from the
        # key, which is itself an envelope.
        cmd['topic'] = k_rdr.read_envelope(
            lambda k_rdr, _: {
                'source': {
                    'namespace': k_rdr.read_string(),
                    'topic': k_rdr.read_string(),
                },
                'name': {
                    'namespace': k_rdr.read_string(),
                    'topic': k_rdr.read_string(),
                },
            })
    elif cmd['type'] == 7:
        cmd['type_string'] = 'cancel_moving_partition_replicas'
        cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()})
    return cmd


def decode_topic_command_adl(k_rdr: Reader, rdr: Reader):
cmd = {}
cmd['type'] = rdr.read_int8()
if cmd['type'] == 0:
Expand Down Expand Up @@ -68,6 +250,19 @@ def decode_topic_command(record):
return cmd


def decode_topic_command(record):
    """Decode a topic management command record, in adl or serde encoding.

    The first byte of the record value is peeked: -1 marks serde encoding,
    in which case the marker is consumed and the serde decoder is used;
    any larger value is left in place and the adl decoder is used.

    Raises AssertionError if the first byte is below -1.
    """
    value_rdr = Reader(BufferedReader(BytesIO(record.value)))
    key_rdr = Reader(BytesIO(record.key))

    format_marker = value_rdr.peek_int8()
    assert format_marker >= -1, "unsupported serialization format"

    if format_marker != -1:
        return decode_topic_command_adl(key_rdr, value_rdr)

    # serde encoding flag, consume it and proceed
    value_rdr.skip(1)
    return decode_topic_command_serde(key_rdr, value_rdr)


def decode_config(record):
    """Decode the record value with ``read_raft_config`` and return the result."""
    return read_raft_config(Reader(BytesIO(record.value)))
Expand Down Expand Up @@ -114,7 +309,6 @@ def decode_acl_command(record):


def read_config_kv(reader):
    """Read two consecutive strings from ``reader`` as a (key, value) tuple."""
    # Tuple elements are evaluated left to right, so the key is read first.
    return (reader.read_string(), reader.read_string())
Expand Down Expand Up @@ -229,7 +423,8 @@ def decode_record(batch, record, bin_dump: bool):
if batch.type == BatchType.raft_configuration:
ret['data'] = decode_config(record)
if batch.type == BatchType.topic_management_cmd:
ret['data'] = decode_topic_command(record)
ret['data'] = decode_adl_or_serde(record, decode_topic_command_adl,
decode_topic_command_serde)
if batch.type == BatchType.user_management_cmd:
ret['data'] = decode_user_command(record)
if batch.type == BatchType.acl_management_cmd:
Expand Down

0 comments on commit 021317a

Please sign in to comment.