diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py
index 2a0262a2515bd..67eeae30c4da1 100644
--- a/tests/rptest/tests/partition_balancer_test.py
+++ b/tests/rptest/tests/partition_balancer_test.py
@@ -27,13 +27,13 @@ def __init__(self, ctx, *args, **kwargs):
             ctx,
             *args,
             extra_rp_conf={
-                'partition_autobalancing_mode': 'continuous',
-                'partition_autobalancing_node_availability_timeout_sec':
-                self.NODE_AVAILABILITY_TIMEOUT,
-                'partition_autobalancing_tick_interval_ms': 5000,
-                'raft_learner_recovery_rate': 1_000_000,
+                "partition_autobalancing_mode": "continuous",
+                "partition_autobalancing_node_availability_timeout_sec": self.NODE_AVAILABILITY_TIMEOUT,
+                "partition_autobalancing_tick_interval_ms": 5000,
+                "raft_learner_recovery_rate": 1_000_000,
             },
-            **kwargs)
+            **kwargs,
+        )
 
     def node2partition_count(self):
         topics = [self.topic]
@@ -53,7 +53,8 @@ def all_partitions_ready():
             all_partitions_ready,
             timeout_sec=30,
             backoff_sec=1,
-            err_msg="failed to wait until all partitions have leaders")
+            err_msg="failed to wait until all partitions have leaders",
+        )
 
         for p in partitions:
             for r in p.replicas:
@@ -69,23 +70,27 @@ def check():
             req_start = time.time()
 
             status = admin.get_partition_balancer_status(timeout=1)
-            self.logger.info(f'partition balancer status: {status}')
+            self.logger.info(f"partition balancer status: {status}")
 
-            if 'seconds_since_last_tick' not in status:
+            if "seconds_since_last_tick" not in status:
                 return False
-            return (req_start - status['seconds_since_last_tick'] - 1 > start
-                    and predicate(status), status)
+            return (
+                req_start - status["seconds_since_last_tick"] - 1 > start
+                and predicate(status),
+                status,
+            )
 
         return wait_until_result(
             check,
             timeout_sec=timeout_sec,
             backoff_sec=2,
-            err_msg="failed to wait until status condition")
+            err_msg="failed to wait until status condition",
+        )
 
     def wait_until_ready(self, timeout_sec=60):
         return self.wait_until_status(
-            lambda status: status['status'] == 'ready',
-            timeout_sec=timeout_sec)
+            lambda status: status["status"] == "ready", timeout_sec=timeout_sec
+        )
 
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     def test_unavailable_nodes(self):
@@ -106,8 +111,9 @@ def test_unavailable_nodes(self):
         for n in range(10):
             node = self.redpanda.nodes[n % 5]
             failure_types = [
-                FailureSpec.FAILURE_KILL, FailureSpec.FAILURE_TERMINATE,
-                FailureSpec.FAILURE_SUSPEND
+                FailureSpec.FAILURE_KILL,
+                FailureSpec.FAILURE_TERMINATE,
+                FailureSpec.FAILURE_SUSPEND,
             ]
             failure = FailureSpec(random.choice(failure_types), node)
             f_injector._start_func(failure.type)(failure.node)
@@ -118,21 +124,13 @@ def test_unavailable_nodes(self):
 
             time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
 
-            # TODO: enable when cancellation gets implemented
-            # wait_for_quiescent_state = random.random() < 0.5
-            wait_for_quiescent_state = True
+            self.wait_until_ready()
 
-            if wait_for_quiescent_state:
-                self.wait_until_ready()
+            node2pc = self.node2partition_count()
+            self.logger.info(f"partition counts after: {node2pc}")
 
-                node2pc = self.node2partition_count()
-                self.logger.info(f'partition counts after: {node2pc}')
-
-                assert sum(node2pc.values()) == total_replicas
-                assert self.redpanda.idx(node) not in node2pc
-            else:
-                self.wait_until_status(lambda s: s['status'] == 'in_progress'
-                                       or s['status'] == 'ready')
+            assert sum(node2pc.values()) == total_replicas
+            assert self.redpanda.idx(node) not in node2pc
 
             prev_failure = failure
 