Skip to content

Commit

Permalink
tests: cleanup unused code, review comments
Browse files Browse the repository at this point in the history
removes unused code from tree, also cleans up order of tests

The redpanda.healthy api uses metrics which are sometimes disabled
eg in franzgo tests. so we use the admin ready api.
  • Loading branch information
abhijat committed Apr 28, 2022
1 parent c000795 commit d5f909d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 89 deletions.
88 changes: 31 additions & 57 deletions tests/rptest/services/action_injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ class ActionConfig:
min_time_between_actions_sec: float
max_time_between_actions_sec: float

# if set, the action injector thread will reverse the last applied action
# before injecting the next action
reverse_action_on_next_cycle: bool = False

# if set, the action will not be applied after this count of nodes has been affected
# subsequent calls to action will not do anything for the lifetime of the action injector thread.
max_affected_nodes: Optional[int] = None
Expand Down Expand Up @@ -134,36 +130,10 @@ def restore_state_on_exit(self, action_log: List[ActionLogEntry]):
self.do_restore_nodes(nodes_to_restore)


class NodeDecommission(DisruptiveAction):
def __init__(
self,
redpanda: RedpandaService,
config: ActionConfig,
admin: Admin,
):
super().__init__(redpanda, config, admin)

def max_affected_nodes_reached(self):
return len(self.affected_nodes) >= self.config.max_affected_nodes


class LeadershipTransfer(DisruptiveAction):
def __init__(
self,
redpanda: RedpandaService,
config: ActionConfig,
admin: Admin,
topics: List[TopicSpec],
):
super().__init__(redpanda, config, admin)
self.topics = topics
self.is_reversible = False

def max_affected_nodes_reached(self):
return False


class ProcessKill(DisruptiveAction):
PROCESS_START_WAIT_SEC = 20
PROCESS_START_WAIT_BACKOFF = 2

def __init__(self, redpanda: RedpandaService, config: ActionConfig,
admin: Admin):
super(ProcessKill, self).__init__(redpanda, config, admin)
Expand Down Expand Up @@ -191,7 +161,7 @@ def do_action(self):
self.redpanda.logger.warn(f'no usable node')

def do_reverse_action(self):
self.failure_injector._start(self.last_affected_node)
self._start_rp(node=self.last_affected_node)
self.affected_nodes.remove(self.last_affected_node)
self.redpanda.add_to_started_nodes(self.last_affected_node)

Expand All @@ -203,7 +173,16 @@ def do_restore_nodes(self, nodes_to_restore: Set[ClusterNode]):
Attempt to restore the redpanda process on all nodes where it was stopped.
"""
for node in nodes_to_restore:
self.failure_injector._start(node)
self._start_rp(node)

def _start_rp(self, node):
self.failure_injector._start(node)
wait_until(
lambda: self.redpanda.redpanda_pid(node),
timeout_sec=self.PROCESS_START_WAIT_SEC,
backoff_sec=self.PROCESS_START_WAIT_BACKOFF,
err_msg=
f'Failed to start redpanda process on {node.account.hostname}')


class ActionInjectorThread(Thread):
Expand All @@ -223,23 +202,34 @@ def __init__(
self.action_log = []

def run(self):
wait_until(lambda: self.redpanda.healthy(),
admin = Admin(self.redpanda)

def all_nodes_started():
statuses = [
admin.ready(node).get("status") for node in self.redpanda.nodes
]
return all(status == 'ready' for status in statuses)

wait_until(all_nodes_started,
timeout_sec=self.config.cluster_start_lead_time_sec,
backoff_sec=2,
err_msg=f'Cluster not ready to begin actions')

self.redpanda.logger.info('cluster is ready, starting action loop')

while not self._stop_requested.is_set():
if self.config.reverse_action_on_next_cycle:
result = self.disruptive_action.reverse()
if result:
self.action_log.append(
ActionLogEntry(result, is_reverse_action=True))
result = self.disruptive_action.action()
if result:
self.action_log.append(
ActionLogEntry(result, is_reverse_action=False))

time.sleep(self.config.time_between_actions())

result = self.disruptive_action.reverse()
if result:
self.action_log.append(
ActionLogEntry(result, is_reverse_action=True))

if self.config.restore_state_on_exit:
self.redpanda.logger.info('attempting to restore system state')
self.disruptive_action.restore_state_on_exit(self.action_log)
Expand Down Expand Up @@ -290,19 +280,3 @@ def create_context_with_defaults(redpanda: RedpandaService,
def random_process_kills(redpanda: RedpandaService,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda, ProcessKill, config=config)


def random_decommissions(redpanda: RedpandaService,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda,
NodeDecommission,
config=config)


def random_leadership_transfers(redpanda: RedpandaService,
topics,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda,
LeadershipTransfer,
topics,
config=config)
8 changes: 0 additions & 8 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,6 @@ def decommission_broker(self, id, node=None):
self.redpanda.logger.debug(f"decommissioning {path}")
return self._request('put', path, node=node)

def recommission_broker(self, broker_id, node=None):
"""
Recommission broker
"""
path = f"brokers/{broker_id}/recommission"
self.redpanda.logger.debug(f"recommissioning {path}")
return self._request('put', path, node=node)

def get_partitions(self,
topic=None,
partition=None,
Expand Down
36 changes: 12 additions & 24 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,7 @@ class EndToEndShadowIndexingBase(EndToEndTest):
replication_factor=3,
), )

def _build_redpanda_instance(self):
return RedpandaService(
context=self.test_context,
num_brokers=self.num_brokers,
si_settings=self.si_settings,
)

def _build_kafka_tools(self):
return KafkaCliTools(self.redpanda)

def __init__(self, test_context):
def __init__(self, test_context, extra_rp_conf=None):
super(EndToEndShadowIndexingBase,
self).__init__(test_context=test_context)

Expand All @@ -58,11 +48,14 @@ def __init__(self, test_context):
self.s3_bucket_name = self.si_settings.cloud_storage_bucket
self.si_settings.load_context(self.logger, test_context)
self.scale = Scale(test_context)
self.kafka_tools = None

self.redpanda = RedpandaService(context=self.test_context,
num_brokers=self.num_brokers,
si_settings=self.si_settings,
extra_rp_conf=extra_rp_conf)
self.kafka_tools = KafkaCliTools(self.redpanda)

def setUp(self):
self.redpanda = self._build_redpanda_instance()
self.kafka_tools = self._build_kafka_tools()
self.redpanda.start()
for topic in EndToEndShadowIndexingBase.topics:
self.kafka_tools.create_topic(topic)
Expand Down Expand Up @@ -101,16 +94,11 @@ def test_write(self):


class EndToEndShadowIndexingTestWithDisruptions(EndToEndShadowIndexingBase):
def _build_redpanda_instance(self):
return RedpandaService(
context=self.test_context,
num_brokers=self.num_brokers,
si_settings=self.si_settings,
# With node failures we require __consumer_offsets to be replicated
# to survive leader crash
extra_rp_conf={
'default_topic_replications': self.num_brokers,
})
def __init__(self, test_context):
super().__init__(test_context,
extra_rp_conf={
'default_topic_replications': self.num_brokers,
})

@cluster(num_nodes=5, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_write_with_node_failures(self):
Expand Down

0 comments on commit d5f909d

Please sign in to comment.