Skip to content

Commit

Permalink
tests: attempt to restore state on exit
Browse files Browse the repository at this point in the history
attempts to restore changes made on system exit. additionally
there are refactors to make function names and variables clearer

also introduces mutator methods in redpanda class to make changes
to started nodes explicit
  • Loading branch information
abhijat committed Apr 27, 2022
1 parent 4911872 commit e966021
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 87 deletions.
247 changes: 160 additions & 87 deletions tests/rptest/services/action_injector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
import random
import time
from collections import defaultdict
from threading import Thread, Event
from typing import Optional, List
from typing import Optional, List, Set

from ducktape.cluster.cluster import ClusterNode
from ducktape.utils.util import wait_until
Expand All @@ -13,34 +14,77 @@
from rptest.services.redpanda import RedpandaService


@dataclasses.dataclass
class ActionLogEntry:
node: ClusterNode
is_reverse_action: bool

def __repr__(self) -> str:
return f'Node: {self.node.account.hostname}, reverse? {self.is_reverse_action}'


@dataclasses.dataclass
class ActionConfig:
# lead time the action injector waits for the cluster
# to become healthy before starting disruptive actions
cluster_start_lead_time_sec: float
min_time_between_actions_sec: float
max_time_between_actions_sec: float

# if set, the action injector thread will reverse the last applied action
# before injecting the next action
reverse_action_on_next_cycle: bool = False

# if set, the action will not be applied after this count of nodes has been affected
# subsequent calls to action will not do anything for the lifetime of the action injector thread.
max_affected_nodes: Optional[int] = None

def random_time_in_range(self) -> float:
# attempt to restore state when the action injector thread exits
restore_state_on_exit: bool = True

def time_between_actions(self) -> float:
"""
The action injector thread sleeps for this time interval between calls
to DisruptiveAction
"""
return random.uniform(self.min_time_between_actions_sec,
self.max_time_between_actions_sec)


class RandomNodeOp:
class DisruptiveAction:
"""
Defines an action taken on a node or on the cluster as a whole which causes a disruption.
The action could be a process failure, leadership transfer, topic modification etc.
The action can be reversible, it also stores the set of affected nodes and the last node
the action was applied on.
"""

def __init__(self, redpanda: RedpandaService, config: ActionConfig,
admin: Admin):
self.admin = admin
self.config = config
self.redpanda = redpanda
self.nodes_affected = set()
self.affected_nodes = set()
self.is_reversible = False
self.last_action_on_node = None

def limit_reached(self) -> bool:
self.last_affected_node = None

def max_affected_nodes_reached(self) -> bool:
"""
Checks if the number of affected nodes equals the maximum number of nodes
this action is allowed to affect. If so the next calls to action will return
early.
"""
raise NotImplementedError

def target_node(self) -> ClusterNode:
available = set(self.redpanda.nodes) - self.nodes_affected
"""
Randomly selects the next node to apply the action on. A set of affected
nodes is maintained so that we do not apply the action on nodes which were
already targeted in previous invocations.
"""
available = set(self.redpanda.nodes) - self.affected_nodes
if available:
selected = random.choice(list(available))
names = {n.account.hostname for n in available}
Expand All @@ -49,28 +93,55 @@ def target_node(self) -> ClusterNode:
)
return selected

def do_action(self) -> ClusterNode:
"""
Applies the disruptive action, returns node the action was applied on
"""
raise NotImplementedError

def action(self) -> ClusterNode:
if not self.max_affected_nodes_reached():
return self.do_action()

def do_reverse_action(self) -> ClusterNode:
"""
Reverses the last applied action
"""
raise NotImplementedError

def reverse(self, node: ClusterNode) -> ClusterNode:
def reverse(self) -> ClusterNode:
if self.is_reversible and self.last_affected_node is not None:
return self.do_reverse_action()

def do_restore_nodes(self, nodes_to_restore: Set[ClusterNode]):
raise NotImplementedError

def __call__(self, reverse=False, *args, **kwargs) -> ClusterNode:
if reverse:
if self.is_reversible and self.last_action_on_node is not None:
return self.reverse(node=self.last_action_on_node)
else:
self.redpanda.logger.warn(
f'Ignoring reverse call with nothing to reverse')
elif not self.limit_reached():
return self.action()
def restore_state_on_exit(self, action_log: List[ActionLogEntry]):
"""
Optionally restore state when the action injector thread is ending.
Uses the action log to determine what restoration should be done.
"""
all_nodes = {entry.node for entry in action_log}

nodes_where_action_reversed = defaultdict(lambda: False)
for entry in action_log:
nodes_where_action_reversed[entry.node] = entry.is_reverse_action

class RandomNodeDecommission(RandomNodeOp):
def limit_reached(self):
return len(self.nodes_affected) >= self.config.max_affected_nodes
nodes_to_restore = all_nodes - {
node for node, is_reversed in nodes_where_action_reversed.items()
if not is_reversed
}

def action(self) -> ClusterNode:
hostnames = {node.account.hostname for node in nodes_to_restore}
self.redpanda.logger.info(f'Restoring state on {hostnames}')
self.do_restore_nodes(nodes_to_restore)


class NodeDecommission(DisruptiveAction):
def max_affected_nodes_reached(self):
return len(self.affected_nodes) >= self.config.max_affected_nodes

def do_action(self) -> ClusterNode:
brokers = self.admin.get_brokers()
node_to_decommission = random.choice(brokers)

Expand All @@ -88,34 +159,37 @@ def done():
timeout_sec=120,
backoff_sec=2,
err_msg=f'Failed to decommission broker id {broker_id}')
self.nodes_affected.add(broker_id)
self.last_action_on_node = broker_id
return self.last_action_on_node
self.affected_nodes.add(broker_id)
self.last_affected_node = broker_id
return self.last_affected_node

def do_reverse_action(self) -> ClusterNode:
self.admin.recommission_broker(self.last_affected_node)
self.affected_nodes.remove(self.last_affected_node)

def reverse(self, node) -> ClusterNode:
self.admin.recommission_broker(self.last_action_on_node)
self.nodes_affected.remove(self.last_action_on_node)
last_action_on_node = self.last_action_on_node
self.last_action_on_node = None
return last_action_on_node
last_affected_node, self.last_affected_node = self.last_affected_node, None
return last_affected_node

def do_restore_nodes(self, nodes_to_restore: Set[ClusterNode]):
pass

class RandomLeadershipTransfer(RandomNodeOp):

class LeadershipTransfer(DisruptiveAction):
def __init__(
self,
redpanda: RedpandaService,
config: ActionConfig,
admin: Admin,
topics: List[TopicSpec],
self,
redpanda: RedpandaService,
config: ActionConfig,
admin: Admin,
topics: List[TopicSpec],
):
super().__init__(redpanda, config, admin)
self.topics = topics
self.is_reversible = False

def limit_reached(self):
def max_affected_nodes_reached(self):
return False

def action(self):
def do_action(self):
for topic in self.topics:
for partition in range(topic.partition_count):
old_leader = self.admin.get_partition_leader(
Expand All @@ -135,72 +209,67 @@ def leader_is_changed():
backoff_sec=2,
err_msg='Leadership transfer failed')

def reverse(self, node: ClusterNode):
raise NotImplementedError('Leadership transfer not reversible')
def do_reverse_action(self):
raise NotImplementedError('Leadership transfer is not reversible')

def do_restore_nodes(self, nodes_to_restore: Set[ClusterNode]):
pass


class RandomNodeProcessFailure(RandomNodeOp):
class ProcessKill(DisruptiveAction):
def __init__(self, redpanda: RedpandaService, config: ActionConfig,
admin: Admin):
super(RandomNodeProcessFailure, self).__init__(redpanda, config, admin)
super(ProcessKill, self).__init__(redpanda, config, admin)
self.failure_injector = FailureInjector(self.redpanda)
self.is_reversible = True

def limit_reached(self):
return len(self.nodes_affected) >= self.config.max_affected_nodes
def max_affected_nodes_reached(self):
return len(self.affected_nodes) >= self.config.max_affected_nodes

def action(self):
def do_action(self):
node = self.target_node()
if node:
self.redpanda.logger.info(
f'executing action on {node.account.hostname}')
self.failure_injector.inject_failure(
FailureSpec(FailureSpec.FAILURE_KILL, node))
self.nodes_affected.add(node)
self.last_action_on_node = node

# Update started_nodes so validations are run on the correct
# set of nodes later.
try:
self.redpanda.started_nodes().remove(node)
except ValueError:
self.redpanda.logger.warn(
f'failed to remove {node.account.hostname} from rp node list'
)
finally:
return node
self.affected_nodes.add(node)
self.last_affected_node = node

# Update started_nodes so storage validations are run
# on the correct set of nodes later.
self.redpanda.remove_from_started_nodes(node)
return node
else:
self.redpanda.logger.warn(f'no usable node')

def reverse(self, node: ClusterNode):
self.failure_injector._start(self.last_action_on_node)
self.nodes_affected.remove(self.last_action_on_node)
self.redpanda.started_nodes().append(self.last_action_on_node)
last_action_on_node = self.last_action_on_node
self.last_action_on_node = None
return last_action_on_node
def do_reverse_action(self):
self.failure_injector._start(self.last_affected_node)
self.affected_nodes.remove(self.last_affected_node)
self.redpanda.add_to_started_nodes(self.last_affected_node)

last_affected_node, self.last_affected_node = self.last_affected_node, None
return last_affected_node

@dataclasses.dataclass
class ActionLogEntry:
node: ClusterNode
is_reverse_action: bool

def __repr__(self) -> str:
return f'Node: {self.node.account.hostname}, reverse? {self.is_reverse_action}'
def do_restore_nodes(self, nodes_to_restore: Set[ClusterNode]):
"""
Attempt to restore the redpanda process on all nodes where it was stopped.
"""
for node in nodes_to_restore:
self.failure_injector._start(node)


class ActionInjectorThread(Thread):
def __init__(
self,
config: ActionConfig,
redpanda: RedpandaService,
random_op: RandomNodeOp,
*args,
**kwargs,
self,
config: ActionConfig,
redpanda: RedpandaService,
action: DisruptiveAction,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.random_op = random_op
self.action = action
self.redpanda = redpanda
self.config = config
self._stop_requested = Event()
Expand All @@ -214,23 +283,27 @@ def run(self):

while not self._stop_requested.is_set():
if self.config.reverse_action_on_next_cycle:
result = self.random_op(reverse=True)
result = self.action.reverse()
if result:
self.action_log.append(
ActionLogEntry(result, is_reverse_action=True))
result = self.random_op()
result = self.action.action()
if result:
self.action_log.append(
ActionLogEntry(result, is_reverse_action=False))
time.sleep(self.config.random_time_in_range())
time.sleep(self.config.time_between_actions())

if self.config.restore_state_on_exit:
self.redpanda.logger.info('attempting to restore system state')
self.action.restore_state_on_exit(self.action_log)

def stop(self):
self._stop_requested.set()


class ActionCtx:
def __init__(self, config: ActionConfig, redpanda: RedpandaService,
random_op: RandomNodeOp):
random_op: DisruptiveAction):
self.redpanda = redpanda
self.config = config
if config.max_affected_nodes is None:
Expand Down Expand Up @@ -270,21 +343,21 @@ def create_context_with_defaults(redpanda: RedpandaService,
def random_process_kills(redpanda: RedpandaService,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda,
RandomNodeProcessFailure,
ProcessKill,
config=config)


def random_decommissions(redpanda: RedpandaService,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda,
RandomNodeDecommission,
NodeDecommission,
config=config)


def random_leadership_transfers(redpanda: RedpandaService,
topics,
config: ActionConfig = None) -> ActionCtx:
return create_context_with_defaults(redpanda,
RandomLeadershipTransfer,
LeadershipTransfer,
topics,
config=config)
Loading

0 comments on commit e966021

Please sign in to comment.