From f15e3d87b886d96639c3b685cc22923bdbb8fb88 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 18 Jun 2022 21:57:43 -0700 Subject: [PATCH] admin: make transfer_leadership_to more stable making transfer_leadership_to more stable by checking that all nodes has updated metadata. previously we could choose a stale server, get 503 and retry the requests --- tests/rptest/services/admin.py | 43 +++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index dea1dada30f9..fd81762fb035 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -187,17 +187,19 @@ def _get_stable_configuration( return None info = PartitionDetails() info.status = status - info.leader = last_leader + info.leader = int(last_leader) info.replicas = replicas return info def wait_stable_configuration( self, topic, + *, partition=0, namespace="kafka", replication=None, timeout_s=10, + backoff_s=1, hosts: Optional[list[str]] = None) -> PartitionDetails: """ Method waits for timeout_s until the configuration is stable and returns it. @@ -231,7 +233,7 @@ def get_stable_configuration(): return wait_until_result( get_stable_configuration, timeout_sec=timeout_s, - backoff_sec=1, + backoff_sec=backoff_s, err_msg= f"can't fetch stable replicas for {namespace}/{topic}/{partition} within {timeout_s} sec" ) @@ -242,6 +244,7 @@ def await_stable_leader(self, namespace="kafka", replication=None, timeout_s=10, + backoff_s=1, hosts: Optional[list[str]] = None, check: Callable[[int], bool] = lambda node_id: True): @@ -252,9 +255,13 @@ def await_stable_leader(self, When the timeout is exhaust it throws TimeoutException """ def is_leader_stable(): - info = self.wait_stable_configuration(topic, partition, namespace, - replication, timeout_s, - hosts) + info = self.wait_stable_configuration(topic, + partition=partition, + namespace=namespace, + replication=replication, + timeout_s=timeout_s, + hosts=hosts, + backoff_s=backoff_s) if check(info.leader): return True, info.leader return False @@ -262,7 +269,7 @@ def is_leader_stable(): return wait_until_result( is_leader_stable, timeout_sec=timeout_s, - backoff_sec=1, + backoff_sec=backoff_s, err_msg= f"can't get stable leader of {namespace}/{topic}/{partition} within {timeout_s} sec" ) @@ -547,16 +554,14 @@ def transfer_leadership_to(self, namespace, topic, partition, - target_id=None): + target_id=None, + leader_id=None): """ Looks up current ntp leader and transfer leadership to target node, this operations is NOP when current leader is the same as target. If user pass None for target this function will choose next replica for new leader. If leadership transfer was performed this function return True """ - - # check which node is current leader - def _get_details(): p = self.get_partitions(topic=topic, partition=partition, @@ -565,25 +570,25 @@ def _get_details(): f"ntp {namespace}/{topic}/{partition} details: {p}") return p - def _has_leader(): - return self.get_partition_leader( - namespace=namespace, topic=topic, partition=partition) != -1 + # check which node is current leader - wait_until(_has_leader, - timeout_sec=30, - backoff_sec=2, - err_msg="Failed to establish current leader") + if leader_id == None: + leader_id = self.await_stable_leader(topic, + partition=partition, + namespace=namespace, + timeout_s=30, + backoff_s=2) details = _get_details() if target_id is not None: - if details['leader_id'] == target_id: + if leader_id == target_id: return False path = f"raft/{details['raft_group_id']}/transfer_leadership?target={target_id}" else: path = f"raft/{details['raft_group_id']}/transfer_leadership" - leader = self.redpanda.get_node(details['leader_id']) + leader = self.redpanda.get_node(leader_id) ret = self._request('post', path=path, node=leader) return ret.status_code == 200