Skip to content

Commit

Permalink
Merge pull request #5360 from mmaslankaprv/node-operations-fuzzy-improvements
Browse files Browse the repository at this point in the history

Node operations fuzzy improvements
  • Loading branch information
mmaslankaprv committed Jul 11, 2022
2 parents 18fa203 + 3795ed6 commit f12a18f
Showing 1 changed file with 20 additions and 82 deletions.
102 changes: 20 additions & 82 deletions tests/rptest/scale_tests/node_operations_fuzzy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

DECOMMISSION = "decommission"
ADD = "add"
ADD_TOPIC = "add_tp"
DELETE_TOPIC = "delete_tp"
ADD_NO_WAIT = "add_no_wait"

ALLOWED_REPLICATION = [1, 3]


Expand All @@ -37,19 +37,12 @@ class NodeOperationFuzzyTest(EndToEndTest):
min_inter_failure_time = 30
max_inter_failure_time = 60

def generate_random_workload(self, count, skip_nodes, available_nodes):
op_types = [ADD, DECOMMISSION]
tp_op_types = [ADD_TOPIC, DELETE_TOPIC]
def generate_random_workload(self, count, available_nodes):
op_types = [ADD, ADD_NO_WAIT, DECOMMISSION]
# current state
active_nodes = list(available_nodes)
decommissioned_nodes = []
operations = []
topics = []

def eligible_active_nodes():
return list(
filter(lambda n: not (n == 1 or n in skip_nodes),
active_nodes))

def decommission(id):
active_nodes.remove(id)
Expand All @@ -60,37 +53,24 @@ def add(id):
decommissioned_nodes.remove(id)

for _ in range(0, count):
if len(decommissioned_nodes) == 2:
if len(active_nodes) <= 3:
id = random.choice(decommissioned_nodes)
operations.append((ADD, id))
add(id)
elif len(decommissioned_nodes) == 0:
id = random.choice(eligible_active_nodes())
id = random.choice(active_nodes)
operations.append((DECOMMISSION, id))
decommission(id)
else:
op = random.choice(op_types)
if op == DECOMMISSION:
id = random.choice(eligible_active_nodes())
id = random.choice(active_nodes)
operations.append((DECOMMISSION, id))
decommission(id)
elif op == ADD:
elif op == ADD or op == ADD_NO_WAIT:
id = random.choice(decommissioned_nodes)
operations.append((ADD, id))
operations.append((op, id))
add(id)
# topic operation
if len(topics) == 0:
op = ADD_TOPIC
else:
op = random.choice(tp_op_types)

if op == ADD_TOPIC:
operations.append((
ADD_TOPIC,
f"test-topic-{random.randint(0,2000)}-{round(time.time()*1000000)}",
random.choice(ALLOWED_REPLICATION), 3))
else:
operations.append((DELETE_TOPIC, random.choice(topics)))

return operations

Expand All @@ -112,11 +92,6 @@ def _create_random_topics(self, count):

return topics

"""
Adding nodes to the cluster should result in partition reallocations to new
nodes
"""

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@parametrize(enable_failures=True)
@parametrize(enable_failures=False)
Expand Down Expand Up @@ -171,7 +146,7 @@ def failure_injector_loop():
1, NodeOperationFuzzyTest.max_suspend_duration_seconds)
node = random.choice(self.redpanda.nodes)
else:
#kill/termianate only active nodes (not to influence the test outcome)
#kill/terminate only active nodes (not to influence the test outcome)
idx = random.choice(list(self.active_nodes))
node = self.redpanda.get_node(idx)

Expand All @@ -198,7 +173,7 @@ def decommission(idx):
def decommissioned():
try:
admin = Admin(self.redpanda)
# if broker is already draining, it is suceess
# if broker is already draining, it is success

brokers = admin.get_brokers()
for b in brokers:
Expand Down Expand Up @@ -293,7 +268,9 @@ def add_node(idx, cleanup=True):
"seed_servers": seed_servers_for(idx)
})

def wait_for_new_replicas(idx):
def has_new_replicas():
id = self.ids_mapping[idx]
per_node = replicas_per_node()
self.logger.info(f"replicas per node: {per_node}")
return id in per_node
Expand All @@ -302,42 +279,9 @@ def has_new_replicas():
timeout_sec=NODE_OP_TIMEOUT,
backoff_sec=2)

def is_topic_present(name):
kcl = KCL(self.redpanda)
lines = kcl.list_topics().splitlines()
self.redpanda.logger.debug(
f"checking if topic {name} is present in {lines}")
for l in lines:
if l.startswith(name):
return True
return False

def create_topic(spec):
try:
DefaultClient(self.redpanda).create_topic(spec)
except Exception as e:
self.redpanda.logger.warn(
f"error creating topic {spec.name} - {e}")
try:
return is_topic_present(spec.name)
except Exception as e:
self.redpanda.logger.warn(f"error while listing topics - {e}")
return False

def delete_topic(name):
try:
DefaultClient(self.redpanda).delete_topic(name)
except Exception as e:
self.redpanda.logger.warn(f"error deleting topic {name} - {e}")
try:
return not is_topic_present(name)
except Exception as e:
self.redpanda.logger.warn(f"error while listing topics - {e}")
return False

work = self.generate_random_workload(10,
skip_nodes=set(),
work = self.generate_random_workload(30,
available_nodes=self.active_nodes)

self.redpanda.logger.info(f"node operations to execute: {work}")
for op in work:
op_type = op[0]
Expand All @@ -347,21 +291,15 @@ def delete_topic(name):
idx = op[1]
self.active_nodes.add(idx)
add_node(idx)
wait_for_new_replicas(idx)
if op_type == ADD_NO_WAIT:
idx = op[1]
self.active_nodes.add(idx)
add_node(idx)
if op_type == DECOMMISSION:
idx = op[1]
self.active_nodes.remove(idx)
decommission(idx)
elif op_type == ADD_TOPIC:
spec = TopicSpec(name=op[1],
replication_factor=op[2],
partition_count=op[3])
wait_until(lambda: create_topic(spec) == True,
timeout_sec=180,
backoff_sec=2)
elif op_type == DELETE_TOPIC:
wait_until(lambda: delete_topic(op[1]) == True,
timeout_sec=180,
backoff_sec=2)

enable_failures = False
admin_fuzz.wait(20, 180)
Expand Down

0 comments on commit f12a18f

Please sign in to comment.