Skip to content

Commit

Permalink
Merge pull request #5757 from abhijat/wait-for-topic-metadata-test-fa…
Browse files Browse the repository at this point in the history
…st-3

cloud_storage/tests: wait for partition metadata during topic recovery tests
  • Loading branch information
abhijat committed Aug 12, 2022
2 parents 6883b90 + d94b00a commit 97e8076
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
2 changes: 1 addition & 1 deletion tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def _execute(self, cmd, stdin=None, timeout=None):
p.kill()
raise RpkException(f"command {' '.join(cmd)} timed out")

self._redpanda.logger.debug(output)
self._redpanda.logger.debug(f'\n{output}')

if p.returncode:
self._redpanda.logger.error(error)
Expand Down
45 changes: 35 additions & 10 deletions tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,42 @@ def _validate_partition_last_offset(self):

def _produce_and_verify(self, topic_spec):
"""Try to produce to the topic. The method produces data to the topic and
checks that high watermark advanced."""
old_hw = None
new_hw = None
for partition in self._rpk.describe_topic(topic_spec.name):
old_hw = partition.high_watermark
assert old_hw is not None
checks that high watermark advanced. Wait for partition metadata to appear
in case of a recent leader election"""

# Utility to capture the watermark using wait_until as soon as it is not None.
class PartitionState:
def __init__(self, rpk, topic_name):
self.rpk = rpk
self.topic_name = topic_name
self.hwm = None

def watermark_is_present(self):
for partition_info in self.rpk.describe_topic(self.topic_name):
self.hwm = partition_info.high_watermark
return self.hwm is not None

old_state = PartitionState(self._rpk, topic_spec.name)
wait_until(
lambda: old_state.watermark_is_present(),
timeout_sec=60,
backoff_sec=1,
err_msg=
f'failed to get high watermark before produce for {topic_spec}')

self._kafka_tools.produce(topic_spec.name, 10000, 1024)
for topic in self._rpk.describe_topic(topic_spec.name):
new_hw = topic.high_watermark
assert new_hw is not None
assert old_hw != new_hw

new_state = PartitionState(self._rpk, topic_spec.name)
wait_until(
lambda: new_state.watermark_is_present(),
timeout_sec=60,
backoff_sec=1,
err_msg=
f'failed to get high watermark after produce for {topic_spec}')

assert old_state.hwm != new_state.hwm, \
f'old_hw {old_state.hwm} unexpectedly same as new_hw {new_state.hwm} ' \
f'for topic spec: {topic_spec}'

def _list_objects(self):
"""Return list of all topics in the bucket (only names)"""
Expand Down

0 comments on commit 97e8076

Please sign in to comment.