diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 478008ee2f6e..58026782fcc1 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -865,9 +865,8 @@ controller_backend::process_partition_reconfiguration(
      * NOTE: deletion is safe as we are already running with new
      * configuration so old replicas are not longer needed.
      */
-    if (
-      !has_local_replicas(_self, previous_replicas)
-      || type == topic_table_delta::op_type::force_abort_update) {
+    if (can_finish_update(
+          type, target_assignment.replicas, previous_replicas)) {
         co_return co_await dispatch_update_finished(
           std::move(ntp), target_assignment);
     }
@@ -916,6 +915,30 @@ controller_backend::process_partition_reconfiguration(
     co_return ec;
 }
 
+bool controller_backend::can_finish_update(
+  topic_table_delta::op_type update_type,
+  const std::vector<model::broker_shard>& current_replicas,
+  const std::vector<model::broker_shard>& previous_replicas) {
+    // a force abort update may be finished by any node
+    if (update_type == topic_table_delta::op_type::force_abort_update) {
+        return true;
+    }
+    // an update may be finished by a node that was added to the replica set
+    if (!has_local_replicas(_self, previous_replicas)) {
+        return true;
+    }
+    // finally, if no nodes were added to the replica set, one of the current
+    // replicas may finish the update
+
+    auto added_nodes = subtract_replica_sets(
+      current_replicas, previous_replicas);
+    if (added_nodes.empty()) {
+        return has_local_replicas(_self, current_replicas);
+    }
+
+    return false;
+}
+
 ss::future<std::error_code>
 controller_backend::create_partition_from_remote_shard(
   model::ntp ntp,
diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h
index 369609e970d3..cbdb850eac00 100644
--- a/src/v/cluster/controller_backend.h
+++ b/src/v/cluster/controller_backend.h
@@ -149,6 +149,11 @@ class controller_backend
     ss::future<std::error_code> create_partition_from_remote_shard(
       model::ntp, ss::shard_id, partition_assignment);
 
+    bool can_finish_update(
+      topic_table_delta::op_type,
+      const std::vector<model::broker_shard>&,
+      const std::vector<model::broker_shard>&);
+
     void housekeeping();
     void setup_metrics();
     ss::sharded<topic_table>& _topics;
diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
index 94ee61720909..6dc5803bc979 100644
--- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py
+++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -27,8 +27,8 @@
 
 DECOMMISSION = "decommission"
 ADD = "add"
-ADD_TOPIC = "add_tp"
-DELETE_TOPIC = "delete_tp"
+ADD_NO_WAIT = "add_no_wait"
+
 ALLOWED_REPLICATION = [1, 3]
 
 
@@ -37,19 +37,12 @@ class NodeOperationFuzzyTest(EndToEndTest):
     min_inter_failure_time = 30
     max_inter_failure_time = 60
 
-    def generate_random_workload(self, count, skip_nodes, available_nodes):
-        op_types = [ADD, DECOMMISSION]
-        tp_op_types = [ADD_TOPIC, DELETE_TOPIC]
+    def generate_random_workload(self, count, available_nodes):
+        op_types = [ADD, ADD_NO_WAIT, DECOMMISSION]
         # current state
         active_nodes = list(available_nodes)
         decommissioned_nodes = []
         operations = []
-        topics = []
-
-        def eligible_active_nodes():
-            return list(
-                filter(lambda n: not (n == 1 or n in skip_nodes),
-                       active_nodes))
 
         def decommission(id):
             active_nodes.remove(id)
@@ -60,37 +53,24 @@ def add(id):
             decommissioned_nodes.remove(id)
 
         for _ in range(0, count):
-            if len(decommissioned_nodes) == 2:
+            if len(active_nodes) <= 3:
                 id = random.choice(decommissioned_nodes)
                 operations.append((ADD, id))
                 add(id)
             elif len(decommissioned_nodes) == 0:
-                id = random.choice(eligible_active_nodes())
+                id = random.choice(active_nodes)
                 operations.append((DECOMMISSION, id))
                 decommission(id)
             else:
                 op = random.choice(op_types)
                 if op == DECOMMISSION:
-                    id = random.choice(eligible_active_nodes())
+                    id = random.choice(active_nodes)
                     operations.append((DECOMMISSION, id))
                     decommission(id)
-                elif op == ADD:
+                elif op == ADD or op == ADD_NO_WAIT:
                     id = random.choice(decommissioned_nodes)
-                    operations.append((ADD, id))
+                    operations.append((op, id))
                     add(id)
-            # topic operation
-            if len(topics) == 0:
-                op = ADD_TOPIC
-            else:
-                op = random.choice(tp_op_types)
-
-            if op == ADD_TOPIC:
-                operations.append((
-                    ADD_TOPIC,
-                    f"test-topic-{random.randint(0,2000)}-{round(time.time()*1000000)}",
-                    random.choice(ALLOWED_REPLICATION), 3))
-            else:
-                operations.append((DELETE_TOPIC, random.choice(topics)))
 
         return operations
 
@@ -112,11 +92,6 @@ def _create_random_topics(self, count):
 
         return topics
 
-    """
-    Adding nodes to the cluster should result in partition reallocations to new
-    nodes
-    """
-
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     @parametrize(enable_failures=True)
     @parametrize(enable_failures=False)
@@ -171,7 +146,7 @@ def failure_injector_loop():
                         1, NodeOperationFuzzyTest.max_suspend_duration_seconds)
                     node = random.choice(self.redpanda.nodes)
                 else:
-                    #kill/termianate only active nodes (not to influence the test outcome)
+                    # kill/terminate only active nodes (so as not to influence the test outcome)
                     idx = random.choice(list(self.active_nodes))
                     node = self.redpanda.get_node(idx)
 
@@ -198,7 +173,7 @@ def decommission(idx):
            def decommissioned():
                try:
                    admin = Admin(self.redpanda)
-                   # if broker is already draining, it is suceess
+                   # if the broker is already draining, treat it as success
                    brokers = admin.get_brokers()
 
                    for b in brokers:
@@ -293,7 +268,9 @@ def add_node(idx, cleanup=True):
                "seed_servers": seed_servers_for(idx)
            })
 
+        def wait_for_new_replicas(idx):
            def has_new_replicas():
+                id = self.ids_mapping[idx]
                per_node = replicas_per_node()
                self.logger.info(f"replicas per node: {per_node}")
                return id in per_node
@@ -302,42 +279,9 @@ def has_new_replicas():
                       timeout_sec=NODE_OP_TIMEOUT,
                       backoff_sec=2)
 
-        def is_topic_present(name):
-            kcl = KCL(self.redpanda)
-            lines = kcl.list_topics().splitlines()
-            self.redpanda.logger.debug(
-                f"checking if topic {name} is present in {lines}")
-            for l in lines:
-                if l.startswith(name):
-                    return True
-            return False
-
-        def create_topic(spec):
-            try:
-                DefaultClient(self.redpanda).create_topic(spec)
-            except Exception as e:
-                self.redpanda.logger.warn(
-                    f"error creating topic {spec.name} - {e}")
-            try:
-                return is_topic_present(spec.name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error while listing topics - {e}")
-                return False
-
-        def delete_topic(name):
-            try:
-                DefaultClient(self.redpanda).delete_topic(name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error deleting topic {name} - {e}")
-            try:
-                return not is_topic_present(name)
-            except Exception as e:
-                self.redpanda.logger.warn(f"error while listing topics - {e}")
-                return False
-
-        work = self.generate_random_workload(10,
-                                             skip_nodes=set(),
+        work = self.generate_random_workload(30,
                                              available_nodes=self.active_nodes)
+        self.redpanda.logger.info(f"node operations to execute: {work}")
 
         for op in work:
             op_type = op[0]
@@ -347,21 +291,15 @@ def delete_topic(name):
                 idx = op[1]
                 self.active_nodes.add(idx)
                 add_node(idx)
+                wait_for_new_replicas(idx)
+            if op_type == ADD_NO_WAIT:
+                idx = op[1]
+                self.active_nodes.add(idx)
+                add_node(idx)
             if op_type == DECOMMISSION:
                 idx = op[1]
                 self.active_nodes.remove(idx)
                 decommission(idx)
-            elif op_type == ADD_TOPIC:
-                spec = TopicSpec(name=op[1],
-                                 replication_factor=op[2],
-                                 partition_count=op[3])
-                wait_until(lambda: create_topic(spec) == True,
-                           timeout_sec=180,
-                           backoff_sec=2)
-            elif op_type == DELETE_TOPIC:
-                wait_until(lambda: delete_topic(op[1]) == True,
-                           timeout_sec=180,
-                           backoff_sec=2)
 
         enable_failures = False
         admin_fuzz.wait(20, 180)
diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 3bb8ea63c3ab..07ce9509ced5 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -9,6 +9,7 @@
 
 import random
 import time
+from numpy import record
 import requests
 
 from rptest.services.cluster import cluster
@@ -579,6 +580,33 @@ def get_status():
 
         rpk.create_topic(topic, 1, 1)
         wait_until(lambda: get_status() == 'done', 10, 1)
 
+    @cluster(num_nodes=5)
+    def test_down_replicate(self):
+        """
+        Test changing the replication factor from 3 -> 1
+        """
+        throughput, records, _ = self._get_scale_params()
+        partition_count = 5
+        self.start_redpanda(num_nodes=3)
+        admin = Admin(self.redpanda)
+
+        spec = TopicSpec(partition_count=partition_count, replication_factor=3)
+        self.client().create_topic(spec)
+        self.topic = spec.name
+        self.start_producer(1, throughput=throughput)
+        self.start_consumer(1)
+        self.await_startup()
+
+        for partition in range(0, partition_count):
+            assignments = self._get_assignments(admin, self.topic, partition)
+            new_assignment = [assignments[0]]
+            admin.set_partition_replicas(self.topic, partition, new_assignment)
+            self._wait_post_move(self.topic, partition, new_assignment, 60)
+
+        self.run_validation(enable_idempotence=False,
+                            consumer_timeout_sec=45,
+                            min_records=records)
+
 class SIPartitionMovementTest(PartitionMovementMixin, EndToEndTest):
     """
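
Reviewer note (not part of the patch): the sketch below restates, in plain Python, the decision rule that the new controller_backend::can_finish_update helper appears to encode. Bare node ids stand in for model::broker_shard entries, FORCE_ABORT_UPDATE stands in for topic_table_delta::op_type::force_abort_update, and all names in the sketch are illustrative rather than taken from the Redpanda sources.

# Standalone sketch of the can_finish_update rule, with node ids as replicas.
FORCE_ABORT_UPDATE = "force_abort_update"


def can_finish_update(update_type, self_node, current_replicas,
                      previous_replicas):
    # a force-aborted update may be finished by any node
    if update_type == FORCE_ABORT_UPDATE:
        return True
    # a node holding no replica from the previous set was added by the
    # update, so it may finish it
    if self_node not in previous_replicas:
        return True
    # if the update added no nodes (e.g. down-replication from 3 to 1),
    # any of the current replicas may finish it
    added_nodes = set(current_replicas) - set(previous_replicas)
    if not added_nodes:
        return self_node in current_replicas
    return False


# Down-replication from replicas {1, 2, 3} to {1}: no node is newly added, so
# only the remaining replica (node 1) finishes the update; this is the case
# the new added_nodes.empty() branch covers and test_down_replicate exercises.
assert can_finish_update("update", 1, [1], [1, 2, 3])
assert not can_finish_update("update", 2, [1], [1, 2, 3])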