diff --git a/tests/rptest/scale_tests/node_operations_fuzzy_test.py b/tests/rptest/scale_tests/node_operations_fuzzy_test.py index 94ee61720909..6dc5803bc979 100644 --- a/tests/rptest/scale_tests/node_operations_fuzzy_test.py +++ b/tests/rptest/scale_tests/node_operations_fuzzy_test.py @@ -27,8 +27,8 @@ DECOMMISSION = "decommission" ADD = "add" -ADD_TOPIC = "add_tp" -DELETE_TOPIC = "delete_tp" +ADD_NO_WAIT = "add_no_wait" + ALLOWED_REPLICATION = [1, 3] @@ -37,19 +37,12 @@ class NodeOperationFuzzyTest(EndToEndTest): min_inter_failure_time = 30 max_inter_failure_time = 60 - def generate_random_workload(self, count, skip_nodes, available_nodes): - op_types = [ADD, DECOMMISSION] - tp_op_types = [ADD_TOPIC, DELETE_TOPIC] + def generate_random_workload(self, count, available_nodes): + op_types = [ADD, ADD_NO_WAIT, DECOMMISSION] # current state active_nodes = list(available_nodes) decommissioned_nodes = [] operations = [] - topics = [] - - def eligible_active_nodes(): - return list( - filter(lambda n: not (n == 1 or n in skip_nodes), - active_nodes)) def decommission(id): active_nodes.remove(id) @@ -60,37 +53,24 @@ def add(id): decommissioned_nodes.remove(id) for _ in range(0, count): - if len(decommissioned_nodes) == 2: + if len(active_nodes) <= 3: id = random.choice(decommissioned_nodes) operations.append((ADD, id)) add(id) elif len(decommissioned_nodes) == 0: - id = random.choice(eligible_active_nodes()) + id = random.choice(active_nodes) operations.append((DECOMMISSION, id)) decommission(id) else: op = random.choice(op_types) if op == DECOMMISSION: - id = random.choice(eligible_active_nodes()) + id = random.choice(active_nodes) operations.append((DECOMMISSION, id)) decommission(id) - elif op == ADD: + elif op == ADD or op == ADD_NO_WAIT: id = random.choice(decommissioned_nodes) - operations.append((ADD, id)) + operations.append((op, id)) add(id) - # topic operation - if len(topics) == 0: - op = ADD_TOPIC - else: - op = random.choice(tp_op_types) - - if op == ADD_TOPIC: - operations.append(( - ADD_TOPIC, - f"test-topic-{random.randint(0,2000)}-{round(time.time()*1000000)}", - random.choice(ALLOWED_REPLICATION), 3)) - else: - operations.append((DELETE_TOPIC, random.choice(topics))) return operations @@ -112,11 +92,6 @@ def _create_random_topics(self, count): return topics - """ - Adding nodes to the cluster should result in partition reallocations to new - nodes - """ - @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) @parametrize(enable_failures=True) @parametrize(enable_failures=False) @@ -171,7 +146,7 @@ def failure_injector_loop(): 1, NodeOperationFuzzyTest.max_suspend_duration_seconds) node = random.choice(self.redpanda.nodes) else: - #kill/termianate only active nodes (not to influence the test outcome) + #kill/terminate only active nodes (not to influence the test outcome) idx = random.choice(list(self.active_nodes)) node = self.redpanda.get_node(idx) @@ -198,7 +173,7 @@ def decommission(idx): def decommissioned(): try: admin = Admin(self.redpanda) - # if broker is already draining, it is suceess + # if broker is already draining, it is success brokers = admin.get_brokers() for b in brokers: @@ -293,7 +268,9 @@ def add_node(idx, cleanup=True): "seed_servers": seed_servers_for(idx) }) + def wait_for_new_replicas(idx): def has_new_replicas(): + id = self.ids_mapping[idx] per_node = replicas_per_node() self.logger.info(f"replicas per node: {per_node}") return id in per_node @@ -302,42 +279,9 @@ def has_new_replicas(): timeout_sec=NODE_OP_TIMEOUT, backoff_sec=2) - def is_topic_present(name): - kcl = KCL(self.redpanda) - lines = kcl.list_topics().splitlines() - self.redpanda.logger.debug( - f"checking if topic {name} is present in {lines}") - for l in lines: - if l.startswith(name): - return True - return False - - def create_topic(spec): - try: - DefaultClient(self.redpanda).create_topic(spec) - except Exception as e: - self.redpanda.logger.warn( - f"error creating topic {spec.name} - {e}") - try: - return is_topic_present(spec.name) - except Exception as e: - self.redpanda.logger.warn(f"error while listing topics - {e}") - return False - - def delete_topic(name): - try: - DefaultClient(self.redpanda).delete_topic(name) - except Exception as e: - self.redpanda.logger.warn(f"error deleting topic {name} - {e}") - try: - return not is_topic_present(name) - except Exception as e: - self.redpanda.logger.warn(f"error while listing topics - {e}") - return False - - work = self.generate_random_workload(10, - skip_nodes=set(), + work = self.generate_random_workload(30, available_nodes=self.active_nodes) + self.redpanda.logger.info(f"node operations to execute: {work}") for op in work: op_type = op[0] @@ -347,21 +291,15 @@ def delete_topic(name): idx = op[1] self.active_nodes.add(idx) add_node(idx) + wait_for_new_replicas(idx) + if op_type == ADD_NO_WAIT: + idx = op[1] + self.active_nodes.add(idx) + add_node(idx) if op_type == DECOMMISSION: idx = op[1] self.active_nodes.remove(idx) decommission(idx) - elif op_type == ADD_TOPIC: - spec = TopicSpec(name=op[1], - replication_factor=op[2], - partition_count=op[3]) - wait_until(lambda: create_topic(spec) == True, - timeout_sec=180, - backoff_sec=2) - elif op_type == DELETE_TOPIC: - wait_until(lambda: delete_topic(op[1]) == True, - timeout_sec=180, - backoff_sec=2) enable_failures = False admin_fuzz.wait(20, 180)