-
Notifications
You must be signed in to change notification settings - Fork 579
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests/workload_upgrade_runner_test: RedpandaUpgradeTest
this test is a runner for a collection PWorkload. it will create an upgrade path, insert patch downgrades, setup a cluster and run the workloads concurrently against the cluster. at the end of the test it will report failed workloads. WorkloadAdapter is a wrapper to keep track workload state and to store any thrown exception
- Loading branch information
Showing
1 changed file
with
283 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,283 @@ | ||
import time | ||
import traceback | ||
from typing import Any, Optional | ||
from venv import logger | ||
from rptest.clients.offline_log_viewer import OfflineLogViewer | ||
from rptest.services.cluster import cluster | ||
from rptest.services.admin import Admin | ||
from rptest.services.redpanda import SISettings | ||
from rptest.services.redpanda_installer import RedpandaInstaller, RedpandaVersion, RedpandaVersionTriple | ||
from rptest.services.workload_protocol import PWorkload | ||
from rptest.tests.prealloc_nodes import PreallocNodesTest | ||
from rptest.tests.workload_producer_consumer import ProducerConsumerWorkload | ||
from rptest.tests.workload_dummy import DummyWorkload, MinimalWorkload | ||
from rptest.tests.redpanda_test import RedpandaTest | ||
from rptest.util import wait_until | ||
from rptest.tests.workload_license import LicenseWorkload | ||
|
||
|
||
def expand_version( | ||
installer: RedpandaInstaller, | ||
version: Optional[RedpandaVersion]) -> RedpandaVersionTriple: | ||
if version is None: | ||
# return oldest release line | ||
return installer.latest_for_line(installer.oldest_version()[0:2])[0] | ||
|
||
if version == RedpandaInstaller.HEAD: | ||
return installer.head_version() | ||
|
||
if len(version) == 3: | ||
return version | ||
|
||
# version is a release line, get latest minor for it | ||
return installer.latest_for_line(version)[0] | ||
|
||
|
||
class WorkloadAdapter(PWorkload): | ||
""" | ||
WorkloadAdapter is a wrapper around a PWorkload that keeps track of the state and save the error if one occurs. | ||
""" | ||
NOT_STARTED = "not_started" | ||
STARTED = "started" | ||
STOPPED = "stopped" | ||
STOPPED_WITH_ERROR = "stopped_with_error" | ||
|
||
def __init__(self, workload: PWorkload, ctx: RedpandaTest, | ||
installer: RedpandaInstaller) -> None: | ||
self.workload = workload | ||
self.ctx = ctx | ||
self.installer = installer | ||
self.state = WorkloadAdapter.NOT_STARTED | ||
self.error: Optional[Exception] = None | ||
self.earliest_v: Optional[tuple[int, int, int]] = None | ||
self.latest_v: Optional[tuple[int, int, int]] = None | ||
|
||
def get_earliest_applicable_release(self) -> tuple[int, int, int]: | ||
if self.earliest_v is None: | ||
self.earliest_v = expand_version( | ||
self.installer, | ||
self.workload.get_earliest_applicable_release()) | ||
|
||
return self.earliest_v | ||
|
||
def get_latest_applicable_release(self) -> tuple[int, int, int]: | ||
if self.latest_v is None: | ||
self.latest_v = expand_version( | ||
self.installer, self.workload.get_latest_applicable_release()) | ||
return self.latest_v | ||
|
||
def _exec_workload_method(self, final_state: str, method_name: str, *args): | ||
""" | ||
Executes a workload method and updates the state accordingly. | ||
Exceptions are saved and will prevent future execution of any method | ||
""" | ||
if self.state == WorkloadAdapter.STOPPED_WITH_ERROR: | ||
return None | ||
|
||
try: | ||
result = getattr(self.workload, method_name)(*args) | ||
self.state = final_state | ||
return result | ||
except Exception as e: | ||
self.ctx.logger.error( | ||
f"{self.workload.get_workload_name()} Exception in {method_name}(): {traceback.format_exception(e)}" | ||
) | ||
# the stacktrace is captured and saved in the trace variable | ||
# so that it can be used in the error message | ||
# along with time of failure | ||
self.time_of_failure = time.time() | ||
self.error = e | ||
self.state = WorkloadAdapter.STOPPED_WITH_ERROR | ||
return None | ||
|
||
def begin(self): | ||
self._exec_workload_method(WorkloadAdapter.STARTED, "begin") | ||
|
||
def end(self): | ||
self._exec_workload_method(WorkloadAdapter.STOPPED, "end") | ||
|
||
def partial_progress(self, versions: dict[Any, RedpandaVersionTriple]): | ||
res = self._exec_workload_method(self.state, "partial_progress", | ||
versions) | ||
return res if res is not None else PWorkload.DONE | ||
|
||
def get_workload_name(self): | ||
return self.workload.get_workload_name() | ||
|
||
def progress(self, version: RedpandaVersionTriple): | ||
res = self._exec_workload_method(self.state, "progress", version) | ||
return res if res is not None else PWorkload.DONE | ||
|
||
|
||
class RedpandaUpgradeTest(PreallocNodesTest): | ||
def __init__(self, test_context): | ||
# si_settings are needed for LicenseWorkload | ||
super().__init__(test_context=test_context, | ||
num_brokers=3, | ||
si_settings=SISettings(test_context), | ||
node_prealloc_count=1) | ||
|
||
self.installer = self.redpanda._installer | ||
|
||
# workloads that will be excuted during this test | ||
workloads: list[PWorkload] = [ | ||
DummyWorkload(self), | ||
MinimalWorkload(self), | ||
LicenseWorkload(self), | ||
ProducerConsumerWorkload(self), | ||
] | ||
|
||
# setup self as context for the workloads | ||
self.adapted_workloads: list[WorkloadAdapter] = [ | ||
WorkloadAdapter(workload=w, ctx=self, installer=self.installer) | ||
for w in workloads | ||
] | ||
|
||
self.upgrade_steps: list[RedpandaVersionTriple] = [] | ||
|
||
def setUp(self): | ||
# at the end of setUp, self.upgrade_steps will look like this: | ||
# [(22, 1, 11), (22, 1, 10), (22, 1, 11), | ||
# (22, 2, 11), (22, 2, 10), (22, 2, 11), | ||
# (22, 3, 16), (22, 3, 15), (22, 3, 16), | ||
# (23, 1, 7), (23, 1, 6), (23, 1, 7), | ||
# (23, 2, 0)] | ||
|
||
# compute the upgrade steps, merging the upgrade steps of each workload | ||
workloads_steps = [ | ||
self.installer.release_range( | ||
earliest=w.get_earliest_applicable_release(), | ||
latest=w.get_latest_applicable_release()) | ||
for w in self.adapted_workloads | ||
] | ||
|
||
# compute the release from one year ago, this is the oldest supported version | ||
head_line = self.installer.head_version()[0:2] | ||
oldest_supported_line = (head_line[0] - 1, head_line[1]) | ||
if oldest_supported_line[1] == 0: | ||
# at the start of the year dev version has is in the form vX.0, but it's really vX.1 | ||
oldest_supported_line = (oldest_supported_line[0], 1) | ||
|
||
latest_unsupported_line = (oldest_supported_line[0], | ||
oldest_supported_line[1] - 1) | ||
if latest_unsupported_line[1] == 0: | ||
# if going back, version vX.0 is v(X-1).3 | ||
latest_unsupported_line = (latest_unsupported_line[0] - 1, 3) | ||
|
||
latest_unsupported_line = (22, 3) | ||
# keeping only releases older than latest EOL. | ||
forward_upgrade_steps = [ | ||
v for v in sorted(set(sum(workloads_steps, start=[]))) | ||
if v >= latest_unsupported_line | ||
] | ||
|
||
# for each version, include a downgrade step to previous patch, then go to latest patch | ||
self.upgrade_steps: list[RedpandaVersionTriple] = [] | ||
prev = forward_upgrade_steps[0] | ||
for v in forward_upgrade_steps: | ||
if v[0:2] != prev[0:2] and prev[2] > 1: | ||
# if the line has changed, add previous patch and again latest for line | ||
previous_patch = (prev[0], prev[1], prev[2] - 1) | ||
self.upgrade_steps.extend([previous_patch, prev]) | ||
self.upgrade_steps.append(v) | ||
# update the latest_current_line | ||
prev = v | ||
|
||
self.logger.info(f"going through these versions: {self.upgrade_steps}") | ||
|
||
def _check_workload_list(self, | ||
to_check_list: list[WorkloadAdapter], | ||
version_param: RedpandaVersionTriple | ||
| dict[Any, RedpandaVersionTriple], | ||
partial_update: bool = False): | ||
# run checks on all the workloads in the to_check_list | ||
# each check could take multiple runs, so loop on a list of it until exhaustion | ||
|
||
str_update_kind = "" if not partial_update else "partial " | ||
progress_method = "progress" if not partial_update else "partial_progress" | ||
|
||
while len(to_check_list) > 0: | ||
self.logger.info( | ||
f"checking { str_update_kind }progress for {[w.get_workload_name() for w in to_check_list]}" | ||
) | ||
# check progress of each workload in the to_check_list | ||
# and if a workload is done, remove it from the list | ||
status_progress = { | ||
w: getattr(w, progress_method)(version_param) | ||
for w in to_check_list | ||
} | ||
for w, state in status_progress.items(): | ||
if state == PWorkload.DONE: | ||
self.logger.info( | ||
f"{w.get_workload_name()} {str_update_kind}progress check done" | ||
) | ||
to_check_list.remove(w) | ||
|
||
def cluster_version(self) -> int: | ||
return Admin(self.redpanda).get_features()['cluster_version'] | ||
|
||
@cluster(num_nodes=4) | ||
def test_workloads_through_releases(self): | ||
# this callback will be called between each upgrade, in a mixed version state | ||
def mid_upgrade_check(raw_versions: dict[Any, RedpandaVersion]): | ||
rp_versions = { | ||
k: expand_version(self.installer, v) | ||
for k, v in raw_versions.items() | ||
} | ||
next_version = max(rp_versions.values()) | ||
# check only workload that are active and that can operate with next_version | ||
to_check_workloads = [ | ||
w for w in self.adapted_workloads | ||
if w.state == WorkloadAdapter.STARTED | ||
and next_version <= w.get_latest_applicable_release() | ||
] | ||
self._check_workload_list(to_check_list=to_check_workloads, | ||
version_param=rp_versions, | ||
partial_update=True) | ||
|
||
# upgrade loop: for each version | ||
for current_version in self.upgrade_through_versions( | ||
self.upgrade_steps, | ||
already_running=False, | ||
mid_upgrade_check=mid_upgrade_check): | ||
current_version = expand_version(self.installer, current_version) | ||
# setup workload that could start at current_version | ||
for w in self.adapted_workloads: | ||
if w.state == WorkloadAdapter.NOT_STARTED and current_version >= w.get_earliest_applicable_release( | ||
): | ||
self.logger.info(f"setup {w.get_workload_name()}") | ||
w.begin() # this will set in a STARTED state | ||
|
||
# run checks on all the started workload. | ||
# each check could take multiple runs, so loop on a list of it until exhaustion | ||
self._check_workload_list(to_check_list= \ | ||
[w for w in self.adapted_workloads if w.state == WorkloadAdapter.STARTED], | ||
version_param=current_version) | ||
|
||
# stop workload that can't operate with next_version | ||
for w in self.adapted_workloads: | ||
if w.state == WorkloadAdapter.STARTED and current_version == w.get_latest_applicable_release( | ||
): | ||
self.logger.info(f"teardown of {w.get_workload_name()}") | ||
w.end() | ||
|
||
# check workloads stopped with error, and format the exceptions into concat_error | ||
concat_error: list[str] = [] | ||
for w in self.adapted_workloads: | ||
if w.state == WorkloadAdapter.STOPPED_WITH_ERROR: | ||
concat_error.append( | ||
f"{w.get_workload_name()} failed at {w.time_of_failure} - {time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(w.time_of_failure))}" | ||
) | ||
concat_error.extend(traceback.format_exception(w.error)) | ||
# if concat_error is not empty, raise it as an exception | ||
if len(concat_error) > 0: | ||
raise Exception("\n".join(concat_error)) | ||
|
||
# Validate that the data structures written by a mixture of historical | ||
# versions remain readable by our current debug tools | ||
log_viewer = OfflineLogViewer(self.redpanda) | ||
for node in self.redpanda.nodes: | ||
controller_records = log_viewer.read_controller(node=node) | ||
self.logger.info( | ||
f"Read {len(controller_records)} controller records from node {node.name} successfully" | ||
) |