Node operations fuzzy improvements #5360

Merged
29 changes: 26 additions & 3 deletions src/v/cluster/controller_backend.cc
@@ -865,9 +865,8 @@ controller_backend::process_partition_reconfiguration(
* NOTE: deletion is safe as we are already running with the new
* configuration, so the old replicas are no longer needed.
*/
- if (
-   !has_local_replicas(_self, previous_replicas)
-   || type == topic_table_delta::op_type::force_abort_update) {
+ if (can_finish_update(
+     type, target_assignment.replicas, previous_replicas)) {
co_return co_await dispatch_update_finished(
std::move(ntp), target_assignment);
}
@@ -916,6 +915,30 @@ controller_backend::process_partition_reconfiguration(
co_return ec;
}

+ bool controller_backend::can_finish_update(
+   topic_table_delta::op_type update_type,
+   const std::vector<model::broker_shard>& current_replicas,
+   const std::vector<model::broker_shard>& previous_replicas) {
+     // a force-abort update may be finished by any node
+     if (update_type == topic_table_delta::op_type::force_abort_update) {
+         return true;
+     }
+     // an update may be finished by a node that was added to the replica set
+     if (!has_local_replicas(_self, previous_replicas)) {
+         return true;
+     }
+     // finally, if no nodes were added to the replica set, one of the
+     // current replicas may finish the update
+
+     auto added_nodes = subtract_replica_sets(
+       current_replicas, previous_replicas);
+     if (added_nodes.empty()) {
+         return has_local_replicas(_self, current_replicas);
+     }
+
+     return false;
+ }

Review comment (Contributor), on the !has_local_replicas(_self, previous_replicas) branch: nit: just looking at this method, this comment doesn't quite match up with the behavior. Could you add a note that it's expected that one of current_replicas is local?
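The predicate above is small enough to model outside the codebase. Below is a minimal Python sketch of the same decision table, an illustration only (the helper names mirror the C++ but the real implementations live elsewhere in cluster/), with the reviewer's expectation recorded as a comment:

# A minimal model of can_finish_update(), for illustration only; replica
# sets are (node_id, shard) tuples and `self_id` stands in for _self.
def subtract_replica_sets(a, b):
    # assumed semantics: replicas present in `a` but absent from `b`
    return [r for r in a if r not in b]

def has_local_replicas(self_id, replicas):
    return any(node_id == self_id for node_id, _shard in replicas)

def can_finish_update(self_id, update_type, current_replicas, previous_replicas):
    # a force-abort update may be finished by any node
    if update_type == "force_abort_update":
        return True
    # a node that was added to the replica set may finish the update; per the
    # review note, one of current_replicas is expected to be local here
    if not has_local_replicas(self_id, previous_replicas):
        return True
    # if no nodes were added, any of the current replicas may finish it
    if not subtract_replica_sets(current_replicas, previous_replicas):
        return has_local_replicas(self_id, current_replicas)
    return False

# moving a replica from node 3 to node 4: the newcomer (4) may finish the
# update, while nodes that stayed (1, 2) and the leaver (3) may not
prev = [(1, 0), (2, 0), (3, 0)]
curr = [(1, 0), (2, 0), (4, 0)]
assert can_finish_update(4, "update", curr, prev)
assert not can_finish_update(1, "update", curr, prev)
assert not can_finish_update(3, "update", curr, prev)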

ss::future<std::error_code>
controller_backend::create_partition_from_remote_shard(
model::ntp ntp,
5 changes: 5 additions & 0 deletions src/v/cluster/controller_backend.h
@@ -149,6 +149,11 @@ class controller_backend
ss::future<std::error_code> create_partition_from_remote_shard(
model::ntp, ss::shard_id, partition_assignment);

+ bool can_finish_update(
+   topic_table_delta::op_type,
+   const std::vector<model::broker_shard>&,
+   const std::vector<model::broker_shard>&);

void housekeeping();
void setup_metrics();
ss::sharded<topic_table>& _topics;
102 changes: 20 additions & 82 deletions tests/rptest/scale_tests/node_operations_fuzzy_test.py
@@ -27,8 +27,8 @@

DECOMMISSION = "decommission"
ADD = "add"
- ADD_TOPIC = "add_tp"
- DELETE_TOPIC = "delete_tp"
+ ADD_NO_WAIT = "add_no_wait"

ALLOWED_REPLICATION = [1, 3]


@@ -37,19 +37,12 @@ class NodeOperationFuzzyTest(EndToEndTest):
min_inter_failure_time = 30
max_inter_failure_time = 60

- def generate_random_workload(self, count, skip_nodes, available_nodes):
-     op_types = [ADD, DECOMMISSION]
-     tp_op_types = [ADD_TOPIC, DELETE_TOPIC]
+ def generate_random_workload(self, count, available_nodes):
+     op_types = [ADD, ADD_NO_WAIT, DECOMMISSION]
# current state
active_nodes = list(available_nodes)
decommissioned_nodes = []
operations = []
- topics = []

- def eligible_active_nodes():
-     return list(
-         filter(lambda n: not (n == 1 or n in skip_nodes),
-                active_nodes))

def decommission(id):
active_nodes.remove(id)
Expand All @@ -60,37 +53,24 @@ def add(id):
decommissioned_nodes.remove(id)

for _ in range(0, count):
- if len(decommissioned_nodes) == 2:
+ if len(active_nodes) <= 3:
id = random.choice(decommissioned_nodes)
operations.append((ADD, id))
add(id)
elif len(decommissioned_nodes) == 0:
- id = random.choice(eligible_active_nodes())
+ id = random.choice(active_nodes)
operations.append((DECOMMISSION, id))
decommission(id)
else:
op = random.choice(op_types)
if op == DECOMMISSION:
- id = random.choice(eligible_active_nodes())
+ id = random.choice(active_nodes)
operations.append((DECOMMISSION, id))
decommission(id)
- elif op == ADD:
+ elif op == ADD or op == ADD_NO_WAIT:
id = random.choice(decommissioned_nodes)
- operations.append((ADD, id))
+ operations.append((op, id))
add(id)
- # topic operation
- if len(topics) == 0:
-     op = ADD_TOPIC
- else:
-     op = random.choice(tp_op_types)
-
- if op == ADD_TOPIC:
-     operations.append((
-         ADD_TOPIC,
-         f"test-topic-{random.randint(0,2000)}-{round(time.time()*1000000)}",
-         random.choice(ALLOWED_REPLICATION), 3))
- else:
-     operations.append((DELETE_TOPIC, random.choice(topics)))

return operations
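For a feel of what the generator now emits, here is a hypothetical, self-contained rendition of the same logic (node ids, counts, and names are invented for the sketch). It preserves the new invariants: once only three nodes remain active the next operation is a forced ADD, and a decommissioned node can come back via either ADD or ADD_NO_WAIT:

import random

ADD, ADD_NO_WAIT, DECOMMISSION = "add", "add_no_wait", "decommission"

def sample_workload(count=30, nodes=(1, 2, 3, 4, 5)):
    active, parked, ops = list(nodes), [], []
    for _ in range(count):
        if len(active) <= 3:
            # mirror of the forced-ADD branch above
            op, node = ADD, random.choice(parked)
        elif not parked:
            op, node = DECOMMISSION, random.choice(active)
        else:
            op = random.choice((ADD, ADD_NO_WAIT, DECOMMISSION))
            node = random.choice(active if op == DECOMMISSION else parked)
        if op == DECOMMISSION:
            active.remove(node)
            parked.append(node)
        else:
            parked.remove(node)
            active.append(node)
        ops.append((op, node))
    return ops

# e.g. [('decommission', 4), ('add_no_wait', 4), ('decommission', 1), ...]
print(sample_workload())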

@@ -112,11 +92,6 @@ def _create_random_topics(self, count):

return topics

"""
Adding nodes to the cluster should result in partition reallocations to new
nodes
"""

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@parametrize(enable_failures=True)
@parametrize(enable_failures=False)
@@ -171,7 +146,7 @@ def failure_injector_loop():
1, NodeOperationFuzzyTest.max_suspend_duration_seconds)
node = random.choice(self.redpanda.nodes)
else:
- #kill/termianate only active nodes (not to influence the test outcome)
+ #kill/terminate only active nodes (not to influence the test outcome)
idx = random.choice(list(self.active_nodes))
node = self.redpanda.get_node(idx)

@@ -198,7 +173,7 @@ def decommission(idx):
def decommissioned():
try:
admin = Admin(self.redpanda)
- # if broker is already draining, it is suceess
+ # if broker is already draining, it is success

brokers = admin.get_brokers()
for b in brokers:
@@ -293,7 +268,9 @@ def add_node(idx, cleanup=True):
"seed_servers": seed_servers_for(idx)
})

def wait_for_new_replicas(idx):
def has_new_replicas():
+ id = self.ids_mapping[idx]
per_node = replicas_per_node()
self.logger.info(f"replicas per node: {per_node}")
return id in per_node
@@ -302,42 +279,9 @@ def has_new_replicas():
timeout_sec=NODE_OP_TIMEOUT,
backoff_sec=2)

- def is_topic_present(name):
-     kcl = KCL(self.redpanda)
-     lines = kcl.list_topics().splitlines()
-     self.redpanda.logger.debug(
-         f"checking if topic {name} is present in {lines}")
-     for l in lines:
-         if l.startswith(name):
-             return True
-     return False
-
- def create_topic(spec):
-     try:
-         DefaultClient(self.redpanda).create_topic(spec)
-     except Exception as e:
-         self.redpanda.logger.warn(
-             f"error creating topic {spec.name} - {e}")
-     try:
-         return is_topic_present(spec.name)
-     except Exception as e:
-         self.redpanda.logger.warn(f"error while listing topics - {e}")
-     return False
-
- def delete_topic(name):
-     try:
-         DefaultClient(self.redpanda).delete_topic(name)
-     except Exception as e:
-         self.redpanda.logger.warn(f"error deleting topic {name} - {e}")
-     try:
-         return not is_topic_present(name)
-     except Exception as e:
-         self.redpanda.logger.warn(f"error while listing topics - {e}")
-     return False

- work = self.generate_random_workload(10,
-                                      skip_nodes=set(),
+ work = self.generate_random_workload(30,
available_nodes=self.active_nodes)

self.redpanda.logger.info(f"node operations to execute: {work}")
for op in work:
op_type = op[0]
Expand All @@ -347,21 +291,15 @@ def delete_topic(name):
idx = op[1]
self.active_nodes.add(idx)
add_node(idx)
wait_for_new_replicas(idx)
+ if op_type == ADD_NO_WAIT:
+     idx = op[1]
+     self.active_nodes.add(idx)
+     add_node(idx)
if op_type == DECOMMISSION:
idx = op[1]
self.active_nodes.remove(idx)
decommission(idx)
- elif op_type == ADD_TOPIC:
-     spec = TopicSpec(name=op[1],
-                      replication_factor=op[2],
-                      partition_count=op[3])
-     wait_until(lambda: create_topic(spec) == True,
-                timeout_sec=180,
-                backoff_sec=2)
- elif op_type == DELETE_TOPIC:
-     wait_until(lambda: delete_topic(op[1]) == True,
-                timeout_sec=180,
-                backoff_sec=2)

enable_failures = False
admin_fuzz.wait(20, 180)
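The point of ADD_NO_WAIT is that a subsequent DECOMMISSION can land while partition moves onto the re-added node are still in flight, which is exactly the situation the new can_finish_update() path has to tolerate. A minimal sketch of the wait that ADD performs and ADD_NO_WAIT skips, assuming a replicas_per_node() helper like the one in this test (wait_until is ducktape's):

from ducktape.utils.util import wait_until

def wait_for_new_replicas(node_id, replicas_per_node, timeout_sec=360):
    # Block until the (re-)added broker hosts at least one partition
    # replica; the ADD op runs this, while ADD_NO_WAIT returns immediately
    # so the next operation can race with ongoing reconfigurations.
    def has_new_replicas():
        return node_id in replicas_per_node()

    wait_until(has_new_replicas, timeout_sec=timeout_sec, backoff_sec=2)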
28 changes: 28 additions & 0 deletions tests/rptest/tests/partition_movement_test.py
@@ -9,6 +9,7 @@

import random
import time
+ from numpy import record
import requests

from rptest.services.cluster import cluster
@@ -579,6 +580,33 @@ def get_status():
rpk.create_topic(topic, 1, 1)
wait_until(lambda: get_status() == 'done', 10, 1)

+ @cluster(num_nodes=5)
+ def test_down_replicate(self):
+     """
+     Test changing replication factor from 3 -> 1
+     """
+     throughput, records, _ = self._get_scale_params()
+     partition_count = 5
+     self.start_redpanda(num_nodes=3)
+     admin = Admin(self.redpanda)
+
+     spec = TopicSpec(partition_count=partition_count, replication_factor=3)
+     self.client().create_topic(spec)
+     self.topic = spec.name
+     self.start_producer(1, throughput=throughput)
+     self.start_consumer(1)
+     self.await_startup()
+
+     for partition in range(0, partition_count):
+         assignments = self._get_assignments(admin, self.topic, partition)
+         new_assignment = [assignments[0]]
+         admin.set_partition_replicas(self.topic, partition, new_assignment)
+         self._wait_post_move(self.topic, partition, new_assignment, 60)
+
+     self.run_validation(enable_idempotence=False,
+                         consumer_timeout_sec=45,
+                         min_records=records)
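The down-replication loop above can be read as a reusable recipe. A hedged sketch, assuming the rptest Admin wrapper used in this file, where get_partitions() reports replicas as {"node_id", "core"} dicts and set_partition_replicas() accepts a list of the same shape (assumptions taken from this test's own usage):

from rptest.services.admin import Admin

def shrink_to_one_replica(redpanda, topic, partition):
    # Keep the first current replica and drop the rest, i.e. move the
    # partition from replication factor 3 to 1 in one reconfiguration.
    admin = Admin(redpanda)
    replicas = admin.get_partitions(topic, partition)["replicas"]
    target = [{"node_id": replicas[0]["node_id"], "core": replicas[0]["core"]}]
    admin.set_partition_replicas(topic, partition, target)
    return target

As in the test, a post-move wait such as _wait_post_move() should confirm the controller reports the single-replica assignment before data validation starts.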


class SIPartitionMovementTest(PartitionMovementMixin, EndToEndTest):
"""