diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py index b6ea38954c54..5d61e919e3b2 100644 --- a/tests/rptest/tests/partition_balancer_test.py +++ b/tests/rptest/tests/partition_balancer_test.py @@ -79,26 +79,6 @@ def node2partition_count(self): return ret - def wait_node_availability_timeout(self, unavailable_node): - # we need to first wait for the controller to stabilize, - # because each time controller changes, unavailability timer - # starts anew. - unavailable_id = self.redpanda.idx(unavailable_node) - admin = Admin(self.redpanda) - hosts = [n.name for n in self.redpanda.nodes if n != unavailable_node] - controller_id = admin.await_stable_leader( - namespace="redpanda", - topic="controller", - partition=0, - timeout_s=60, - hosts=hosts, - check=lambda node_id: node_id != unavailable_id, - ) - self.logger.info( - f"controller stable: {self.redpanda.get_node(controller_id).name}") - - time.sleep(self.NODE_AVAILABILITY_TIMEOUT) - def wait_until_status(self, predicate, timeout_sec=120): admin = Admin(self.redpanda) start = time.time() @@ -124,10 +104,23 @@ def check(): err_msg="failed to wait until status condition", ) - def wait_until_ready(self, timeout_sec=120): - return self.wait_until_status( - lambda status: status["status"] == "ready", - timeout_sec=timeout_sec) + def wait_until_ready(self, + timeout_sec=120, + expected_unavailable_node=None): + if expected_unavailable_node: + node_id = self.redpanda.idx(expected_unavailable_node) + self.logger.info(f"waiting for quiescent state, " + f"expected unavailable node: {node_id}") + else: + node_id = None + self.logger.info(f"waiting for quiescent state") + + def predicate(status): + return (status["status"] == "ready" and + ((node_id is None) or (node_id in status["violations"].get( + "unavailable_nodes", [])))) + + return self.wait_until_status(predicate, timeout_sec=timeout_sec) def check_no_replicas_on_node(self, node): node2pc = self.node2partition_count() @@ -190,19 +183,20 @@ def make_unavailable(self, self.cur_failure = FailureSpec(random.choice(failure_types), node) self.f_injector._start_func(self.cur_failure.type)( self.cur_failure.node) - self.test.wait_node_availability_timeout(node) def step_and_wait_for_in_progress(self, node): self.make_unavailable(node) self.test.logger.info(f"waiting for partition balancer to kick in") - self.test.wait_until_status(lambda s: s["status"] == "in_progress" - or s["status"] == "ready") + node_id = self.redpanda.idx(node) + self.test.wait_until_status( + lambda s: s["status"] == "in_progress" or + (s["status"] == "ready" and node_id in s["violations"].get( + "unavailable_nodes", []))) def step_and_wait_for_ready(self, node, wait_for_node_status=False): self.make_unavailable(node, wait_for_previous_node=wait_for_node_status) - self.test.logger.info(f"waiting for quiescent state") - self.test.wait_until_ready() + self.test.wait_until_ready(expected_unavailable_node=node) @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_unavailable_nodes(self): @@ -251,7 +245,7 @@ def test_movement_cancellations(self): # clean node ns.make_unavailable(empty_node, failure_types=[FailureSpec.FAILURE_KILL]) - self.wait_until_ready() + self.wait_until_ready(expected_unavailable_node=empty_node) ns.make_available() self.check_no_replicas_on_node(empty_node) @@ -273,7 +267,7 @@ def test_movement_cancellations(self): empty_node, failure_types=[FailureSpec.FAILURE_KILL]) # wait until movements are cancelled - self.wait_until_ready() + self.wait_until_ready(expected_unavailable_node=empty_node) self.check_no_replicas_on_node(empty_node) @@ -364,7 +358,7 @@ def get_node2partition_count(): ns.make_unavailable(node) try: admin_fuzz.pause() - self.wait_until_ready() + self.wait_until_ready(expected_unavailable_node=node) node2partition_count = get_node2partition_count() assert self.redpanda.idx(node) not in node2partition_count finally: @@ -543,9 +537,10 @@ def entered_maintenance_mode(node): admin.patch_cluster_config( {"raft_learner_recovery_rate": 1_000_000}) - self.wait_until_status(lambda s: s["status"] == "ready") - - if not kill_same_node: + if kill_same_node: + self.wait_until_ready() + else: + self.wait_until_ready(expected_unavailable_node=to_kill) self.check_no_replicas_on_node(to_kill) self.run_validation(enable_idempotence=False,