Skip to content

Commit

Permalink
admin: make transfer_leadership_to more stable
Browse files Browse the repository at this point in the history
making transfer_leadership_to more stable by checking that all
nodes has updated metadata. previously we could choose a stale
server, get 503 and retry the requests
  • Loading branch information
rystsov committed Jun 23, 2022
1 parent 1064521 commit f15e3d8
Showing 1 changed file with 24 additions and 19 deletions.
43 changes: 24 additions & 19 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,19 @@ def _get_stable_configuration(
return None
info = PartitionDetails()
info.status = status
info.leader = last_leader
info.leader = int(last_leader)
info.replicas = replicas
return info

def wait_stable_configuration(
self,
topic,
*,
partition=0,
namespace="kafka",
replication=None,
timeout_s=10,
backoff_s=1,
hosts: Optional[list[str]] = None) -> PartitionDetails:
"""
Method waits for timeout_s until the configuration is stable and returns it.
Expand Down Expand Up @@ -231,7 +233,7 @@ def get_stable_configuration():
return wait_until_result(
get_stable_configuration,
timeout_sec=timeout_s,
backoff_sec=1,
backoff_sec=backoff_s,
err_msg=
f"can't fetch stable replicas for {namespace}/{topic}/{partition} within {timeout_s} sec"
)
Expand All @@ -242,6 +244,7 @@ def await_stable_leader(self,
namespace="kafka",
replication=None,
timeout_s=10,
backoff_s=1,
hosts: Optional[list[str]] = None,
check: Callable[[int],
bool] = lambda node_id: True):
Expand All @@ -252,17 +255,21 @@ def await_stable_leader(self,
When the timeout is exhaust it throws TimeoutException
"""
def is_leader_stable():
info = self.wait_stable_configuration(topic, partition, namespace,
replication, timeout_s,
hosts)
info = self.wait_stable_configuration(topic,
partition=partition,
namespace=namespace,
replication=replication,
timeout_s=timeout_s,
hosts=hosts,
backoff_s=backoff_s)
if check(info.leader):
return True, info.leader
return False

return wait_until_result(
is_leader_stable,
timeout_sec=timeout_s,
backoff_sec=1,
backoff_sec=backoff_s,
err_msg=
f"can't get stable leader of {namespace}/{topic}/{partition} within {timeout_s} sec"
)
Expand Down Expand Up @@ -547,16 +554,14 @@ def transfer_leadership_to(self,
namespace,
topic,
partition,
target_id=None):
target_id=None,
leader_id=None):
"""
Looks up current ntp leader and transfer leadership to target node,
this operations is NOP when current leader is the same as target.
If user pass None for target this function will choose next replica for new leader.
If leadership transfer was performed this function return True
"""

# check which node is current leader

def _get_details():
p = self.get_partitions(topic=topic,
partition=partition,
Expand All @@ -565,25 +570,25 @@ def _get_details():
f"ntp {namespace}/{topic}/{partition} details: {p}")
return p

def _has_leader():
return self.get_partition_leader(
namespace=namespace, topic=topic, partition=partition) != -1
# check which node is current leader

wait_until(_has_leader,
timeout_sec=30,
backoff_sec=2,
err_msg="Failed to establish current leader")
if leader_id == None:
leader_id = self.await_stable_leader(topic,
partition=partition,
namespace=namespace,
timeout_s=30,
backoff_s=2)

details = _get_details()

if target_id is not None:
if details['leader_id'] == target_id:
if leader_id == target_id:
return False
path = f"raft/{details['raft_group_id']}/transfer_leadership?target={target_id}"
else:
path = f"raft/{details['raft_group_id']}/transfer_leadership"

leader = self.redpanda.get_node(details['leader_id'])
leader = self.redpanda.get_node(leader_id)
ret = self._request('post', path=path, node=leader)
return ret.status_code == 200

Expand Down

0 comments on commit f15e3d8

Please sign in to comment.