Skip to content

Commit

Permalink
Merge pull request #7814 from mmaslankaprv/improved-offline-log-viewer
Browse files Browse the repository at this point in the history
tools: fixed incorrect parsing of some controller commands
  • Loading branch information
jcsp committed Dec 20, 2022
2 parents 85cc350 + 428a488 commit b211fb0
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 40 deletions.
117 changes: 77 additions & 40 deletions tools/offline_log_viewer/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@
logger = logging.getLogger('controller')


def read_remote_topic_properties_serde(rdr: Reader):
return rdr.read_envelope(
lambda rdr, _: {
"remote_revision": rdr.read_int64(),
"remote_partition_count": rdr.read_int32(),
})


def read_topic_properties_serde(rdr: Reader, version):
assert 0 <= version <= 1

topic_properties = {
'compression': rdr.read_optional(Reader.read_serde_enum),
'cleanup_policy_bitflags': rdr.read_optional(Reader.read_serde_enum),
Expand All @@ -24,18 +32,26 @@ def read_topic_properties_serde(rdr: Reader, version):
}

# introduced for remote read replicas
if version == 1:
if version >= 1:
topic_properties |= {
'read_replica':
rdr.read_optional(Reader.read_bool),
'read_replica_bucket':
rdr.read_optional(Reader.read_string),
'remote_topic_properties':
rdr.read_optional(
lambda r: {
'remote_revision': r.read_int64(),
'remote_partition_count': r.read_int32()
}),
rdr.read_optional(read_remote_topic_properties_serde)
}
if version >= 2:
topic_properties |= {
'batch_max_bytes': rdr.read_optional(Reader.read_uint32),
}

if version >= 3:
topic_properties |= {
'retention_local_target_bytes':
rdr.read_tristate(Reader.read_uint64),
'retention_local_target_ms': rdr.read_tristate(Reader.read_uint64),
'remote_delete': rdr.read_bool()
}
return topic_properties

Expand All @@ -56,22 +72,22 @@ def read_topic_assignment_serde(rdr: Reader):
rdr.read_int16(),
'properties':
rdr.read_envelope(read_topic_properties_serde,
max_version=1),
}),
max_version=3),
}, 1),
'assignments':
rdr.read_serde_vector(lambda r: r.read_envelope(
lambda r, _: {
lambda ir, _: {
'group':
r.read_int64(),
ir.read_int64(),
'id':
r.read_int32(),
ir.read_int32(),
'replicas':
r.read_serde_vector(lambda r: {
'node_id': r.read_int32(),
'shard': r.read_uint32(),
ir.read_serde_vector(lambda iir: {
'node_id': iir.read_int32(),
'shard': iir.read_uint32(),
}),
})),
})
}, 1)


def read_inc_update_op_serde(rdr: Reader):
Expand Down Expand Up @@ -149,12 +165,16 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader):
cmd['type'] = rdr.read_int8()
if cmd['type'] == 0:
cmd['type_string'] = 'create_topic'
# FIXME: this fails to read messages written on tip of dev ead54dda
#cmd |= read_topic_assignment_serde(rdr)
cmd['key'] = {}
cmd['key']['namespace'] = k_rdr.read_string()
cmd['key']['topic'] = k_rdr.read_string()
cmd |= read_topic_assignment_serde(rdr)
elif cmd['type'] == 1:
cmd['type_string'] = 'delete_topic'
cmd['namespace'] = rdr.read_string()
cmd['topic'] = rdr.read_string()
k_rdr.read_string()
k_rdr.read_string()
elif cmd['type'] == 2:
cmd['type_string'] = 'update_partitions'
cmd['namespace'] = k_rdr.read_string()
Expand All @@ -163,6 +183,7 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader):
cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
elif cmd['type'] == 3:
cmd['type_string'] = 'finish_partitions_update'
cmd['namespace'] = k_rdr.read_string()
cmd['topic'] = k_rdr.read_string()
cmd['partition'] = k_rdr.read_int32()
cmd['replicas'] = rdr.read_serde_vector(read_broker_shard)
Expand All @@ -189,6 +210,9 @@ def decode_topic_command_serde(k_rdr: Reader, rdr: Reader):
})
elif cmd['type'] == 7:
cmd['type_string'] = 'cancel_moving_partition_replicas'
cmd['namespace'] = k_rdr.read_string()
cmd['topic'] = k_rdr.read_string()
cmd['partition'] = k_rdr.read_int32()
cmd |= rdr.read_envelope(lambda rdr, _: {'force': rdr.read_bool()})
return cmd

Expand Down Expand Up @@ -506,6 +530,7 @@ def decode_action_t(v):

def decode_feature_command_serde(k_rdr: Reader, rdr: Reader):
cmd = {'type': rdr.read_int8()}
rdr.read_int8()
if cmd['type'] == 0:
cmd['type_name'] = 'feature_update'
cmd |= k_rdr.read_envelope(
Expand Down Expand Up @@ -545,6 +570,7 @@ def decode_feature_update_action(r):

cmd = {}
cmd['type'] = rdr.read_int8()
rdr.read_int8()
if cmd['type'] == 0:
cmd['type_name'] = 'feature_update'
cmd['v'] = k_rdr.read_int8()
Expand All @@ -560,49 +586,60 @@ def decode_node_management_command(k_rdr: Reader, rdr: Reader):
'type_string': 'decommission_node',
'node_id': k_rdr.read_int32()
}
rdr.read_int8()
elif cmd['type'] == 1:
cmd |= {
'type_string': 'recommission_node',
'node_id': k_rdr.read_int32()
}
rdr.read_int8()
elif cmd['type'] == 2:
cmd |= {
'type_string': 'finish_reallocations',
'node_id': k_rdr.read_int32()
}
rdr.read_int8()
elif cmd['type'] == 3:
cmd |= {
'type_string': 'maintenance_mode',
'node_id': k_rdr.read_int32(),
'enabled': rdr.read_bool()
}
elif cmd['type'] == 4:
cmd |= {
'type_string': 'register_node_uuid',
'uuid': k_rdr.read_uuid(),
'id': rdr.read_optional(lambda r: r.read_int32())
}
return cmd


def decode_cluster_bootstrap_command(record):
def decode_user_and_credential(r):
user_cred = {}
user_cred['username'] = r.read_string()
cmd['salt'] = obfuscate_secret(r.read_iobuf().hex())
cmd['server_key'] = obfuscate_secret(r.read_iobuf().hex())
cmd['stored_key'] = obfuscate_secret(r.read_iobuf().hex())
def decode_user_and_credential(rdr: Reader):
return rdr.read_envelope(
lambda r, _: {
'username': r.read_string(),
'salt': obfuscate_secret(r.read_iobuf().hex()),
'server_key': obfuscate_secret(r.read_iobuf().hex()),
'stored_key': obfuscate_secret(r.read_iobuf().hex()),
})


rdr = Reader(BytesIO(record.value))
k_rdr = Reader(BytesIO(record.key))
def decode_cluster_bootstrap_command(k_rdr, rdr):
cmd = {}
rdr.read_envelope()
rdr.skip(1)
k_rdr.read_int8()
cmd['type'] = rdr.read_int8()
if cmd['type'] == 0 or cmd['type'] == 5: # TODO: remove 5
if cmd['type'] == 0:
cmd['type_name'] = 'bootstrap_cluster'
cmd['key'] = k_rdr.read_int8()
cmd['v'] = rdr.read_int8()
if cmd['v'] == 0:
cmd['cluster_uuid'] = ''.join([
f'{rdr.read_uint8():02x}' + ('-' if k in [3, 5, 7, 9] else '')
for k in range(16)
])
cmd['bootstrap_user_cred'] = rdr.read_optional(
decode_user_and_credential)
cmd |= rdr.read_envelope(
lambda r, _: {
'cluster_uuid':
r.read_uuid(),
'bootstrap_user_cred':
r.read_optional(decode_user_and_credential),
'nodes_by_uuid':
r.read_serde_map(Reader.read_uuid, Reader.read_int32)
})

return cmd

Expand Down Expand Up @@ -657,9 +694,8 @@ def decode_record(batch, record, bin_dump: bool):
ret['data'] = decode_adl_or_serde(k_rdr, rdr,
decode_node_management_command,
decode_node_management_command)
# TODO: serde for cluster_bootstrap_cmd
if batch.type == BatchType.cluster_bootstrap_cmd:
ret['data'] = decode_cluster_bootstrap_command(record)
ret['data'] = decode_cluster_bootstrap_command(k_rdr, rdr)

k_unread = k_rdr.remaining()
v_unread = rdr.remaining()
Expand All @@ -680,4 +716,5 @@ def decode(self, bin_dump: bool):
s = Segment(path)
for b in s:
for r in b:
dec = decode_record(b, r, bin_dump)
self.records.append(decode_record(b, r, bin_dump))
12 changes: 12 additions & 0 deletions tools/offline_log_viewer/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def read_bytes(self, length):
def peek(self, length):
return self.stream.peek(length)

def read_uuid(self):
return ''.join([
f'{self.read_uint8():02x}' + ('-' if k in [3, 5, 7, 9] else '')
for k in range(16)
])

def peek_int8(self):
# peek returns the whole memory buffer, slice is needed to conform to struct format string
return struct.unpack('<b', self.stream.peek(1)[:1])[0]
Expand All @@ -133,3 +139,9 @@ def skip(self, length):

def remaining(self):
return len(self.stream.raw.getbuffer()) - self.stream.tell()

def read_serde_map(self, k_reader, v_reader):
ret = {}
for _ in range(self.read_uint32()):
ret[k_reader(self)] = v_reader(self)
return ret

0 comments on commit b211fb0

Please sign in to comment.