From 551045ec62d06c97825e804602aa2478f9d0f382 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Tue, 3 May 2022 12:06:59 +0530 Subject: [PATCH 1/2] tests: adds action injection context manager a new context manager is added which runs a background thread, injecting actions into a redpanda cluster, and optionally reversing them. --- tests/rptest/services/action_injector.py | 238 +++++++++++++++++++++++ tests/rptest/services/redpanda.py | 7 + 2 files changed, 245 insertions(+) create mode 100644 tests/rptest/services/action_injector.py diff --git a/tests/rptest/services/action_injector.py b/tests/rptest/services/action_injector.py new file mode 100644 index 000000000000..ef124d66320a --- /dev/null +++ b/tests/rptest/services/action_injector.py @@ -0,0 +1,238 @@ +import dataclasses +import random +import time +from threading import Thread, Event +from typing import Optional + +from ducktape.cluster.cluster import ClusterNode +from ducktape.utils.util import wait_until + +from rptest.services.admin import Admin +from rptest.services.failure_injector import FailureSpec, FailureInjector +from rptest.services.redpanda import RedpandaService + + +@dataclasses.dataclass +class ActionConfig: + # lead time the action injector waits for the cluster + # to become healthy before starting disruptive actions + cluster_start_lead_time_sec: float + min_time_between_actions_sec: float + max_time_between_actions_sec: float + + # if set, the action will not be applied after this count of nodes has been affected + # subsequent calls to action will not do anything for the lifetime of the action injector thread. + max_affected_nodes: Optional[int] = None + + def time_between_actions(self) -> float: + """ + The action injector thread sleeps for this time interval between calls + to DisruptiveAction + """ + return random.uniform(self.min_time_between_actions_sec, + self.max_time_between_actions_sec) + + +class DisruptiveAction: + """ + Defines an action taken on a node or on the cluster as a whole which causes a disruption. + + The action could be a process failure, leadership transfer, topic modification etc. + + The action can be reversible, it also stores the set of affected nodes and the last node + the action was applied on. + """ + def __init__(self, redpanda: RedpandaService, config: ActionConfig, + admin: Admin): + self.admin = admin + self.config = config + self.redpanda = redpanda + self.affected_nodes = set() + self.is_reversible = False + self.last_affected_node = None + + def max_affected_nodes_reached(self) -> bool: + """ + Checks if the number of affected nodes so far equals the maximum number of nodes + this action is allowed to affect. If so all future calls to action will be no-op. + """ + raise NotImplementedError + + def target_node(self) -> Optional[ClusterNode]: + """ + Randomly selects the next node to apply the action on. A set of affected + nodes is maintained so that we do not apply the action on nodes which were + already targeted in previous invocations. + """ + available = set(self.redpanda.nodes) - self.affected_nodes + if available: + selected = random.choice(list(available)) + names = {n.account.hostname for n in available} + self.redpanda.logger.info( + f'selected {selected.account.hostname} of {names} for operation' + ) + return selected + return None + + def do_action(self) -> ClusterNode: + """ + Applies the disruptive action, returns node or entity the action was applied on + """ + raise NotImplementedError + + def action(self) -> Optional[ClusterNode]: + if not self.max_affected_nodes_reached(): + return self.do_action() + return None + + def do_reverse_action(self) -> ClusterNode: + """ + Reverses the last applied action if applicable. + """ + raise NotImplementedError + + def reverse(self) -> Optional[ClusterNode]: + if self.is_reversible and self.last_affected_node is not None: + return self.do_reverse_action() + return None + + +class ProcessKill(DisruptiveAction): + PROCESS_START_WAIT_SEC = 20 + PROCESS_START_WAIT_BACKOFF = 2 + + def __init__(self, redpanda: RedpandaService, config: ActionConfig, + admin: Admin): + super(ProcessKill, self).__init__(redpanda, config, admin) + self.failure_injector = FailureInjector(self.redpanda) + self.is_reversible = True + + def max_affected_nodes_reached(self): + return len(self.affected_nodes) >= self.config.max_affected_nodes + + def do_action(self): + node = self.target_node() + if node: + self.redpanda.logger.info( + f'executing action on {node.account.hostname}') + self.failure_injector.inject_failure( + FailureSpec(FailureSpec.FAILURE_KILL, node)) + self.affected_nodes.add(node) + self.last_affected_node = node + + # Update started_nodes so storage validations are run + # on the correct set of nodes later. + self.redpanda.remove_from_started_nodes(node) + return node + else: + self.redpanda.logger.warn(f'no usable node') + return None + + def do_reverse_action(self): + self._start_rp(node=self.last_affected_node) + self.affected_nodes.remove(self.last_affected_node) + self.redpanda.add_to_started_nodes(self.last_affected_node) + + last_affected_node, self.last_affected_node = self.last_affected_node, None + return last_affected_node + + def _start_rp(self, node): + self.failure_injector._start(node) + wait_until( + lambda: self.redpanda.redpanda_pid(node), + timeout_sec=self.PROCESS_START_WAIT_SEC, + backoff_sec=self.PROCESS_START_WAIT_BACKOFF, + err_msg= + f'Failed to start redpanda process on {node.account.hostname}') + + +class ActionInjectorThread(Thread): + def __init__( + self, + config: ActionConfig, + redpanda: RedpandaService, + disruptive_action: DisruptiveAction, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.disruptive_action = disruptive_action + self.redpanda = redpanda + self.config = config + self._stop_requested = Event() + self.action_triggered = False + self.reverse_action_triggered = False + + def run(self): + admin = Admin(self.redpanda) + + def all_nodes_started(): + statuses = [ + admin.ready(node).get("status") for node in self.redpanda.nodes + ] + return all(status == 'ready' for status in statuses) + + wait_until(all_nodes_started, + timeout_sec=self.config.cluster_start_lead_time_sec, + backoff_sec=2, + err_msg=f'Cluster not ready to begin actions') + + self.redpanda.logger.info('cluster is ready, starting action loop') + + while not self._stop_requested.is_set(): + if self.disruptive_action.action(): + self.action_triggered = True + time.sleep(self.config.time_between_actions()) + if self.disruptive_action.reverse(): + self.reverse_action_triggered = True + + def stop(self): + self._stop_requested.set() + + +class ActionCtx: + def __init__(self, config: ActionConfig, redpanda: RedpandaService, + disruptive_action: DisruptiveAction): + self.redpanda = redpanda + self.config = config + if config.max_affected_nodes is None: + config.max_affected_nodes = len(redpanda.nodes) // 2 + self.thread = ActionInjectorThread(config, redpanda, disruptive_action) + + def __enter__(self): + self.redpanda.logger.info(f'entering random failure ctx') + self.thread.start() + return self + + def __exit__(self, *args, **kwargs): + self.redpanda.logger.info(f'leaving random failure ctx') + self.thread.stop() + self.thread.join() + + def assert_actions_triggered(self): + """ + Helper to allow tests to assert that the forward and reverse + actions were triggered by the thread + """ + assert (self.thread.action_triggered + and self.thread.reverse_action_triggered) + + +def create_context_with_defaults(redpanda: RedpandaService, + op_type, + config: ActionConfig = None, + *args, + **kwargs) -> ActionCtx: + admin = Admin(redpanda) + config = config or ActionConfig( + cluster_start_lead_time_sec=20, + min_time_between_actions_sec=10, + max_time_between_actions_sec=30, + ) + return ActionCtx(config, redpanda, + op_type(redpanda, config, admin, *args, **kwargs)) + + +def random_process_kills(redpanda: RedpandaService, + config: ActionConfig = None) -> ActionCtx: + return create_context_with_defaults(redpanda, ProcessKill, config=config) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index c217eac5b9bc..19d7fc2e569d 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -1023,9 +1023,16 @@ def stop_node(self, node, timeout=None): lambda: len(self.pids(node)) == 0, timeout_sec=timeout, err_msg=f"Redpanda node failed to stop in {timeout} seconds") + self.remove_from_started_nodes(node) + + def remove_from_started_nodes(self, node): if node in self._started: self._started.remove(node) + def add_to_started_nodes(self, node): + if node not in self._started: + self._started.append(node) + def clean(self, **kwargs): super().clean(**kwargs) if self._s3client: From 889747877f0cdc0c16ef82e9b5499a83e595c912 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Tue, 3 May 2022 12:07:57 +0530 Subject: [PATCH 2/2] tests: uses action injection in e2e SI test --- .../rptest/tests/e2e_shadow_indexing_test.py | 96 +++++++++++++------ 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/tests/rptest/tests/e2e_shadow_indexing_test.py b/tests/rptest/tests/e2e_shadow_indexing_test.py index c43c52d1e698..c61580f9b435 100644 --- a/tests/rptest/tests/e2e_shadow_indexing_test.py +++ b/tests/rptest/tests/e2e_shadow_indexing_test.py @@ -7,70 +7,74 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.services.cluster import cluster -from rptest.services.redpanda import SISettings +import os +import random + +from ducktape.tests.test import TestContext +from ducktape.utils.util import wait_until + from rptest.clients.kafka_cli_tools import KafkaCliTools +from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec -from rptest.services.redpanda import RedpandaService, SISettings -from rptest.util import Scale +from rptest.services.action_injector import random_process_kills +from rptest.services.cluster import cluster +from rptest.services.franz_go_verifiable_services import FranzGoVerifiableProducer, FranzGoVerifiableRandomConsumer +from rptest.services.franz_go_verifiable_services import ServiceStatus, await_minimum_produced_records +from rptest.services.redpanda import RedpandaService, CHAOS_LOG_ALLOW_LIST +from rptest.services.redpanda import SISettings from rptest.tests.end_to_end import EndToEndTest +from rptest.tests.prealloc_nodes import PreallocNodesTest +from rptest.util import Scale from rptest.util import ( produce_until_segments, wait_for_segments_removal, ) -from rptest.services.franz_go_verifiable_services import FranzGoVerifiableProducer, FranzGoVerifiableRandomConsumer -from rptest.services.franz_go_verifiable_services import ServiceStatus, await_minimum_produced_records -from rptest.tests.prealloc_nodes import PreallocNodesTest -from rptest.clients.rpk import RpkTool -from ducktape.utils.util import wait_until -from ducktape.tests.test import TestContext - -import os -import random - -class EndToEndShadowIndexingTest(EndToEndTest): +class EndToEndShadowIndexingBase(EndToEndTest): segment_size = 1048576 # 1 Mb s3_topic_name = "panda-topic" + + num_brokers = 3 + topics = (TopicSpec( name=s3_topic_name, partition_count=1, replication_factor=3, ), ) - def __init__(self, test_context): - super(EndToEndShadowIndexingTest, + def __init__(self, test_context, extra_rp_conf=None): + super(EndToEndShadowIndexingBase, self).__init__(test_context=test_context) + self.test_context = test_context self.topic = EndToEndShadowIndexingTest.s3_topic_name - si_settings = SISettings( + self.si_settings = SISettings( cloud_storage_reconciliation_interval_ms=500, cloud_storage_max_connections=5, log_segment_size=EndToEndShadowIndexingTest.segment_size, # 1MB ) - self.s3_bucket_name = si_settings.cloud_storage_bucket - - si_settings.load_context(self.logger, test_context) - + self.s3_bucket_name = self.si_settings.cloud_storage_bucket + self.si_settings.load_context(self.logger, test_context) self.scale = Scale(test_context) - self.redpanda = RedpandaService( - context=test_context, - num_brokers=3, - si_settings=si_settings, - ) + self.redpanda = RedpandaService(context=self.test_context, + num_brokers=self.num_brokers, + si_settings=self.si_settings, + extra_rp_conf=extra_rp_conf) self.kafka_tools = KafkaCliTools(self.redpanda) def setUp(self): self.redpanda.start() - for topic in EndToEndShadowIndexingTest.topics: + for topic in EndToEndShadowIndexingBase.topics: self.kafka_tools.create_topic(topic) def tearDown(self): self.s3_client.empty_bucket(self.s3_bucket_name) + +class EndToEndShadowIndexingTest(EndToEndShadowIndexingBase): @cluster(num_nodes=5) def test_write(self): """Write at least 10 segments, set retention policy to leave only 5 @@ -95,11 +99,45 @@ def test_write(self): topic=self.topic, partition_idx=0, count=6) - self.start_consumer() self.run_validation() +class EndToEndShadowIndexingTestWithDisruptions(EndToEndShadowIndexingBase): + def __init__(self, test_context): + super().__init__(test_context, + extra_rp_conf={ + 'default_topic_replications': self.num_brokers, + }) + + @cluster(num_nodes=5, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_write_with_node_failures(self): + self.start_producer() + produce_until_segments( + redpanda=self.redpanda, + topic=self.topic, + partition_idx=0, + count=10, + ) + + self.kafka_tools.alter_topic_config( + self.topic, + { + TopicSpec.PROPERTY_RETENTION_BYTES: + 5 * EndToEndShadowIndexingTest.segment_size + }, + ) + + with random_process_kills(self.redpanda) as ctx: + wait_for_segments_removal(redpanda=self.redpanda, + topic=self.topic, + partition_idx=0, + count=6) + self.start_consumer() + self.run_validation() + ctx.assert_actions_triggered() + + class ShadowIndexingWhileBusyTest(PreallocNodesTest): # With SI enabled, run common operations against a cluster # while the system is under load (busy).