From 644aa33f4cc139d5a2ba44ec830b5fd5ddd7d101 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 6 Jul 2022 10:40:07 +0200
Subject: [PATCH 1/6] c/controller_backend: fix dispatching update finished command

Fixed dispatching the update_finished command when no replicas were added
to the replica set. Previously, when no replicas were added to the replica
set, there was no node that would pass the condition allowing it to
dispatch the update_finished command. This way partition reconfiguration
never finished, causing chaos tests to fail.

Signed-off-by: Michal Maslanka
---
 src/v/cluster/controller_backend.cc | 29 ++++++++++++++++++++++++++---
 src/v/cluster/controller_backend.h  |  5 +++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 478008ee2f6e..58026782fcc1 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -865,9 +865,8 @@ controller_backend::process_partition_reconfiguration(
      * NOTE: deletion is safe as we are already running with new
      * configuration so old replicas are no longer needed.
      */
-    if (
-      !has_local_replicas(_self, previous_replicas)
-      || type == topic_table_delta::op_type::force_abort_update) {
+    if (can_finish_update(
+          type, target_assignment.replicas, previous_replicas)) {
         co_return co_await dispatch_update_finished(
           std::move(ntp), target_assignment);
     }
@@ -916,6 +915,30 @@ controller_backend::process_partition_reconfiguration(
     co_return ec;
 }
 
+bool controller_backend::can_finish_update(
+  topic_table_delta::op_type update_type,
+  const std::vector<model::broker_shard>& current_replicas,
+  const std::vector<model::broker_shard>& previous_replicas) {
+    // force abort update may be finished by any node
+    if (update_type == topic_table_delta::op_type::force_abort_update) {
+        return true;
+    }
+    // update may be finished by a node that was added to the replica set
+    if (!has_local_replicas(_self, previous_replicas)) {
+        return true;
+    }
+    // finally, if no nodes were added to the replica set, one of the
+    // current replicas may finish the update
+
+    auto added_nodes = subtract_replica_sets(
+      current_replicas, previous_replicas);
+    if (added_nodes.empty()) {
+        return has_local_replicas(_self, current_replicas);
+    }
+
+    return false;
+}
+
 ss::future<std::error_code>
 controller_backend::create_partition_from_remote_shard(
   model::ntp ntp,
diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h
index 369609e970d3..cbdb850eac00 100644
--- a/src/v/cluster/controller_backend.h
+++ b/src/v/cluster/controller_backend.h
@@ -149,6 +149,11 @@ class controller_backend
     ss::future<std::error_code> create_partition_from_remote_shard(
       model::ntp, ss::shard_id, partition_assignment);
 
+    bool can_finish_update(
+      topic_table_delta::op_type,
+      const std::vector<model::broker_shard>&,
+      const std::vector<model::broker_shard>&);
+
     void housekeeping();
     void setup_metrics();
     ss::sharded<topic_table>& _topics;
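The three-way check above is the crux of the fix. A minimal Python sketch of
the same decision, with replica sets modeled as plain node-id sets and the
`subtract_replica_sets`/`has_local_replicas` helpers reduced to set
operations, makes the newly handled shrink case explicit:

```python
FORCE_ABORT_UPDATE = "force_abort_update"

def can_finish_update(self_node, update_type, current_replicas, previous_replicas):
    # a force-aborted update may be finished by any node
    if update_type == FORCE_ABORT_UPDATE:
        return True
    # an update may be finished by a node that was added to the replica set
    if self_node not in previous_replicas:
        return True
    # if no nodes were added (a pure shrink), one of the current replicas
    # finishes the update; this is the case the patch fixes
    added_nodes = set(current_replicas) - set(previous_replicas)
    if not added_nodes:
        return self_node in current_replicas
    return False

# Shrinking {1, 2, 3} -> {1}: previously no node could finish the update,
# since every surviving node was already in the previous replica set.
# With the fix, node 1 (a current replica) can.
assert can_finish_update(1, "update", {1}, {1, 2, 3})
assert not can_finish_update(2, "update", {1}, {1, 2, 3})
```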
From 76ed8224ed16f75cede0791c9e4056081d69d4c0 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 6 Jul 2022 10:42:12 +0200
Subject: [PATCH 2/6] tests: added test validating shrinking replica set

Added a test validating that we can shrink a raft group replica set.

Signed-off-by: Michal Maslanka
---
 tests/rptest/tests/partition_movement_test.py | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 3bb8ea63c3ab..07ce9509ced5 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -579,6 +579,33 @@ def get_status():
         rpk.create_topic(topic, 1, 1)
         wait_until(lambda: get_status() == 'done', 10, 1)
 
+    @cluster(num_nodes=5)
+    def test_down_replicate(self):
+        """
+        Test changing replication factor from 3 -> 1
+        """
+        throughput, records, _ = self._get_scale_params()
+        partition_count = 5
+        self.start_redpanda(num_nodes=3)
+        admin = Admin(self.redpanda)
+
+        spec = TopicSpec(partition_count=partition_count, replication_factor=3)
+        self.client().create_topic(spec)
+        self.topic = spec.name
+        self.start_producer(1, throughput=throughput)
+        self.start_consumer(1)
+        self.await_startup()
+
+        for partition in range(0, partition_count):
+            assignments = self._get_assignments(admin, self.topic, partition)
+            new_assignment = [assignments[0]]
+            admin.set_partition_replicas(self.topic, partition, new_assignment)
+            self._wait_post_move(self.topic, partition, new_assignment, 60)
+
+        self.run_validation(enable_idempotence=False,
+                            consumer_timeout_sec=45,
+                            min_records=records)
+
 
 class SIPartitionMovementTest(PartitionMovementMixin, EndToEndTest):
     """
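For orientation, the `Admin.set_partition_replicas` call in the test above
drives the shrink through a single admin-API request. A rough
`requests`-based equivalent is sketched below; the host, topic and
node_id/core values are hypothetical, and the endpoint shape is assumed
from the Admin client wrapper used by the tests:

```python
import requests

# Shrink kafka/<topic>/<partition> down to a single replica, mirroring what
# the test's admin.set_partition_replicas call does. All concrete values
# here are placeholders.
admin_url = "http://localhost:9644"
topic, partition = "test-topic", 0
new_assignment = [{"node_id": 1, "core": 0}]

resp = requests.post(
    f"{admin_url}/v1/partitions/kafka/{topic}/{partition}/replicas",
    json=new_assignment)
resp.raise_for_status()

# The move is asynchronous: the test then polls (via _wait_post_move) until
# the partition reports the new replica set before shrinking the next one.
```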
From 764143c8a2b75aeb2a4efc367927b2b8f882b31c Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 6 Jul 2022 13:11:01 +0200
Subject: [PATCH 3/6] tests/node_op_fuzzy: do not create/delete topics

Since we already use the admin operations fuzzer to execute day-2 admin
operations, we can skip creating and deleting topics explicitly.

Signed-off-by: Michal Maslanka
---
 .../scale_tests/node_operations_fuzzy_test.py | 77 ++-----------------
 1 file changed, 6 insertions(+), 71 deletions(-)

diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
index 94ee61720909..2cb8cf78832c 100644
--- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py
+++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -27,8 +27,7 @@
 
 DECOMMISSION = "decommission"
 ADD = "add"
-ADD_TOPIC = "add_tp"
-DELETE_TOPIC = "delete_tp"
+
 ALLOWED_REPLICATION = [1, 3]
 
 
@@ -37,19 +36,12 @@ class NodeOperationFuzzyTest(EndToEndTest):
     min_inter_failure_time = 30
     max_inter_failure_time = 60
 
-    def generate_random_workload(self, count, skip_nodes, available_nodes):
+    def generate_random_workload(self, count, available_nodes):
         op_types = [ADD, DECOMMISSION]
-        tp_op_types = [ADD_TOPIC, DELETE_TOPIC]
         # current state
         active_nodes = list(available_nodes)
         decommissioned_nodes = []
         operations = []
-        topics = []
-
-        def eligible_active_nodes():
-            return list(
-                filter(lambda n: not (n == 1 or n in skip_nodes),
-                       active_nodes))
 
         def decommission(id):
             active_nodes.remove(id)
@@ -60,37 +52,24 @@ def add(id):
             decommissioned_nodes.remove(id)
 
         for _ in range(0, count):
-            if len(decommissioned_nodes) == 2:
+            if len(active_nodes) <= 3:
                 id = random.choice(decommissioned_nodes)
                 operations.append((ADD, id))
                 add(id)
             elif len(decommissioned_nodes) == 0:
-                id = random.choice(eligible_active_nodes())
+                id = random.choice(active_nodes)
                 operations.append((DECOMMISSION, id))
                 decommission(id)
             else:
                 op = random.choice(op_types)
                 if op == DECOMMISSION:
-                    id = random.choice(eligible_active_nodes())
+                    id = random.choice(active_nodes)
                     operations.append((DECOMMISSION, id))
                     decommission(id)
                 elif op == ADD:
                     id = random.choice(decommissioned_nodes)
                     operations.append((ADD, id))
                     add(id)
-            # topic operation
-            if len(topics) == 0:
-                op = ADD_TOPIC
-            else:
-                op = random.choice(tp_op_types)
-
-            if op == ADD_TOPIC:
-                operations.append((
-                    ADD_TOPIC,
-                    f"test-topic-{random.randint(0,2000)}-{round(time.time()*1000000)}",
-                    random.choice(ALLOWED_REPLICATION), 3))
-            else:
-                operations.append((DELETE_TOPIC, random.choice(topics)))
 
         return operations
 
@@ -302,42 +281,9 @@ def has_new_replicas():
                    timeout_sec=NODE_OP_TIMEOUT,
                    backoff_sec=2)
 
-        def is_topic_present(name):
-            kcl = KCL(self.redpanda)
-            lines = kcl.list_topics().splitlines()
-            self.redpanda.logger.debug(
-                f"checking if topic {name} is present in {lines}")
-            for l in lines:
-                if l.startswith(name):
-                    return True
-            return False
-
-        def create_topic(spec):
-            try:
-                DefaultClient(self.redpanda).create_topic(spec)
-            except Exception as e:
-                self.redpanda.logger.warn(
-                    f"error creating topic {spec.name} - {e}")
-            try:
-                return is_topic_present(spec.name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error while listing topics - {e}")
-                return False
-
-        def delete_topic(name):
-            try:
-                DefaultClient(self.redpanda).delete_topic(name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error deleting topic {name} - {e}")
-            try:
-                return not is_topic_present(name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error while listing topics - {e}")
-                return False
-
         work = self.generate_random_workload(10,
-                                             skip_nodes=set(),
                                              available_nodes=self.active_nodes)
 
+        self.redpanda.logger.info(f"node operations to execute: {work}")
+
         for op in work:
             op_type = op[0]
@@ -351,17 +297,6 @@ def delete_topic(name):
                 idx = op[1]
                 self.active_nodes.remove(idx)
                 decommission(idx)
-            elif op_type == ADD_TOPIC:
-                spec = TopicSpec(name=op[1],
-                                 replication_factor=op[2],
-                                 partition_count=op[3])
-                wait_until(lambda: create_topic(spec) == True,
-                           timeout_sec=180,
-                           backoff_sec=2)
-            elif op_type == DELETE_TOPIC:
-                wait_until(lambda: delete_topic(op[1]) == True,
-                           timeout_sec=180,
-                           backoff_sec=2)
 
         enable_failures = False
         admin_fuzz.wait(20, 180)
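With the topic operations gone, the workload generator reduces to a small
two-state machine over active and decommissioned nodes. The standalone
sketch below (node ids arbitrary) replays a generated plan and checks the
invariant the test relies on: the cluster never drops below three active
nodes, and a node is only re-added after being decommissioned.

```python
import random

ADD, DECOMMISSION = "add", "decommission"

def generate_random_workload(count, available_nodes):
    # mirrors the simplified generator: only node add/decommission remain
    active, decommissioned, operations = list(available_nodes), [], []
    for _ in range(count):
        if len(active) <= 3:
            op, node = ADD, random.choice(decommissioned)
        elif not decommissioned:
            op, node = DECOMMISSION, random.choice(active)
        else:
            op = random.choice([ADD, DECOMMISSION])
            node = random.choice(decommissioned if op == ADD else active)
        operations.append((op, node))
        if op == ADD:
            decommissioned.remove(node)
            active.append(node)
        else:
            active.remove(node)
            decommissioned.append(node)
    return operations

# replay a plan and check the invariant
active = {1, 2, 3, 4, 5}
for op, node in generate_random_workload(30, active):
    if op == ADD:
        active.add(node)
    else:
        active.discard(node)
    assert len(active) >= 3, "cluster must keep at least three active nodes"
```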
From 8a941035c426e58a19c49a57bce8e11aa9dbf8b3 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 6 Jul 2022 14:50:52 +0200
Subject: [PATCH 4/6] tests: added add_no_wait to node operations fuzzy test

Added an `add_no_wait` operation to the node operations fuzzy test. The
add_no_wait operation does not wait for the node to be populated with
partitions before executing the next operation. This way we give
decommissioning a chance to execute before the addition finishes, thereby
triggering the move cancellation code path.

Signed-off-by: Michal Maslanka
---
 .../scale_tests/node_operations_fuzzy_test.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
index 2cb8cf78832c..b0a975f20c13 100644
--- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py
+++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -27,6 +27,7 @@
 
 DECOMMISSION = "decommission"
 ADD = "add"
+ADD_NO_WAIT = "add_no_wait"
 
 ALLOWED_REPLICATION = [1, 3]
 
@@ -37,7 +38,7 @@ class NodeOperationFuzzyTest(EndToEndTest):
     max_inter_failure_time = 60
 
     def generate_random_workload(self, count, available_nodes):
-        op_types = [ADD, DECOMMISSION]
+        op_types = [ADD, ADD_NO_WAIT, DECOMMISSION]
         # current state
         active_nodes = list(available_nodes)
         decommissioned_nodes = []
@@ -66,9 +67,9 @@ def add(id):
                     id = random.choice(active_nodes)
                     operations.append((DECOMMISSION, id))
                     decommission(id)
-                elif op == ADD:
+                elif op == ADD or op == ADD_NO_WAIT:
                     id = random.choice(decommissioned_nodes)
-                    operations.append((ADD, id))
+                    operations.append((op, id))
                     add(id)
 
         return operations
@@ -272,7 +273,9 @@ def add_node(idx, cleanup=True):
             "seed_servers": seed_servers_for(idx)
         })
 
+        def wait_for_new_replicas(idx):
             def has_new_replicas():
+                id = self.ids_mapping[idx]
                 per_node = replicas_per_node()
                 self.logger.info(f"replicas per node: {per_node}")
                 return id in per_node
@@ -281,7 +284,7 @@ def has_new_replicas():
                        timeout_sec=NODE_OP_TIMEOUT,
                        backoff_sec=2)
 
-        work = self.generate_random_workload(10,
+        work = self.generate_random_workload(30,
                                              available_nodes=self.active_nodes)
 
         self.redpanda.logger.info(f"node operations to execute: {work}")
@@ -293,6 +296,11 @@ def has_new_replicas():
                 idx = op[1]
                 self.active_nodes.add(idx)
                 add_node(idx)
+                wait_for_new_replicas(idx)
+            if op_type == ADD_NO_WAIT:
+                idx = op[1]
+                self.active_nodes.add(idx)
+                add_node(idx)
             if op_type == DECOMMISSION:
                 idx = op[1]
                 self.active_nodes.remove(idx)
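The point of `ADD_NO_WAIT` is the interleaving it allows: a decommission can
be issued while partition moves onto the freshly added node are still in
flight, which is what exercises the move-cancellation path. A condensed,
self-contained sketch of the resulting dispatch loop, with the test's
helpers replaced by stand-in stubs:

```python
ADD, ADD_NO_WAIT, DECOMMISSION = "add", "add_no_wait", "decommission"
active_nodes = set()

# stand-ins for the helpers defined in the test
def add_node(idx): print(f"starting node {idx}")
def decommission(idx): print(f"decommissioning node {idx}")
def wait_for_new_replicas(idx): print(f"waiting for replicas on node {idx}")

work = [(ADD, 4), (ADD_NO_WAIT, 5), (DECOMMISSION, 5)]
for op_type, idx in work:
    if op_type == ADD:
        active_nodes.add(idx)
        add_node(idx)
        wait_for_new_replicas(idx)  # block until partitions land on the node
    elif op_type == ADD_NO_WAIT:
        active_nodes.add(idx)
        add_node(idx)  # no wait: the next op can race with in-flight moves
    elif op_type == DECOMMISSION:
        active_nodes.remove(idx)
        decommission(idx)
```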
From a0535450076663d8fa3faec5d629553f5c84e6d0 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 7 Jul 2022 08:32:25 +0200
Subject: [PATCH 5/6] tests/node_ops: removed obsolete comment

Signed-off-by: Michal Maslanka
---
 tests/rptest/scale_tests/node_operations_fuzzy_test.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
index b0a975f20c13..f461e04971c6 100644
--- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py
+++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -92,11 +92,6 @@ def _create_random_topics(self, count):
 
         return topics
 
-    """
-    Adding nodes to the cluster should result in partition reallocations to new
-    nodes
-    """
-
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     @parametrize(enable_failures=True)
     @parametrize(enable_failures=False)

From 3795ed6a1744a902a447da856a4ed0305b178abf Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 7 Jul 2022 08:34:40 +0200
Subject: [PATCH 6/6] tests: fixed typos in comments

Signed-off-by: Michal Maslanka
---
 tests/rptest/scale_tests/node_operations_fuzzy_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
index f461e04971c6..6dc5803bc979 100644
--- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py
+++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -146,7 +146,7 @@ def failure_injector_loop():
                     1, NodeOperationFuzzyTest.max_suspend_duration_seconds)
                 node = random.choice(self.redpanda.nodes)
             else:
-                #kill/termianate only active nodes (not to influence the test outcome)
+                #kill/terminate only active nodes (not to influence the test outcome)
                 idx = random.choice(list(self.active_nodes))
                 node = self.redpanda.get_node(idx)
 
@@ -173,7 +173,7 @@ def decommission(idx):
         def decommissioned():
             try:
                 admin = Admin(self.redpanda)
-                # if broker is already draining, it is suceess
+                # if broker is already draining, it is success
                 brokers = admin.get_brokers()
                 for b in brokers:
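The final hunk is truncated above. For context, the `decommissioned()`
predicate it touches follows a common polling pattern; the sketch below is a
hypothetical reconstruction based only on the visible lines, and the
`membership_status` field name is an assumption about the admin API payload:

```python
# Hypothetical reconstruction of the check the last hunk comments on:
# a decommission counts as done once the broker is draining or has
# disappeared from the broker list entirely.
def decommissioned(admin, node_id):
    try:
        brokers = admin.get_brokers()
        for b in brokers:
            # if the broker is already draining, it is success
            if b["node_id"] == node_id and b["membership_status"] == "draining":
                return True
        # gone from the list altogether: fully decommissioned
        return all(b["node_id"] != node_id for b in brokers)
    except Exception:
        return False
```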