Skip to content

Commit

Permalink
tests: run partition_movement_test with mixed versions
Browse files Browse the repository at this point in the history
Given the test moves replicas from node to node, it seems like a good
candidate to test compatibility between Redpanda versions.

The test currently relies on selecting random nodes for the sake of
moving replicas around, so I wasn't very intentional in deciding which
nodes to upgrade -- this commit just upgrades the first `num_to_upgrade`
nodes in index order.
  • Loading branch information
andrwng committed Jul 7, 2022
1 parent 2488975 commit fca5b43
Showing 1 changed file with 162 additions and 37 deletions.
199 changes: 162 additions & 37 deletions tests/rptest/tests/partition_movement_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from rptest.clients.kafka_cat import KafkaCat
from ducktape.mark import ok_to_fail
from ducktape.mark import parametrize

from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.admin import Admin
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.services.honey_badger import HoneyBadger
from rptest.services.rpk_producer import RpkProducer
Expand All @@ -29,14 +30,38 @@

# Log-line patterns that are expected, and therefore tolerated, while
# partitions are being moved around.
PARTITION_MOVEMENT_LOG_ERRORS = [
    # Example:
    #   raft - [follower: {id: {1}, revision: {10}}]
    #   [group_id:3, {kafka/topic/2}] - recovery_stm.cc:422 - recovery append
    #   entries error: raft group does not exist on target broker
    "raft - .*raft group does not exist on target broker",
    # Example:
    #   raft - [group_id:3, {kafka/topic/2}] consensus.cc:2317 - unable to
    #   replicate updated configuration:
    #   raft::errc::replicated_entry_truncated
    "raft - .*unable to replicate updated configuration: .*",
    # Example:
    #   recovery_stm.cc:432 - recovery append entries error:
    #   rpc::errc::client_request_timeout
    "raft - .*recovery append entries error.*client_request_timeout",
]

# Additional log-line patterns to tolerate in tests that may run some nodes
# on a previous Redpanda version.
PREV_VERSION_LOG_ALLOW_LIST = [
    # Example:
    #   cluster - controller_backend.cc:400 - Error while reconciling
    #   topics - seastar::abort_requested_exception (abort requested)
    "cluster - .*Error while reconciling topic.*",
    # Same message as in the partition-movement list, but with the
    # "exists" typo that was only fixed in recent versions. Example:
    #   raft - [follower: {id: {1}, revision: {10}}]
    #   [group_id:3, {kafka/topic/2}] - recovery_stm.cc:422 - recovery append
    #   entries error: raft group does not exists on target broker
    "raft - .*raft group does not exists on target broker",
    # Example:
    #   rpc - Service handler thrown an exception -
    #   seastar::gate_closed_exception (gate closed)
    "rpc - .*gate_closed_exception.*",
    # Mixed-version tests begin with an unclean restart before any workload
    # is started, so connection-level errors are expected.
    "(raft|rpc) - .*(disconnected_endpoint|Broken pipe|Connection reset by peer)",
]


def _upgrade_nodes(redpanda, num_to_upgrade):
    """
    Install RedpandaInstaller.HEAD on nodes 1..num_to_upgrade and restart
    them.

    Nodes are looked up through `redpanda.get_node` using indices starting
    at 1; only the selected nodes are passed to the installer and to
    `restart_nodes`.
    """
    targets = []
    for node_idx in range(1, num_to_upgrade + 1):
        targets.append(redpanda.get_node(node_idx))

    installer = redpanda._installer
    installer.install(targets, RedpandaInstaller.HEAD)
    redpanda.restart_nodes(targets)

class PartitionMovementTest(PartitionMovementMixin, EndToEndTest):
"""
Expand All @@ -61,17 +86,25 @@ def __init__(self, ctx, *args, **kwargs):
**kwargs)
self._ctx = ctx

@cluster(num_nodes=3)
def test_moving_not_fully_initialized_partition(self):
@cluster(num_nodes=3, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
"""
Move partition before first leader is elected
"""
self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)

hb = HoneyBadger()
# if failure injector is not enabled simply skip this test
if not hb.is_enabled(self.redpanda.nodes[0]):
return
for node in self.redpanda.nodes:
if not hb.is_enabled(node):
return

for n in self.redpanda.nodes:
hb.set_exception(n, 'raftgen_service::failure_probes', 'vote')
Expand Down Expand Up @@ -122,12 +155,19 @@ def derived_done():

wait_until(derived_done, timeout_sec=60, backoff_sec=2)

@cluster(num_nodes=3)
def test_empty(self):
@cluster(num_nodes=3, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_empty(self, num_to_upgrade):
"""
Move empty partitions.
"""
self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)

topics = []
for partition_count in range(1, 5):
Expand All @@ -144,13 +184,22 @@ def test_empty(self):
for _ in range(25):
self._move_and_verify()

@cluster(num_nodes=4, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
def test_static(self):
@cluster(num_nodes=4,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_static(self, num_to_upgrade):
"""
Move partitions with data, but no active producers or consumers.
"""
self.logger.info(f"Starting redpanda...")
self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)

topics = []
for partition_count in range(1, 5):
Expand Down Expand Up @@ -234,15 +283,24 @@ def _get_scale_params(self):

return throughput, records, moves

@cluster(num_nodes=5, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
def test_dynamic(self):
@cluster(num_nodes=5,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_dynamic(self, num_to_upgrade):
"""
Move partitions with active consumer / producer
"""
throughput, records, moves = self._get_scale_params()

test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions,
extra_rp_conf={"default_topic_replications": 3})
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)
spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
self.client().create_topic(spec)
self.topic = spec.name
Expand All @@ -256,17 +314,34 @@ def test_dynamic(self):
consumer_timeout_sec=45,
min_records=records)

@cluster(num_nodes=5, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
def test_move_consumer_offsets_intranode(self):
@cluster(num_nodes=5,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_move_consumer_offsets_intranode(self, num_to_upgrade):
"""
Exercise moving the consumer_offsets/0 partition between shards
within the same nodes. This reproduces certain bugs in the special
handling of this topic.
"""
throughput, records, moves = self._get_scale_params()

self.start_redpanda(num_nodes=3,
extra_rp_conf={"default_topic_replications": 3})
test_mixed_versions = num_to_upgrade > 0
if test_mixed_versions:
# Start at a version that supports consumer groups.
# TODO: use 'install_previous_version' once it becomes the prior
# feature version.
self.start_redpanda(
num_nodes=3,
enable_installer=test_mixed_versions,
install_version=(22, 1, 3),
extra_rp_conf={"default_topic_replications": 3})
_upgrade_nodes(self.redpanda, num_to_upgrade)
else:
self.start_redpanda(
num_nodes=3, extra_rp_conf={"default_topic_replications": 3})

spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
self.client().create_topic(spec)
self.topic = spec.name
Expand All @@ -292,12 +367,19 @@ def test_move_consumer_offsets_intranode(self):

@cluster(num_nodes=5,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
RESTART_LOG_ALLOW_LIST)
def test_bootstrapping_after_move(self):
RESTART_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_bootstrapping_after_move(self, num_to_upgrade):
"""
Move partitions with active consumer / producer
"""
self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)
spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
self.client().create_topic(spec)
self.topic = spec.name
Expand Down Expand Up @@ -327,13 +409,20 @@ def offsets_are_recovered():

wait_until(offsets_are_recovered, 30, 2)

@cluster(num_nodes=3)
def test_invalid_destination(self):
@cluster(num_nodes=3, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_invalid_destination(self, num_to_upgrade):
"""
Check that requuests to move to non-existent locations are properly rejected.
"""

self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)
spec = TopicSpec(name="topic", partition_count=1, replication_factor=1)
self.client().create_topic(spec)
topic = spec.name
Expand Down Expand Up @@ -409,14 +498,27 @@ def test_invalid_destination(self):
r = admin.set_partition_replicas(topic, partition, assignments)
assert r.status_code == 200

@cluster(num_nodes=5)
def test_overlapping_changes(self):
@cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_overlapping_changes(self, num_to_upgrade):
"""
Check that while a movement is in flight, rules about
overlapping operations are properly enforced.
"""

self.start_redpanda(num_nodes=4)
test_mixed_versions = num_to_upgrade > 0
if test_mixed_versions:
# Start at a version that supports the RpkProducer workload.
# TODO: use 'install_previous_version' once it becomes the prior
# feature version.
self.start_redpanda(num_nodes=4,
enable_installer=test_mixed_versions,
install_version=(22, 1, 3))
_upgrade_nodes(self.redpanda, num_to_upgrade)
else:
self.start_redpanda(num_nodes=4)

node_ids = {1, 2, 3, 4}

# Create topic with enough data that inter-node movement
Expand Down Expand Up @@ -498,14 +600,21 @@ def partition_ready():
rpk.delete_topic(name)
assert name not in rpk.list_topics()

@cluster(num_nodes=4)
def test_deletion_stops_move(self):
@cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_deletion_stops_move(self, num_to_upgrade):
"""
Delete topic which partitions are being moved and check status after
topic is created again, old move
opeartions should not influcence newly created topic
"""
self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)

# create a single topic with replication factor of 1
topic = 'test-topic'
Expand Down Expand Up @@ -573,6 +682,7 @@ def get_status():
wait_until(lambda: get_status() == 'in_progress', 10, 1)
# delete the topic
rpk.delete_topic(topic)

# start the node back up
self.redpanda.start_node(node)
# create topic again
Expand Down Expand Up @@ -616,8 +726,10 @@ def _get_scale_params(self):
partitions = 1 if self.debug_mode else 10
return throughput, records, moves, partitions

@cluster(num_nodes=5)
def test_shadow_indexing(self):
@cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_shadow_indexing(self, num_to_upgrade):
"""
Test interaction between the shadow indexing and the partition movement.
Partition movement generate partitions with different revision-ids and the
Expand All @@ -626,7 +738,12 @@ def test_shadow_indexing(self):
"""
throughput, records, moves, partitions = self._get_scale_params()

self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)
spec = TopicSpec(name="topic",
partition_count=partitions,
replication_factor=3)
Expand All @@ -641,15 +758,23 @@ def test_shadow_indexing(self):
consumer_timeout_sec=45,
min_records=records)

@cluster(num_nodes=5)
def test_cross_shard(self):
@cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@parametrize(num_to_upgrade=0)
@parametrize(num_to_upgrade=2)
def test_cross_shard(self, num_to_upgrade):
"""
Test interaction between the shadow indexing and the partition movement.
Move partitions with SI enabled between shards.
"""
throughput, records, moves, partitions = self._get_scale_params()

self.start_redpanda(num_nodes=3)
test_mixed_versions = num_to_upgrade > 0
self.start_redpanda(num_nodes=3,
enable_installer=test_mixed_versions,
install_previous_version=test_mixed_versions)
if test_mixed_versions:
_upgrade_nodes(self.redpanda, num_to_upgrade)

spec = TopicSpec(name="topic",
partition_count=partitions,
replication_factor=3)
Expand Down

0 comments on commit fca5b43

Please sign in to comment.