diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index f723bdbfb02d..ee6c311ed22c 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -2851,6 +2851,10 @@ void admin_server::register_cluster_routes() {
           std::get(result));
     } else if (std::holds_alternative<model::node_id>(result)) {
         auto node_id = std::get<model::node_id>(result);
+        vlog(
+          logger.debug,
+          "proxying the partition_balancer_overview call to node {}",
+          node_id);
         auto rpc_result
           = co_await _connection_cache.local()
               .with_node_client<
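Side note on the C++ hunk: the status request is handled by the controller leader, so a request that lands on any other node gets proxied over RPC, and the new `vlog` line makes that hop visible in the debug log. When the proxy target is a suspended node, the proxied call times out and surfaces to the HTTP client as a 504, which is exactly what the test change below retries. A minimal sketch of that client-side behavior, using plain `requests` plus a `urllib3` retry policy in place of rptest's `Admin` wrapper (`balancer_status` is a hypothetical helper; the path and port are the stock admin API defaults):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def balancer_status(host: str, port: int = 9644) -> dict:
    # Hypothetical helper; the tests go through rptest's Admin wrapper,
    # which layers an equivalent retry policy over the same endpoint.
    session = requests.Session()
    # Retry on 503 (no controller leader yet) and 504 (the proxied call
    # reached a suspended node and timed out); by the next attempt the
    # leader has usually moved.
    retries = Retry(total=5,
                    backoff_factor=1,
                    status_forcelist=[503, 504],
                    allowed_methods=["GET"])
    session.mount("http://", HTTPAdapter(max_retries=retries))
    r = session.get(
        f"http://{host}:{port}/v1/cluster/partition_balancer/status",
        timeout=10)
    r.raise_for_status()
    return r.json()
```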
diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py
index b6ea38954c54..77e6be691ddc 100644
--- a/tests/rptest/tests/partition_balancer_test.py
+++ b/tests/rptest/tests/partition_balancer_test.py
@@ -34,16 +34,13 @@


 class PartitionBalancerTest(EndToEndTest):
-    NODE_AVAILABILITY_TIMEOUT = 10
-
     def __init__(self, ctx, *args, **kwargs):
         super(PartitionBalancerTest, self).__init__(
             ctx,
             *args,
             extra_rp_conf={
                 "partition_autobalancing_mode": "continuous",
-                "partition_autobalancing_node_availability_timeout_sec":
-                self.NODE_AVAILABILITY_TIMEOUT,
+                "partition_autobalancing_node_availability_timeout_sec": 10,
                 "partition_autobalancing_tick_interval_ms": 5000,
                 "raft_learner_recovery_rate": 1_000_000,
             },
@@ -79,34 +76,16 @@ def node2partition_count(self):
         return ret

-    def wait_node_availability_timeout(self, unavailable_node):
-        # we need to first wait for the controller to stabilize,
-        # because each time controller changes, unavailability timer
-        # starts anew.
-        unavailable_id = self.redpanda.idx(unavailable_node)
-        admin = Admin(self.redpanda)
-        hosts = [n.name for n in self.redpanda.nodes if n != unavailable_node]
-        controller_id = admin.await_stable_leader(
-            namespace="redpanda",
-            topic="controller",
-            partition=0,
-            timeout_s=60,
-            hosts=hosts,
-            check=lambda node_id: node_id != unavailable_id,
-        )
-        self.logger.info(
-            f"controller stable: {self.redpanda.get_node(controller_id).name}")
-
-        time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
-
     def wait_until_status(self, predicate, timeout_sec=120):
-        admin = Admin(self.redpanda)
+        # We may get a 504 if we proxy a status request to a suspended
+        # node. It is okay to retry (the controller leader will get
+        # re-elected in the meantime).
+        admin = Admin(self.redpanda, retry_codes=[503, 504])
         start = time.time()

         def check():
             req_start = time.time()

-            status = admin.get_partition_balancer_status(timeout=1)
+            status = admin.get_partition_balancer_status(timeout=10)
             self.logger.info(f"partition balancer status: {status}")

             if "seconds_since_last_tick" not in status:
@@ -124,10 +103,23 @@ def check():
             err_msg="failed to wait until status condition",
         )

-    def wait_until_ready(self, timeout_sec=120):
-        return self.wait_until_status(
-            lambda status: status["status"] == "ready",
-            timeout_sec=timeout_sec)
+    def wait_until_ready(self,
+                         timeout_sec=120,
+                         expected_unavailable_node=None):
+        if expected_unavailable_node:
+            node_id = self.redpanda.idx(expected_unavailable_node)
+            self.logger.info(f"waiting for quiescent state, "
+                             f"expected unavailable node: {node_id}")
+        else:
+            node_id = None
+            self.logger.info(f"waiting for quiescent state")
+
+        def predicate(status):
+            return (status["status"] == "ready" and
+                    ((node_id is None) or (node_id in status["violations"].get(
+                        "unavailable_nodes", []))))
+
+        return self.wait_until_status(predicate, timeout_sec=timeout_sec)

     def check_no_replicas_on_node(self, node):
         node2pc = self.node2partition_count()
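Worth spelling out why `wait_until_ready` grew the `expected_unavailable_node` parameter: right after a node is killed, the balancer can still report `ready` simply because the availability timeout has not elapsed and it has not registered the outage yet, which is the window the deleted `wait_node_availability_timeout` sleep used to paper over. Requiring the downed node to appear under `violations["unavailable_nodes"]` makes `ready` unambiguous. A standalone sketch of that predicate (`is_quiescent` is a hypothetical name; the payload shape mirrors the fields the test reads, with made-up values):

```python
def is_quiescent(status: dict, expected_unavailable_id=None) -> bool:
    # Quiescent means the balancer reports "ready" and, if we know a node
    # is down, that it has actually registered the outage as a violation.
    if status["status"] != "ready":
        return False
    if expected_unavailable_id is None:
        return True
    return expected_unavailable_id in status["violations"].get(
        "unavailable_nodes", [])


# "ready", but the outage of node 3 is not registered yet -> keep polling.
assert not is_quiescent({"status": "ready", "violations": {}}, 3)
# "ready" and node 3 is listed as unavailable -> truly quiescent.
assert is_quiescent(
    {"status": "ready", "violations": {"unavailable_nodes": [3]}}, 3)
```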
@@ -190,19 +182,6 @@ def make_unavailable(self,
             self.cur_failure = FailureSpec(random.choice(failure_types), node)
             self.f_injector._start_func(self.cur_failure.type)(
                 self.cur_failure.node)
-            self.test.wait_node_availability_timeout(node)
-
-    def step_and_wait_for_in_progress(self, node):
-        self.make_unavailable(node)
-        self.test.logger.info(f"waiting for partition balancer to kick in")
-        self.test.wait_until_status(lambda s: s["status"] == "in_progress"
-                                    or s["status"] == "ready")
-
-    def step_and_wait_for_ready(self, node, wait_for_node_status=False):
-        self.make_unavailable(node,
-                              wait_for_previous_node=wait_for_node_status)
-        self.test.logger.info(f"waiting for quiescent state")
-        self.test.wait_until_ready()

     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     def test_unavailable_nodes(self):
@@ -218,10 +197,30 @@ def test_unavailable_nodes(self):
         with self.NodeStopper(self) as ns:
             for n in range(7):
                 node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
+                ns.make_unavailable(node)
+
                 if n in {1, 3, 4}:
-                    ns.step_and_wait_for_in_progress(node)
+                    # For these nodes we don't wait for the quiescent
+                    # state, just for the partition movement to start,
+                    # before moving on to the next node.
+
+                    self.logger.info(
+                        f"waiting for partition balancer to kick in")
+
+                    node_id = self.redpanda.idx(node)
+
+                    def predicate(s):
+                        if s["status"] == "in_progress":
+                            return True
+                        elif s["status"] == "ready":
+                            return node_id in s["violations"].get(
+                                "unavailable_nodes", [])
+                        else:
+                            return False
+
+                    self.wait_until_status(predicate)
                 else:
-                    ns.step_and_wait_for_ready(node)
+                    self.wait_until_ready(expected_unavailable_node=node)
                     self.check_no_replicas_on_node(node)

         # Restore the system to a fully healthy state before validation:
@@ -251,7 +250,7 @@ def test_movement_cancellations(self):
             # clean node
             ns.make_unavailable(empty_node,
                                 failure_types=[FailureSpec.FAILURE_KILL])
-            self.wait_until_ready()
+            self.wait_until_ready(expected_unavailable_node=empty_node)
             ns.make_available()
             self.check_no_replicas_on_node(empty_node)
@@ -267,15 +266,14 @@ def test_movement_cancellations(self):
             self.wait_until_status(lambda s: s["status"] == "in_progress")
             ns.make_available()

-            with self.NodeStopper(self) as ns2:
-                # stop empty node
-                ns.make_unavailable(
-                    empty_node, failure_types=[FailureSpec.FAILURE_KILL])
+            # stop empty node
+            ns.make_unavailable(empty_node,
+                                failure_types=[FailureSpec.FAILURE_KILL])

-                # wait until movements are cancelled
-                self.wait_until_ready()
+            # wait until movements are cancelled
+            self.wait_until_ready(expected_unavailable_node=empty_node)

-                self.check_no_replicas_on_node(empty_node)
+            self.check_no_replicas_on_node(empty_node)

         self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT)
@@ -312,7 +310,8 @@ def check_rack_placement():
         with self.NodeStopper(self) as ns:
             for n in range(7):
                 node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
-                ns.step_and_wait_for_ready(node, wait_for_node_status=True)
+                ns.make_unavailable(node, wait_for_previous_node=True)
+                self.wait_until_ready(expected_unavailable_node=node)
                 self.check_no_replicas_on_node(node)
                 check_rack_placement()
@@ -364,7 +363,7 @@ def get_node2partition_count():
                 ns.make_unavailable(node)
                 try:
                     admin_fuzz.pause()
-                    self.wait_until_ready()
+                    self.wait_until_ready(expected_unavailable_node=node)
                     node2partition_count = get_node2partition_count()
                     assert self.redpanda.idx(node) not in node2partition_count
                 finally:
@@ -543,9 +542,10 @@ def entered_maintenance_mode(node):
             admin.patch_cluster_config(
                 {"raft_learner_recovery_rate": 1_000_000})

-            self.wait_until_status(lambda s: s["status"] == "ready")
-
-            if not kill_same_node:
+            if kill_same_node:
+                self.wait_until_ready()
+            else:
+                self.wait_until_ready(expected_unavailable_node=to_kill)
                 self.check_no_replicas_on_node(to_kill)

             self.run_validation(enable_idempotence=False,
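The `n in {1, 3, 4}` branch above is racy by design: between two polls the balancer may start and finish the whole rebalance, so the predicate accepts either an `in_progress` status or a `ready` status in which the downed node is already recorded as a violation. The same idea condensed into a self-contained polling helper (a sketch, not the test code; `get_status` stands in for `Admin.get_partition_balancer_status`, and ducktape's `wait_until` is replaced by an explicit loop):

```python
import time


def wait_until_status(get_status, predicate, timeout_sec=120, backoff_sec=2):
    # Condensed form of the test helper: poll until the predicate holds.
    # get_status is any callable returning the status dict.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        status = get_status()
        # A reply without this field is incomplete; poll again.
        if "seconds_since_last_tick" in status and predicate(status):
            return status
        time.sleep(backoff_sec)
    raise TimeoutError("failed to wait until status condition")


def moving_or_done(node_id):
    # Accept an active rebalance, or one that already finished with the
    # downed node recorded as unavailable, whichever the poll catches.
    def predicate(s):
        if s["status"] == "in_progress":
            return True
        return (s["status"] == "ready"
                and node_id in s["violations"].get("unavailable_nodes", []))

    return predicate
```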