Skip to content

Commit

Permalink
Merge pull request #5282 from andrwng/upgrade-tests
Browse files Browse the repository at this point in the history
tests: start using RedpandaInstaller in more tests
  • Loading branch information
andrwng committed Jul 27, 2022
2 parents 33e65e1 + 57b91ef commit 0b51519
Show file tree
Hide file tree
Showing 8 changed files with 407 additions and 74 deletions.
4 changes: 2 additions & 2 deletions tests/rptest/services/admin_ops_fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ def execute_with_retries(self, op_type, op):
except Exception as e:
error = e
self.redpanda.logger.info(
f"Operation: {op_type} error: {error}, retires left: {self.retries-retry}/{self.retries}"
f"Operation: {op_type} error: {error}, retries left: {self.retries-retry}/{self.retries}"
)
sleep(1)
sleep(self.retries_interval)
raise error

def make_random_operation(self) -> Operation:
Expand Down
19 changes: 16 additions & 3 deletions tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.admin import Admin
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.services.rolling_restarter import RollingRestarter
from rptest.services.storage import ClusterStorage, NodeStorage
from rptest.services.utils import BadLogLines, NodeCrash
from rptest.clients.python_librdkafka import PythonLibrdkafka
Expand Down Expand Up @@ -1185,7 +1186,6 @@ def get_version(self, node):

def stop_node(self, node, timeout=None):
pids = self.pids(node)

for pid in pids:
node.account.signal(pid, signal.SIGTERM, allow_fail=False)

Expand Down Expand Up @@ -1390,6 +1390,18 @@ def restart_nodes(self,
for node in nodes:
self.start_node(node, override_cfg_params, timeout=start_timeout)

def rolling_restart_nodes(self,
                          nodes,
                          override_cfg_params=None,
                          start_timeout=None,
                          stop_timeout=None):
    """
    Restart the given node(s) by delegating to a RollingRestarter.

    `nodes` may be a single ClusterNode or a collection of nodes; a lone
    ClusterNode is wrapped in a list before being handed over. The config
    overrides and both timeouts are forwarded unchanged.
    """
    if isinstance(nodes, ClusterNode):
        nodes = [nodes]
    RollingRestarter(self).restart_nodes(
        nodes,
        override_cfg_params=override_cfg_params,
        start_timeout=start_timeout,
        stop_timeout=stop_timeout)

def registered(self, node):
"""
Check if a newly added node is fully registered with the cluster, such
Expand Down Expand Up @@ -1743,8 +1755,9 @@ def save_executable(self):
# the same binaries.
return

# Assume that if 'CI' isn't explicitly set to false, we do want to keep
# the executable.
if os.environ.get('CI', None) == 'false':
# We are on a developer workstation
self.logger.info("Skipping saving executable, not in CI")
return

Expand All @@ -1756,7 +1769,7 @@ def save_executable(self):
# still have the original binaries available.
node = self.nodes[0]
if self._installer._started:
head_root_path = self._installer.path_for_version(
head_root_path = self._installer.root_for_version(
RedpandaInstaller.HEAD)
binary = f"{head_root_path}/libexec/redpanda"
else:
Expand Down
22 changes: 22 additions & 0 deletions tests/rptest/services/redpanda_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ def get_unique_versions():
return unique_versions


class InstallOptions:
    """
    Settings that describe how Redpanda should be installed in a cluster.

    Attributes:
        install_previous_version: if true, install the highest version of
            the prior feature version before HEAD.
        version: either RedpandaInstaller.HEAD or a numeric tuple
            representing the version to install (e.g. (22, 1, 3)).
        num_to_upgrade: number of nodes in a cluster to upgrade to HEAD
            after starting the cluster on an older version, e.g. to
            simulate a mixed-version environment.
    """
    def __init__(self,
                 install_previous_version=False,
                 version=None,
                 num_to_upgrade=0):
        # Plain value holder: each argument is stored as-is, with no
        # validation or normalisation.
        self.num_to_upgrade = num_to_upgrade
        self.version = version
        self.install_previous_version = install_previous_version


class RedpandaInstaller:
"""
Provides mechanisms to install multiple Redpanda binaries on a cluster.
Expand Down
70 changes: 70 additions & 0 deletions tests/rptest/services/rolling_restarter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import requests

from ducktape.utils.util import wait_until


class RollingRestarter:
    """
    Encapsulates the logic needed to perform a rolling restart.
    """

    # Used for the health/drain waits when the caller passes no timeout.
    # wait_until() needs a numeric timeout_sec, so None cannot be forwarded.
    DEFAULT_WAIT_TIMEOUT_SEC = 30

    def __init__(self, redpanda):
        self.redpanda = redpanda

    def restart_nodes(self,
                      nodes,
                      override_cfg_params=None,
                      start_timeout=None,
                      stop_timeout=None):
        """
        Performs a rolling restart on the given nodes, optionally overriding
        the given configs.

        For each node, in order: wait for the cluster to be healthy, put the
        node into maintenance mode, wait for its leadership drain to finish,
        wait for health again, stop the node, start it (with
        `override_cfg_params`), wait for health, then take the node out of
        maintenance mode.

        `stop_timeout` bounds the waits before the node is stopped and
        `start_timeout` bounds the wait after it is started. Both are also
        passed, unmodified, to `stop_node` / `start_node`. When either is
        None, the waits fall back to DEFAULT_WAIT_TIMEOUT_SEC while
        `stop_node` / `start_node` still receive None.
        """
        admin = self.redpanda._admin

        # Resolve None only for wait_until(); stop_node()/start_node() keep
        # receiving the caller's original values.
        stop_wait_sec = (self.DEFAULT_WAIT_TIMEOUT_SEC
                         if stop_timeout is None else stop_timeout)
        start_wait_sec = (self.DEFAULT_WAIT_TIMEOUT_SEC
                          if start_timeout is None else start_timeout)

        def has_drained_leaders(node):
            # True once the broker reports that draining was requested and
            # has finished. An HTTP error from the admin API is treated as
            # "not yet" so the caller keeps polling.
            try:
                node_id = self.redpanda.idx(node)
                broker_resp = admin.get_broker(node_id)
                maintenance_status = broker_resp["maintenance_status"]
                return maintenance_status["draining"] and maintenance_status[
                    "finished"]
            except requests.exceptions.HTTPError:
                return False

        def wait_for_healthy(timeout_sec, phase):
            # Poll cluster health once per second; `phase` only labels the
            # timeout message.
            wait_until(
                lambda: self.redpanda.healthy(),
                timeout_sec=timeout_sec,
                backoff_sec=1,
                err_msg=f"Cluster not healthy {phase} after {timeout_sec}s")

        # TODO: incorporate rack awareness into this to support updating
        # multiple nodes at a time.
        for node in nodes:
            wait_for_healthy(stop_wait_sec, "before entering maintenance mode")

            admin.maintenance_start(node)

            wait_until(
                lambda: has_drained_leaders(node),
                timeout_sec=stop_wait_sec,
                backoff_sec=1,
                err_msg=f"Leadership drain did not finish on {node} "
                f"after {stop_wait_sec}s")

            wait_for_healthy(stop_wait_sec, "after draining leadership")

            self.redpanda.stop_node(node, timeout=stop_timeout)

            self.redpanda.start_node(node,
                                     override_cfg_params,
                                     timeout=start_timeout)

            wait_for_healthy(start_wait_sec, "after restarting node")

            admin.maintenance_stop(node)
55 changes: 34 additions & 21 deletions tests/rptest/tests/controller_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,37 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import random
import threading
import time
import re

from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from rptest.clients.types import TopicSpec
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.clients.default import DefaultClient
from ducktape.utils.util import wait_until
from rptest.services.admin import Admin
from rptest.services.admin_ops_fuzzer import AdminOperationsFuzzer
from rptest.services.cluster import cluster
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.tests.end_to_end import EndToEndTest

from ducktape.mark import matrix
# TODO: fix https://github.com/redpanda-data/redpanda/issues/5629
ALLOWED_LOGS = [
# e.g. cluster - controller_backend.cc:466 - exception while executing partition operation: {type: update_finished, ntp: {kafka/test-topic-1944-1639161306808363/1}, offset: 413, new_assignment: { id: 1, group_id: 65, replicas: {{node_id: 3, shard: 2}, {node_id: 4, shard: 2}, {node_id: 1, shard: 0}} }, previous_assignment: {nullopt}} - std::__1::__fs::filesystem::filesystem_error (error system:39, filesystem error: remove failed: Directory not empty [/var/lib/redpanda/data/kafka/test-topic-1944-1639161306808363])
re.compile("cluster - .*Directory not empty"),
]


class ControllerUpgradeTest(EndToEndTest):
@cluster(num_nodes=7, log_allow_list=RESTART_LOG_ALLOW_LIST)
@cluster(num_nodes=7, log_allow_list=RESTART_LOG_ALLOW_LIST + ALLOWED_LOGS)
def test_updating_cluster_when_executing_operations(self):
'''
Validates that cluster is operational when upgrading controller log
'''

# set redpanda logical version to value without __consumer_offsets support
self.redpanda = RedpandaService(
self.test_context,
5,
# set redpanda logical version to v22.1 - last version without serde encoding
environment={"__REDPANDA_LOGICAL_VERSION": 3})
# set redpanda version to v22.1 - last version without serde encoding
self.redpanda = RedpandaService(self.test_context, 5)

installer = self.redpanda._installer
installer.install(self.redpanda.nodes, (22, 1, 4))

self.redpanda.start()
admin_fuzz = AdminOperationsFuzzer(self.redpanda)
Expand Down Expand Up @@ -63,17 +65,28 @@ def cluster_is_stable():

return True

# reset environment to use latest redpanda logical version
self.redpanda.set_environment({})
for n in self.redpanda.nodes:
# Get to a stable starting point before asserting anything about our
# workload.
wait_until(cluster_is_stable, 90, backoff_sec=2)

self.redpanda.restart_nodes(n, stop_timeout=60)
admin_fuzz.wait(5, 240)

# reset to use latest Redpanda version
installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD)

for n in self.redpanda.nodes:
# NOTE: we're not doing a rolling restart to avoid interfering with
# the admin operations.
self.redpanda.restart_nodes([n], stop_timeout=60, start_timeout=90)
num_executed_before_restart = admin_fuzz.executed

# wait for leader balancer to start evening out leadership
wait_until(cluster_is_stable, 90, backoff_sec=2)
admin_fuzz.wait(num_executed_before_restart + 2, 240)

self.run_validation(min_records=100000,
producer_timeout_sec=300,
consumer_timeout_sec=180)
admin_fuzz.wait(20, 240)
num_executed_after_upgrade = admin_fuzz.executed
admin_fuzz.wait(num_executed_after_upgrade + 5, 240)
admin_fuzz.stop()
51 changes: 42 additions & 9 deletions tests/rptest/tests/end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
# - Imported annotate_missing_msgs helper from kafka test suite

from collections import namedtuple
from typing import Optional
import os
from typing import Optional
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda_installer import InstallOptions
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.clients.default import DefaultClient
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.services.verifiable_producer import VerifiableProducer, is_int_with_prefix
Expand Down Expand Up @@ -74,7 +77,8 @@ def __init__(self,
def start_redpanda(self,
num_nodes=1,
extra_rp_conf=None,
si_settings=None):
si_settings=None,
install_opts: Optional[InstallOptions] = None):
self.si_settings = si_settings
if self.si_settings:
self.si_settings.load_context(self.logger, self.test_context)
Expand All @@ -90,7 +94,30 @@ def start_redpanda(self,
extra_rp_conf=self._extra_rp_conf,
extra_node_conf=self._extra_node_conf,
si_settings=self.si_settings)
version_to_install = None
if install_opts:
if install_opts.install_previous_version:
version_to_install = \
self.redpanda._installer.highest_from_prior_feature_version(RedpandaInstaller.HEAD)
if install_opts.version:
version_to_install = install_opts.version

if version_to_install:
self.redpanda._installer.install(self.redpanda.nodes,
version_to_install)
self.redpanda.start()
if version_to_install and install_opts.num_to_upgrade > 0:
# Perform the upgrade rather than starting each node on the
# appropriate version. Redpanda may not start up if starting a new
# cluster with mixed-versions.
nodes_to_upgrade = [
self.redpanda.get_node(i + 1)
for i in range(install_opts.num_to_upgrade)
]
self.redpanda._installer.install(nodes_to_upgrade,
RedpandaInstaller.HEAD)
self.redpanda.restart_nodes(nodes_to_upgrade)

self._client = DefaultClient(self.redpanda)

def client(self):
Expand Down Expand Up @@ -155,16 +182,25 @@ def has_finished_consuming():
err_msg="Consumer failed to consume up to offsets %s after waiting %ds, last consumed offsets: %s." %\
(str(last_acked_offsets), timeout_sec, list(self.last_consumed_offsets)))

def await_num_produced(self, min_records, timeout_sec=30):
    """
    Wait until the producer reports strictly more than `min_records` acked
    messages, polling via wait_until for at most `timeout_sec` seconds.
    """
    def acked_enough():
        return self.producer.num_acked > min_records

    failure_msg = "Producer failed to produce messages for %ds." % timeout_sec
    wait_until(acked_enough, timeout_sec=timeout_sec, err_msg=failure_msg)

def await_num_consumed(self, min_records, timeout_sec=30):
    """
    Wait until the consumer's total consumed count reaches at least
    `min_records`, polling via wait_until for at most `timeout_sec` seconds.
    """
    def consumed_enough():
        return self.consumer.total_consumed() >= min_records

    failure_msg = "Timed out after %ds while awaiting record consumption of %d records" % (
        timeout_sec, min_records)
    wait_until(consumed_enough, timeout_sec=timeout_sec, err_msg=failure_msg)

def _collect_all_logs(self):
    """Mark every service on the test context for log collection."""
    services = self.test_context.services
    for service in services:
        self.mark_for_collect(service)

def await_startup(self, min_records=5, timeout_sec=30):
try:
wait_until(lambda: self.consumer.total_consumed() >= min_records,
timeout_sec=timeout_sec,
err_msg="Timed out after %ds while awaiting initial record delivery of %d records" %\
(timeout_sec, min_records))
self.await_num_consumed(min_records, timeout_sec)
except BaseException:
self._collect_all_logs()
raise
Expand All @@ -175,10 +211,7 @@ def run_validation(self,
consumer_timeout_sec=30,
enable_idempotence=False):
try:
wait_until(lambda: self.producer.num_acked > min_records,
timeout_sec=producer_timeout_sec,
err_msg="Producer failed to produce messages for %ds." %\
producer_timeout_sec)
self.await_num_produced(min_records, producer_timeout_sec)

self.logger.info("Stopping producer after writing up to offsets %s" %\
str(self.producer.last_acked_offsets))
Expand Down
Loading

0 comments on commit 0b51519

Please sign in to comment.