diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index 10d3943b54a5..88607f3c8df8 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -13,6 +13,7 @@ import random import ducktape.errors from typing import Optional +import requests from ducktape.mark import parametrize from ducktape.utils.util import wait_until @@ -123,41 +124,48 @@ def _expect_available(self): self.logger.info("Cluster is available as expected") def _transfer_leadership(self, admin: Admin, namespace: str, topic: str, - target_node_id: int) -> None: - - last_log_msg = "" # avoid spamming log - - def leader_predicate(l: Optional[int]) -> bool: - nonlocal last_log_msg, target_node_id - if not l: - return False - if l != target_node_id: # type: ignore - log_msg = f'Still waiting for leader {target_node_id}, got {l}' - if log_msg != last_log_msg: # type: ignore # "unbound" - self.logger.info(log_msg) - last_log_msg = log_msg - return False - return True + target_id: int) -> None: - retry_once = True + count = 0 while True: - self.logger.info(f"Starting transfer to {target_node_id}") - admin.partition_transfer_leadership("kafka", topic, 0, - target_node_id) + count += 1 + self.logger.info(f"Waiting for a leader") + leader_id = admin.await_stable_leader(topic, + partition=0, + namespace=namespace, + timeout_s=30, + backoff_s=2) + self.logger.info(f"Current leader {leader_id}") + + if leader_id == target_id: + return + + self.logger.info(f"Starting transfer to {target_id}") + requests.exceptions.HTTPError try: - self._wait_for_leader(leader_predicate, - timeout=ELECTION_TIMEOUT * 2) - except ducktape.errors.TimeoutError as e: - if retry_once: + admin.transfer_leadership_to(topic=topic, + namespace=namespace, + partition=0, + target_id=target_id, + leader_id=leader_id) + except requests.exceptions.HTTPError as e: + if count <= 10 and e.response.status_code == 503: self.logger.info( - f'Failed to get desired leader, retrying once.') - retry_once = False + f"Got 503: {leader_id}'s metadata hasn't been updated yet" + ) + time.sleep(1) continue - else: - raise e - break # no exception -> success, we can return now + raise + break - self.logger.info(f"Completed transfer to {target_node_id}") + admin.await_stable_leader(topic, + partition=0, + namespace=namespace, + timeout_s=ELECTION_TIMEOUT * 4, + backoff_s=2, + check=lambda node_id: node_id != leader_id) + + self.logger.info(f"Completed transfer to {target_id}") @cluster(num_nodes=3) def test_one_node_down(self): @@ -404,7 +412,7 @@ def test_leader_transfers_recovery(self, acks): # is tripping up the cluster when we have so many leadership transfers. # https://github.com/redpanda-data/redpanda/issues/2623 - admin = Admin(self.redpanda) + admin = Admin(self.redpanda, retry_codes=[]) initial_leader_id = leader_node_id for n in range(0, transfer_count):