Skip to content

Commit

Permalink
raft_availability_test: use stable leadership transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jun 19, 2022
1 parent 84455eb commit 560f193
Showing 1 changed file with 38 additions and 30 deletions.
68 changes: 38 additions & 30 deletions tests/rptest/tests/raft_availability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import random
import ducktape.errors
from typing import Optional
import requests

from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -123,41 +124,48 @@ def _expect_available(self):
self.logger.info("Cluster is available as expected")

def _transfer_leadership(self, admin: Admin, namespace: str, topic: str,
                         target_id: int) -> None:
    """Transfer leadership of partition 0 of `topic` to node `target_id`.

    Repeatedly waits for a stable leader and requests a transfer to the
    target node, retrying on transient 503s (the current leader's metadata
    may lag behind a recent election). Returns once the target node already
    holds leadership, or after a transfer request is accepted and a new
    stable leader (different from the old one) is observed.

    :param admin: Admin API client used to query and move leadership.
    :param namespace: namespace of the topic (e.g. "kafka").
    :param topic: topic whose partition 0 leadership is transferred.
    :param target_id: node id that should become the leader.
    :raises requests.exceptions.HTTPError: if the transfer request keeps
        failing with 503 beyond the retry budget, or fails with any other
        HTTP status.
    """
    attempts = 0
    while True:
        attempts += 1
        self.logger.info(f"Waiting for a leader")
        # Wait until the cluster agrees on a leader before asking it to move.
        leader_id = admin.await_stable_leader(topic,
                                              partition=0,
                                              namespace=namespace,
                                              timeout_s=30,
                                              backoff_s=2)
        self.logger.info(f"Current leader {leader_id}")

        if leader_id == target_id:
            # Nothing to do: the desired node is already the leader.
            return

        self.logger.info(f"Starting transfer to {target_id}")
        try:
            admin.transfer_leadership_to(topic=topic,
                                         namespace=namespace,
                                         partition=0,
                                         target_id=target_id,
                                         leader_id=leader_id)
        except requests.exceptions.HTTPError as e:
            # A 503 means the node we addressed doesn't (yet) consider
            # itself leader — its metadata is stale after an election.
            # Give it a moment and retry, up to a bounded number of times.
            if attempts <= 10 and e.response.status_code == 503:
                self.logger.info(
                    f"Got 503: {leader_id}'s metadata hasn't been updated yet"
                )
                time.sleep(1)
                continue
            raise
        break  # transfer request accepted — stop retrying

    # The transfer is asynchronous: wait until some node other than the
    # old leader is reported as the stable leader.
    admin.await_stable_leader(topic,
                              partition=0,
                              namespace=namespace,
                              timeout_s=ELECTION_TIMEOUT * 4,
                              backoff_s=2,
                              check=lambda node_id: node_id != leader_id)

    self.logger.info(f"Completed transfer to {target_id}")

@cluster(num_nodes=3)
def test_one_node_down(self):
Expand Down Expand Up @@ -404,7 +412,7 @@ def test_leader_transfers_recovery(self, acks):
# is tripping up the cluster when we have so many leadership transfers.
# https://github.com/redpanda-data/redpanda/issues/2623

admin = Admin(self.redpanda)
admin = Admin(self.redpanda, retry_codes=[])

initial_leader_id = leader_node_id
for n in range(0, transfer_count):
Expand Down

0 comments on commit 560f193

Please sign in to comment.