tests/partition_balancer: more robust wait for quiescent state
Because the unavailability timer resets every time the controller
leader changes, robustly waiting for the timer to elapse is hard.
Instead, we simply wait until the unavailable node appears in the
"violations" status field.
ztlpn committed Aug 13, 2022
1 parent 0f43dab commit ce4a809
Showing 1 changed file with 30 additions and 35 deletions.
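
For context, here is a minimal sketch of the balancer status payload the
updated test polls. The field names come from the diff below; the exact
admin API response may carry additional fields, and the node id is invented:

    # Hypothetical example of a quiescent-state status (values invented):
    status = {
        "status": "ready",  # no planned or in-progress partition movements
        "violations": {
            "unavailable_nodes": [3],  # id of the downed node
        },
    }

    # The new wait accepts such a status only when the downed node's id
    # is actually listed among the violations:
    assert status["status"] == "ready"
    assert 3 in status["violations"].get("unavailable_nodes", [])
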
65 changes: 30 additions & 35 deletions tests/rptest/tests/partition_balancer_test.py
@@ -79,26 +79,6 @@ def node2partition_count(self):
 
         return ret
 
-    def wait_node_availability_timeout(self, unavailable_node):
-        # we need to first wait for the controller to stabilize,
-        # because each time controller changes, unavailability timer
-        # starts anew.
-        unavailable_id = self.redpanda.idx(unavailable_node)
-        admin = Admin(self.redpanda)
-        hosts = [n.name for n in self.redpanda.nodes if n != unavailable_node]
-        controller_id = admin.await_stable_leader(
-            namespace="redpanda",
-            topic="controller",
-            partition=0,
-            timeout_s=60,
-            hosts=hosts,
-            check=lambda node_id: node_id != unavailable_id,
-        )
-        self.logger.info(
-            f"controller stable: {self.redpanda.get_node(controller_id).name}")
-
-        time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
-
     def wait_until_status(self, predicate, timeout_sec=120):
         admin = Admin(self.redpanda)
         start = time.time()
@@ -124,10 +104,23 @@ def check():
             err_msg="failed to wait until status condition",
         )
 
-    def wait_until_ready(self, timeout_sec=120):
-        return self.wait_until_status(
-            lambda status: status["status"] == "ready",
-            timeout_sec=timeout_sec)
+    def wait_until_ready(self,
+                         timeout_sec=120,
+                         expected_unavailable_node=None):
+        if expected_unavailable_node:
+            node_id = self.redpanda.idx(expected_unavailable_node)
+            self.logger.info(f"waiting for quiescent state, "
+                             f"expected unavailable node: {node_id}")
+        else:
+            node_id = None
+            self.logger.info(f"waiting for quiescent state")
+
+        def predicate(status):
+            return (status["status"] == "ready" and
+                    ((node_id is None) or (node_id in status["violations"].get(
+                        "unavailable_nodes", []))))
+
+        return self.wait_until_status(predicate, timeout_sec=timeout_sec)
 
     def check_no_replicas_on_node(self, node):
         node2pc = self.node2partition_count()
@@ -190,19 +183,20 @@ def make_unavailable(self,
             self.cur_failure = FailureSpec(random.choice(failure_types), node)
             self.f_injector._start_func(self.cur_failure.type)(
                 self.cur_failure.node)
-            self.test.wait_node_availability_timeout(node)
 
         def step_and_wait_for_in_progress(self, node):
             self.make_unavailable(node)
             self.test.logger.info(f"waiting for partition balancer to kick in")
-            self.test.wait_until_status(lambda s: s["status"] == "in_progress"
-                                        or s["status"] == "ready")
+            node_id = self.redpanda.idx(node)
+            self.test.wait_until_status(
+                lambda s: s["status"] == "in_progress" or
+                (s["status"] == "ready" and node_id in s["violations"].get(
+                    "unavailable_nodes", [])))
 
         def step_and_wait_for_ready(self, node, wait_for_node_status=False):
             self.make_unavailable(node,
                                   wait_for_previous_node=wait_for_node_status)
-            self.test.logger.info(f"waiting for quiescent state")
-            self.test.wait_until_ready()
+            self.test.wait_until_ready(expected_unavailable_node=node)
 
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     def test_unavailable_nodes(self):
Expand Down Expand Up @@ -251,7 +245,7 @@ def test_movement_cancellations(self):
# clean node
ns.make_unavailable(empty_node,
failure_types=[FailureSpec.FAILURE_KILL])
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=empty_node)
ns.make_available()
self.check_no_replicas_on_node(empty_node)

@@ -273,7 +267,7 @@ def test_movement_cancellations(self):
             empty_node, failure_types=[FailureSpec.FAILURE_KILL])
 
         # wait until movements are cancelled
-        self.wait_until_ready()
+        self.wait_until_ready(expected_unavailable_node=empty_node)
 
         self.check_no_replicas_on_node(empty_node)
 
Expand Down Expand Up @@ -364,7 +358,7 @@ def get_node2partition_count():
ns.make_unavailable(node)
try:
admin_fuzz.pause()
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=node)
node2partition_count = get_node2partition_count()
assert self.redpanda.idx(node) not in node2partition_count
finally:
Expand Down Expand Up @@ -543,9 +537,10 @@ def entered_maintenance_mode(node):
admin.patch_cluster_config(
{"raft_learner_recovery_rate": 1_000_000})

self.wait_until_status(lambda s: s["status"] == "ready")

if not kill_same_node:
if kill_same_node:
self.wait_until_ready()
else:
self.wait_until_ready(expected_unavailable_node=to_kill)
self.check_no_replicas_on_node(to_kill)

self.run_validation(enable_idempotence=False,
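
Taken together, the new predicate amounts to the following standalone poll
loop. This is a sketch only: get_balancer_status() is a hypothetical helper
returning the payload shape shown above, while wait_until is the same
ducktape utility that the existing wait_until_status already uses.

    from ducktape.utils.util import wait_until

    def wait_for_quiescent_state(get_balancer_status, unavailable_node_id,
                                 timeout_sec=120):
        def quiescent():
            status = get_balancer_status()
            # "ready" alone can be reported before the unavailability timeout
            # has elapsed (the timer restarts on every controller leadership
            # change), so additionally require that the downed node is
            # already listed as a violation.
            return (status["status"] == "ready"
                    and unavailable_node_id in status["violations"].get(
                        "unavailable_nodes", []))

        wait_until(quiescent,
                   timeout_sec=timeout_sec,
                   backoff_sec=2,
                   err_msg="failed to reach quiescent state")
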
