From 4c0db786b824f336d2d0bd12909ba9716c41a897 Mon Sep 17 00:00:00 2001
From: ZeDRoman
Date: Tue, 19 Jul 2022 12:27:54 +0300
Subject: [PATCH] ducktape: partition balancer cancellation tests

---
 tests/rptest/tests/partition_balancer_test.py | 102 ++++++++++++++----
 1 file changed, 83 insertions(+), 19 deletions(-)

diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py
index 59111569905d9..aa22459cfd87f 100644
--- a/tests/rptest/tests/partition_balancer_test.py
+++ b/tests/rptest/tests/partition_balancer_test.py
@@ -36,27 +36,30 @@ def __init__(self, ctx, *args, **kwargs):
             **kwargs,
         )
 
+    def topic_partitions_replicas(self, topic):
+
+        rpk = RpkTool(self.redpanda)
+        num_partitions = topic.partition_count
+
+        def all_partitions_ready():
+            try:
+                partitions = list(rpk.describe_topic(topic.name))
+            except RpkException:
+                return False
+            return (len(partitions) == num_partitions, partitions)
+
+        return wait_until_result(
+            all_partitions_ready,
+            timeout_sec=120,
+            backoff_sec=1,
+            err_msg="failed to wait until all partitions have leaders",
+        )
+
     def node2partition_count(self):
         topics = [self.topic]
-        rpk = RpkTool(self.redpanda)
         ret = {}
         for topic in topics:
-            num_partitions = topic.partition_count
-
-            def all_partitions_ready():
-                try:
-                    partitions = list(rpk.describe_topic(topic.name))
-                except RpkException:
-                    return False
-                return (len(partitions) == num_partitions, partitions)
-
-            partitions = wait_until_result(
-                all_partitions_ready,
-                timeout_sec=120,
-                backoff_sec=1,
-                err_msg="failed to wait until all partitions have leaders",
-            )
-
+            partitions = self.topic_partitions_replicas(topic)
             for p in partitions:
                 for r in p.replicas:
                     ret[r] = ret.setdefault(r, 0) + 1
@@ -125,8 +128,6 @@ def test_unavailable_nodes(self):
 
             time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
 
-            self.wait_until_ready()
-
             node2pc = self.node2partition_count()
             self.logger.info(f"partition counts after: {node2pc}")
 
@@ -136,3 +137,66 @@ def test_unavailable_nodes(self):
             prev_failure = failure
 
         self.run_validation()
+
+    def _throttle_recovery(self, new_value):
+        self.redpanda.set_cluster_config(
+            {"raft_learner_recovery_rate": str(new_value)})
+
+    @cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
+    def test_movement_cancellations(self):
+        self.start_redpanda(num_nodes=4)
+
+        self.topic = TopicSpec(partition_count=random.randint(20, 30))
+
+        f_injector = FailureInjector(self.redpanda)
+
+        self.client().create_topic(self.topic)
+
+        self.start_producer(1)
+        self.start_consumer(1)
+        self.await_startup()
+
+        empty_node = self.redpanda.nodes[-1]
+
+        # clear the last node: kill it and let the balancer move all of its replicas away
+        initial_failure = FailureSpec(FailureSpec.FAILURE_KILL, empty_node)
+        f_injector._start_func(initial_failure.type)(initial_failure.node)
+        time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+        self.wait_until_ready()
+        f_injector._stop_func(initial_failure.type)(initial_failure.node)
+        node2pc = self.node2partition_count()
+        self.logger.info(f"partition counts after: {node2pc}")
+        assert self.redpanda.idx(empty_node) not in node2pc
+
+        # throttle recovery to prevent partition moves from finishing
+        self._throttle_recovery(10)
+
+        total_replicas = sum(self.node2partition_count().values())
+
+        for n in range(3):
+            node = self.redpanda.nodes[n % 3]
+            failure = FailureSpec(FailureSpec.FAILURE_KILL, node)
+            f_injector._start_func(failure.type)(failure.node)
+
+            time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+
+            # wait until partition movement starts
+            self.wait_until_status(lambda s: s["status"] == "in_progress")
+            f_injector._stop_func(failure.type)(failure.node)
+
+            # kill the empty node so the in-progress movements get cancelled
+            f_injector._start_func(initial_failure.type)(initial_failure.node)
+            time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
+
+            # wait until movements are cancelled
+            self.wait_until_ready()
+
+            node2pc = self.node2partition_count()
+            self.logger.info(f"partition counts after: {node2pc}")
+
+            assert sum(node2pc.values()) == total_replicas
+            assert self.redpanda.idx(empty_node) not in node2pc
+
+            f_injector._stop_func(initial_failure.type)(initial_failure.node)
+
+        self.run_validation()