Skip to content

Commit

Permalink
tests/clients: add permissive mode for rpk describe_topic
Browse files Browse the repository at this point in the history
When using this function to query leadership for partitions,
it is not necessary to exclude partitions just because
they failed to get some metadata from the leader (e.g. NOT_LEADER
errors for offets during transient leaderhsip change).

Add a `tolerant` flag that permits returning partially populated
RpkPartition results that just show the leader of a partition.
  • Loading branch information
jcsp committed Aug 8, 2022
1 parent 1432357 commit d50400a
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,26 +226,54 @@ def produce(self,
assert m, f"Reported offset not found in: {out}"
return int(m.group(1))

def describe_topic(self, topic):
def describe_topic(self, topic: str, tolerant: bool = False):
"""
By default this will omit any partitions which do not have full
metadata in the response: this means that if we are unlucky and a
partition returns NOT_LEADER due to a leadership transfer while
we query offsets, it will be missing. To be more forgiving, pass
tolerant=true
:param topic: topic name
:param tolerant: if true, RpkPartition results may be included with some
fields set to None, as long as the leader field is present.
:return:
"""
cmd = ['describe', topic, '-p']
output = self._run_topic(cmd)
if "not found" in output:
raise Exception(f"Topic not found: {topic}")
lines = output.splitlines()
lines = output.splitlines()[1:]

def partition_line(line):
m = re.match(
r" *(?P<id>\d+) +(?P<leader>\d+) +(?P<epoch>\d+) +\[(?P<replicas>.+?)\] +(?P<logstart>\d+?) +(?P<hw>\d+) *",
line)
if m == None:
if m is None and tolerant:
m = re.match(r" *(?P<id>\d+) +(?P<leader>\d+) .*", line)
if m is None:
self._redpanda.logger.info(f"No match on '{line}'")
return None

return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=None,
replicas=None,
hw=None,
start_offset=None)

elif m is None:
return None
replicas = list(map(lambda r: int(r), m.group('replicas').split()))
return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=int(m.group('epoch')),
replicas=replicas,
hw=int(m.group('hw')),
start_offset=int(m.group("logstart")))
elif m:
replicas = list(
map(lambda r: int(r),
m.group('replicas').split()))
return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=int(m.group('epoch')),
replicas=replicas,
hw=int(m.group('hw')),
start_offset=int(m.group("logstart")))

return filter(None, map(partition_line, lines))

Expand Down

0 comments on commit d50400a

Please sign in to comment.