diff --git a/tests/rptest/tests/end_to_end.py b/tests/rptest/tests/end_to_end.py
index 766c9c63aae2e..8ac8d399460bd 100644
--- a/tests/rptest/tests/end_to_end.py
+++ b/tests/rptest/tests/end_to_end.py
@@ -156,6 +156,14 @@ def client(self):
         assert self._client is not None
         return self._client
 
+    def rpk_client(self):
+        assert self._rpk_client is not None
+        return self._rpk_client
+
+    def admin_client(self):
+        assert self._admin_client is not None
+        return self._admin_client
+
     @property
     def debug_mode(self):
         """
diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 116d8512c92b1..e92a8072b9478 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -13,22 +13,19 @@
 from rptest.services.cluster import cluster
 from ducktape.utils.util import wait_until
-from rptest.clients.kafka_cat import KafkaCat
 from ducktape.mark import matrix
+from rptest.services.rw_verifier import RWVerifier
 from rptest.utils.mode_checks import skip_debug_mode
 from rptest.clients.types import TopicSpec
-from rptest.clients.rpk import RpkTool
 from rptest.tests.end_to_end import EndToEndTest
 from rptest.services.admin import Admin
 from rptest.services.redpanda_installer import InstallOptions, RedpandaInstaller
 from rptest.tests.partition_movement import PartitionMovementMixin
 from rptest.util import wait_until_result
 from rptest.services.honey_badger import HoneyBadger
-from rptest.services.rpk_producer import RpkProducer
-from rptest.services.kaf_producer import KafProducer
-from rptest.services.rpk_consumer import RpkConsumer
 from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
+from ducktape.errors import TimeoutError
 
 # Errors we should tolerate when moving partitions around
 PARTITION_MOVEMENT_LOG_ERRORS = [
@@ -84,19 +81,18 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
             hb.set_exception(n, 'raftgen_service::failure_probes', 'vote')
         topic = "topic-1"
         partition = 0
-        spec = TopicSpec(name=topic, partition_count=1, replication_factor=3)
-        self.client().create_topic(spec)
-        admin = Admin(self.redpanda)
+        self.rpk_client().create_topic(topic, 1, 3)
 
         # choose a random topic-partition
         self.logger.info(f"selected topic-partition: {topic}-{partition}")
 
         # get the partition's replica set, including core assignments. the kafka
         # api doesn't expose core information, so we use the redpanda admin api.
-        assignments = self._get_assignments(admin, topic, partition)
+        assignments = self._get_assignments(self._admin_client, topic,
+                                            partition)
         self.logger.info(f"assignments for {topic}-{partition}: {assignments}")
-        brokers = admin.get_brokers()
+        brokers = self._admin_client.get_brokers()
 
         # replace all node cores in assignment
         for assignment in assignments:
             for broker in brokers:
@@ -106,10 +102,11 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
         self.logger.info(
             f"new assignments for {topic}-{partition}: {assignments}")
 
-        r = admin.set_partition_replicas(topic, partition, assignments)
+        self._admin_client.set_partition_replicas(topic, partition,
+                                                  assignments)
 
         def status_done():
-            info = admin.get_partitions(topic, partition)
+            info = self._admin_client.get_partitions(topic, partition)
             self.logger.info(
                 f"current assignments for {topic}-{partition}: {info}")
             converged = self._equal_assignments(info["replicas"], assignments)
@@ -122,7 +119,8 @@ def status_done():
         wait_until(status_done, timeout_sec=60, backoff_sec=2)
 
         def derived_done():
-            info = self._get_current_partitions(admin, topic, partition)
+            info = self._get_current_partitions(self._admin_client, topic,
+                                                partition)
             self.logger.info(
                 f"derived assignments for {topic}-{partition}: {info}")
             return self._equal_assignments(info, assignments)
@@ -144,104 +142,13 @@ def test_empty(self, num_to_upgrade):
         topics = []
         for partition_count in range(1, 5):
             for replication_factor in (1, 3):
-                name = f"topic{len(topics)}"
-                spec = TopicSpec(name=name,
-                                 partition_count=partition_count,
-                                 replication_factor=replication_factor)
-                topics.append(spec)
-
-        for spec in topics:
-            self.client().create_topic(spec)
-
-        for _ in range(25):
-            self._move_and_verify()
-
-    @cluster(num_nodes=4,
-             log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
-             PREV_VERSION_LOG_ALLOW_LIST)
-    @matrix(num_to_upgrade=[0, 2])
-    def test_static(self, num_to_upgrade):
-        """
-        Move partitions with data, but no active producers or consumers.
-        """
-        self.logger.info(f"Starting redpanda...")
-        test_mixed_versions = num_to_upgrade > 0
-        install_opts = InstallOptions(
-            install_previous_version=test_mixed_versions,
-            num_to_upgrade=num_to_upgrade)
-        self.start_redpanda(num_nodes=3, install_opts=install_opts)
-
-        topics = []
-        for partition_count in range(1, 5):
-            for replication_factor in (1, 3):
-                name = f"topic{len(topics)}"
-                spec = TopicSpec(name=name,
-                                 partition_count=partition_count,
-                                 replication_factor=replication_factor)
-                topics.append(spec)
-
-        self.logger.info(f"Creating topics...")
-        for spec in topics:
-            self.client().create_topic(spec)
-
-        num_records = 1000
-        produced = set(
-            ((f"key-{i:08d}", f"record-{i:08d}") for i in range(num_records)))
-
-        for spec in topics:
-            self.logger.info(f"Producing to {spec}")
-            producer = KafProducer(self.test_context, self.redpanda, spec.name,
-                                   num_records)
-            producer.start()
-            self.logger.info(
-                f"Finished producing to {spec}, waiting for producer...")
-            producer.wait()
-            producer.free()
-            self.logger.info(f"Producer stop complete.")
+                name = f"topic_{partition_count}_{replication_factor}"
+                self.rpk_client().create_topic(name, partition_count,
+                                               replication_factor)
 
         for _ in range(25):
             self._move_and_verify()
 
-        for spec in topics:
-            self.logger.info(f"Verifying records in {spec}")
-
-            consumer = RpkConsumer(self.test_context,
-                                   self.redpanda,
-                                   spec.name,
-                                   ignore_errors=False,
-                                   retries=0)
-            consumer.start()
-            timeout = 30
-            t1 = time.time()
-            consumed = set()
-            while consumed != produced:
-                if time.time() > t1 + timeout:
-                    self.logger.error(
-                        f"Validation failed for topic {spec.name}. Produced {len(produced)}, consumed {len(consumed)}"
-                    )
-                    self.logger.error(
-                        f"Messages consumed but not produced: {sorted(consumed - produced)}"
-                    )
-                    self.logger.error(
-                        f"Messages produced but not consumed: {sorted(produced - consumed)}"
-                    )
-                    assert set(consumed) == produced
-                else:
-                    time.sleep(5)
-                    for m in consumer.messages:
-                        self.logger.info(f"message: {m}")
-                    consumed = set([(m['key'], m['value'])
-                                    for m in consumer.messages])
-
-            self.logger.info(f"Stopping consumer...")
-            consumer.stop()
-            self.logger.info(f"Awaiting consumer...")
-            consumer.wait()
-            self.logger.info(f"Freeing consumer...")
-            consumer.free()
-
-            self.logger.info(f"Finished verifying records in {spec}")
-
     def _get_scale_params(self):
         """
         Helper for reducing traffic generation parameters
@@ -253,7 +160,7 @@ def _get_scale_params(self):
 
         return throughput, records, moves
 
-    @cluster(num_nodes=5,
+    @cluster(num_nodes=4,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
              PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(num_to_upgrade=[0, 2])
@@ -261,7 +168,7 @@ def test_dynamic(self, num_to_upgrade):
         """
         Move partitions with active consumer / producer
         """
-        throughput, records, moves = self._get_scale_params()
+        _, records, moves = self._get_scale_params()
 
         test_mixed_versions = num_to_upgrade > 0
         install_opts = InstallOptions(
@@ -270,18 +177,27 @@ def test_dynamic(self, num_to_upgrade):
         self.start_redpanda(num_nodes=3,
                             install_opts=install_opts,
                             extra_rp_conf={"default_topic_replications": 3})
-        spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
+        topic = "topic"
+        self.rpk_client().create_topic(topic, 3, 3)
 
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(topic, self.redpanda.brokers(), topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_verifier.ensure_progress(topic, 100, 10)
+
+        self.logger.info(f"initiate movement")
 
         for _ in range(moves):
             self._move_and_verify()
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_verifier.has_cleared(topic, records, 45)
+
+        self.logger.info(f"stopping the workload")
+        rw_verifier.remote_stop(topic)
+        rw_verifier.remote_wait(topic)
 
     @cluster(num_nodes=5,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
@@ -314,23 +230,24 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
         self.start_consumer(1)
         self.await_startup()
 
-        admin = Admin(self.redpanda)
         topic = "__consumer_offsets"
         partition = 0
 
         for _ in range(moves):
-            assignments = self._get_assignments(admin, topic, partition)
+            assignments = self._get_assignments(self.admin_client(), topic,
+                                                partition)
             for a in assignments:
                 # Bounce between core 0 and 1
                 a['core'] = (a['core'] + 1) % 2
-            admin.set_partition_replicas(topic, partition, assignments)
+            self.admin_client().set_partition_replicas(topic, partition,
+                                                       assignments)
             self._wait_post_move(topic, partition, assignments, 360)
 
         self.run_validation(enable_idempotence=False,
                             consumer_timeout_sec=45,
                             min_records=records)
 
-    @cluster(num_nodes=5,
+    @cluster(num_nodes=4,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
              RESTART_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(num_to_upgrade=[0, 2])
@@ -343,23 +260,25 @@ def test_bootstrapping_after_move(self, num_to_upgrade):
             install_previous_version=test_mixed_versions,
             num_to_upgrade=num_to_upgrade)
         self.start_redpanda(num_nodes=3, install_opts=install_opts)
-        spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1)
-        self.start_consumer(1)
-        self.await_startup()
+        topic = "topic"
+        self.rpk_client().create_topic(topic, 3, 3)
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(topic, self.redpanda.brokers(), topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
 
         # execute single move
         self._move_and_verify()
-        self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)
-
-        # snapshot offsets
-        rpk = RpkTool(self.redpanda)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(topic, 100, 10)
+        self.logger.info(f"checking if cleared 1000 operations")
+        rw_verifier.has_cleared(topic, 1000, 45)
+        rw_verifier.remote_stop(topic)
+        rw_verifier.remote_wait(topic)
 
         def has_offsets_for_all_partitions():
             # NOTE: partitions may not be returned if their fields can't be
             # populated, e.g. during leadership changes.
-            partitions = list(rpk.describe_topic(spec.name))
+            partitions = list(self.rpk_client().describe_topic(topic))
             if len(partitions) == 3:
                 return (True, partitions)
             return (False, None)
@@ -374,7 +293,7 @@ def has_offsets_for_all_partitions():
         self.redpanda.restart_nodes(self.redpanda.nodes)
 
         def offsets_are_recovered():
-            partitions_after = list(rpk.describe_topic(spec.name))
+            partitions_after = list(self.rpk_client().describe_topic(topic))
             if len(partitions_after) != 3:
                 return False
             return all([
@@ -395,14 +314,13 @@ def test_invalid_destination(self, num_to_upgrade):
             install_previous_version=test_mixed_versions,
             num_to_upgrade=num_to_upgrade)
         self.start_redpanda(num_nodes=3, install_opts=install_opts)
-        spec = TopicSpec(name="topic", partition_count=1, replication_factor=1)
-        self.client().create_topic(spec)
-        topic = spec.name
+        topic = "topic"
+        self.rpk_client().create_topic(topic, 1, 1)
         partition = 0
-        admin = Admin(self.redpanda)
-        brokers = admin.get_brokers()
-        assignments = self._get_assignments(admin, topic, partition)
+        brokers = self.admin_client().get_brokers()
+        assignments = self._get_assignments(self.admin_client(), topic,
+                                            partition)
 
         # Pick a node id where the topic currently isn't allocated
         valid_dest = list(
@@ -417,7 +335,8 @@ def test_invalid_destination(self, num_to_upgrade):
         # A valid node but an invalid core
         assignments = [{"node_id": valid_dest, "core": invalid_shard}]
         try:
-            r = admin.set_partition_replicas(topic, partition, assignments)
+            r = self.admin_client().set_partition_replicas(
+                topic, partition, assignments)
         except requests.exceptions.HTTPError as e:
             assert e.response.status_code == 400
         else:
@@ -426,7 +345,8 @@ def test_invalid_destination(self, num_to_upgrade):
         # An invalid node but a valid core
         assignments = [{"node_id": invalid_dest, "core": 0}]
         try:
-            r = admin.set_partition_replicas(topic, partition, assignments)
+            r = self.admin_client().set_partition_replicas(
+                topic, partition, assignments)
         except requests.exceptions.HTTPError as e:
             assert e.response.status_code == 400
         else:
@@ -436,7 +356,8 @@ def test_invalid_destination(self, num_to_upgrade):
         # Reproducer for https://github.com/redpanda-data/redpanda/issues/2286
         assignments = [{"node_id": valid_dest, "core": 3.14}]
         try:
-            r = admin.set_partition_replicas(topic, partition, assignments)
+            r = self.admin_client().set_partition_replicas(
+                topic, partition, assignments)
         except requests.exceptions.HTTPError as e:
             assert e.response.status_code == 400
         else:
@@ -444,7 +365,8 @@ def test_invalid_destination(self, num_to_upgrade):
 
         assignments = [{"node_id": 3.14, "core": 0}]
         try:
-            r = admin.set_partition_replicas(topic, partition, assignments)
+            r = self.admin_client().set_partition_replicas(
+                topic, partition, assignments)
         except requests.exceptions.HTTPError as e:
             assert e.response.status_code == 400
         else:
@@ -459,7 +381,8 @@ def test_invalid_destination(self, num_to_upgrade):
             "core": 0
         }]
         try:
-            r = admin.set_partition_replicas(topic, partition, assignments)
+            r = self.admin_client().set_partition_replicas(
+                topic, partition, assignments)
         except requests.exceptions.HTTPError as e:
             assert e.response.status_code == 400
         else:
@@ -467,7 +390,8 @@ def test_invalid_destination(self, num_to_upgrade):
         # Finally a valid move
         assignments = [{"node_id": valid_dest, "core": 0}]
-        r = admin.set_partition_replicas(topic, partition, assignments)
+        r = self.admin_client().set_partition_replicas(topic, partition,
+                                                       assignments)
         assert r.status_code == 200
 
     @cluster(num_nodes=5,
              log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@@ -488,44 +412,6 @@ def test_overlapping_changes(self, num_to_upgrade):
             install_previous_version=test_mixed_versions)
         self.start_redpanda(num_nodes=4, install_opts=install_opts)
 
-        node_ids = {1, 2, 3, 4}
-
-        # Create topic with enough data that inter-node movement
-        # will take a while.
-        name = f"movetest"
-        spec = TopicSpec(name=name, partition_count=1, replication_factor=3)
-        self.client().create_topic(spec)
-
-        # Wait for the partition to have a leader (`rpk produce` errors
-        # out if it tries to write data before this)
-        def partition_ready():
-            return KafkaCat(self.redpanda).get_partition_leader(
-                name, 0)[0] is not None
-
-        wait_until(partition_ready, timeout_sec=10, backoff_sec=0.5)
-
-        # Write a substantial amount of data to the topic
-        msg_size = 512 * 1024
-        write_bytes = 512 * 1024 * 1024
-        producer = RpkProducer(self._ctx,
-                               self.redpanda,
-                               name,
-                               msg_size=msg_size,
-                               msg_count=int(write_bytes / msg_size))
-        t1 = time.time()
-        producer.start()
-
-        # This is an absurdly low expected throughput, but necessarily
-        # so to run reliably on current test runners, which share an EBS
-        # backend among many parallel tests. 10MB/s has been empirically
-        # shown to be too high an expectation.
-        expect_bps = 1 * 1024 * 1024
-        expect_runtime = write_bytes / expect_bps
-        producer.wait(timeout_sec=expect_runtime)
-
-        self.logger.info(
-            f"Write complete {write_bytes} in {time.time() - t1} seconds")
-
         # - Admin API redirects writes but not reads. Because we want synchronous
         # status after submitting operations, send all operations to the controller
         # leader. This is not necessary for operations to work, just to simplify
@@ -533,9 +419,37 @@ def partition_ready():
         # - Because we will later verify that a 503 is sent in response to
         # a move request to an in_progress topic, set retry_codes=[] to
         # disable default retries on 503.
-        admin_node = self.redpanda.controller()
+        admin_node = self.await_controller()
+        admin_node = self.redpanda.get_node(admin_node)
         admin = Admin(self.redpanda, default_node=admin_node, retry_codes=[])
 
+        node_ids = {1, 2, 3, 4}
+
+        # Create topic with enough data that inter-node movement
+        # will take a while.
+        name = f"movetest"
+        partition_count = 1
+        self.rpk_client().create_topic(name, partition_count, 3)
+
+        admin.await_stable_leader(name,
+                                  partition=0,
+                                  replication=3,
+                                  timeout_s=10,
+                                  backoff_s=0.5)
+
+        msgs = 1024
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        self.logger.info(f"Producing to {name}")
+        t1 = time.time()
+        rw_verifier.remote_start(name, self.redpanda.brokers(), name,
+                                 partition_count)
+        rw_verifier.ensure_progress(name, msgs, 45)
+        rw_verifier.remote_stop(name)
+        rw_verifier.remote_wait(name)
+        self.logger.info(
+            f"Write complete {msgs} in {time.time() - t1} seconds")
+
         # Start an inter-node move, which should take some time
         # to complete because of recovery network traffic
         assignments = self._get_assignments(admin, name, 0)
@@ -559,15 +473,14 @@ def partition_ready():
 
         # An update to partition properties should succeed
         # (issue https://github.com/redpanda-data/redpanda/issues/2300)
-        rpk = RpkTool(self.redpanda)
         assert admin.get_partitions(name, 0)['status'] == "in_progress"
-        rpk.alter_topic_config(name, "retention.ms", "3600000")
+        self.rpk_client().alter_topic_config(name, "retention.ms", "3600000")
 
         # A deletion should succeed
-        assert name in rpk.list_topics()
+        assert name in self.rpk_client().list_topics()
         assert admin.get_partitions(name, 0)['status'] == "in_progress"
-        rpk.delete_topic(name)
-        assert name not in rpk.list_topics()
+        self.rpk_client().delete_topic(name)
+        assert name not in self.rpk_client().list_topics()
 
     @cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(num_to_upgrade=[0, 2])
@@ -585,26 +498,28 @@ def test_deletion_stops_move(self, num_to_upgrade):
 
         # create a single topic with replication factor of 1
         topic = 'test-topic'
-        self.rpk_client().create_topic(topic, 1, 1)
+        partition_count = 1
+        self.rpk_client().create_topic(topic, partition_count, 1)
         partition = 0
         num_records = 1000
 
         self.logger.info(f"Producing to {topic}")
-        producer = KafProducer(self.test_context, self.redpanda, topic,
-                               num_records)
-        producer.start()
-        self.logger.info(
-            f"Finished producing to {topic}, waiting for producer...")
-        producer.wait()
-        producer.free()
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(topic, self.redpanda.brokers(), topic,
+                                 partition_count)
+        rw_verifier.ensure_progress(topic, num_records, 45)
+        rw_verifier.remote_stop(topic)
+        rw_verifier.remote_wait(topic)
+
         self.logger.info(f"Producer stop complete.")
 
-        admin = Admin(self.redpanda)
         # get current assignments
-        assignments = self._get_assignments(admin, topic, partition)
+        assignments = self._get_assignments(self.admin_client(), topic,
+                                            partition)
         assert len(assignments) == 1
         self.logger.info(f"assignments for {topic}-{partition}: {assignments}")
-        brokers = admin.get_brokers()
+        brokers = self.admin_client().get_brokers()
         self.logger.info(f"available brokers: {brokers}")
         candidates = list(
             filter(lambda b: b['node_id'] != assignments[0]['node_id'],
@@ -622,25 +537,22 @@ def test_deletion_stops_move(self, num_to_upgrade):
         alive_hosts = [
             n.account.hostname for n in self.redpanda.nodes if n != node
         ]
-        controller_leader = admin.await_stable_leader(
-            topic="controller",
-            partition=0,
-            namespace="redpanda",
+        controller_leader = self.await_controller(
             hosts=alive_hosts,
-            check=lambda node_id: node_id != self.redpanda.idx(node),
-            timeout_s=30)
+            check=lambda node_id: node_id != self.redpanda.idx(node))
         controller_leader = self.redpanda.get_node(controller_leader)
 
-        admin.set_partition_replicas(topic,
-                                     partition,
-                                     target_assignment,
-                                     node=controller_leader)
+        self.admin_client().set_partition_replicas(topic,
+                                                   partition,
+                                                   target_assignment,
+                                                   node=controller_leader)
 
         # check that the status is in progress
         def get_status():
             try:
-                partition_info = admin.get_partitions(topic, partition)
+                partition_info = self.admin_client().get_partitions(
+                    topic, partition)
             except requests.exceptions.HTTPError as e:
                 if e.response.status_code == 404:
                     self.logger.info(
@@ -663,34 +575,39 @@ def get_status():
         self.rpk_client().create_topic(topic, 1, 1)
         wait_until(lambda: get_status() == 'done', 10, 1)
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=4)
     def test_down_replicate(self):
         """
         Test changing replication factor from 3 -> 1
         """
-        throughput, records, _ = self._get_scale_params()
+        _, records, _ = self._get_scale_params()
         partition_count = 5
         self.start_redpanda(num_nodes=3)
 
-        admin = Admin(self.redpanda)
-        spec = TopicSpec(partition_count=partition_count, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        self.topic = "test-topic"
+        self.rpk_client().create_topic(self.topic, partition_count, 3)
+
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, partition_count)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
 
         for partition in range(0, partition_count):
-            assignments = self._get_assignments(admin, self.topic, partition)
+            assignments = self._get_assignments(self.admin_client(),
+                                                self.topic, partition)
             new_assignment = [assignments[0]]
-            admin.set_partition_replicas(self.topic, partition, new_assignment)
+            self.admin_client().set_partition_replicas(self.topic, partition,
+                                                       new_assignment)
             self._wait_post_move(self.topic, partition, new_assignment, 60)
 
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_verifier.has_cleared(self.topic, records, 45)
 
-    @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    @cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
     def test_availability_when_one_node_down(self):
         """
         Test availability during partition reconfiguration.
@@ -698,20 +615,24 @@ def test_availability_when_one_node_down(self):
         The test validates if a partition is available when one of its replicas
         is down during reconfiguration.
         """
-        throughput, records, _ = self._get_scale_params()
+        _, records, _ = self._get_scale_params()
         partition_count = 1
         self.start_redpanda(num_nodes=4)
-        admin = Admin(self.redpanda)
-        spec = TopicSpec(partition_count=partition_count, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
+
+        self.topic = "test-topic"
+        self.rpk_client().create_topic(self.topic, partition_count, 3)
+
         partition_id = random.randint(0, partition_count - 1)
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, partition_count)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
 
-        assignments = self._get_assignments(admin, self.topic, partition_id)
+        assignments = self._get_assignments(self.admin_client(), self.topic,
+                                            partition_id)
         self.logger.info(
             f"current assignment for {self.topic}/{partition_id}: {assignments}"
         )
@@ -719,7 +640,7 @@ def test_availability_when_one_node_down(self):
         for a in assignments:
             current_replicas.add(a['node_id'])
         # replace single replica
-        brokers = admin.get_brokers()
+        brokers = self.admin_client().get_brokers()
         to_select = [
             b for b in brokers if b['node_id'] not in current_replicas
         ]
@@ -735,9 +656,8 @@ def test_availability_when_one_node_down(self):
         self.redpanda.stop_node(self.redpanda.get_node(to_stop))
 
         def new_controller_available():
-            controller_id = admin.get_partition_leader(namespace="redpanda",
-                                                       topic="controller",
-                                                       partition=0)
+            controller_id = self.admin_client().get_partition_leader(
+                namespace="redpanda", topic="controller", partition=0)
             self.logger.debug(
                 f"current controller: {controller_id}, stopped node: {to_stop}"
             )
@@ -745,10 +665,11 @@ def new_controller_available():
         wait_until(new_controller_available, 30, 1)
 
         # ask partition to move
-        admin.set_partition_replicas(self.topic, partition_id, assignments)
+        self.admin_client().set_partition_replicas(self.topic, partition_id,
+                                                   assignments)
 
         def status_done():
-            info = admin.get_partitions(self.topic, partition_id)
+            info = self.admin_client().get_partitions(self.topic, partition_id)
             self.logger.info(
                 f"current assignments for {self.topic}/{partition_id}: {info}")
             converged = self._equal_assignments(info["replicas"], assignments)
@@ -757,9 +678,10 @@ def status_done():
         # wait until redpanda reports complete
         wait_until(status_done, timeout_sec=40, backoff_sec=2)
 
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_verifier.has_cleared(self.topic, records, 45)
 
 
 class SIPartitionMovementTest(PartitionMovementMixin, EndToEndTest):
@@ -808,7 +730,7 @@ def _partial_upgrade(self, num_to_upgrade: int):
                                                start_timeout=90,
                                                stop_timeout=90)
 
-    @cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
+    @cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(num_to_upgrade=[0, 2])
     @skip_debug_mode  # rolling restarts require more reliable recovery that a slow debug mode cluster can provide
     def test_shadow_indexing(self, num_to_upgrade):
@@ -824,16 +746,20 @@ def test_shadow_indexing(self, num_to_upgrade):
         install_opts = InstallOptions(
             install_previous_version=test_mixed_versions)
         self.start_redpanda(num_nodes=3, install_opts=install_opts)
-        installer = self.redpanda._installer
-
-        spec = TopicSpec(name="topic",
-                         partition_count=partitions,
-                         replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        self.topic = "topic"
+        self.rpk_client().create_topic(self.topic, partitions, 3)
+
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, partitions)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        try:
+            rw_verifier.ensure_progress(self.topic, 100, 30)
+        except TimeoutError:
+            rw_verifier.remote_stop(self.topic)
+            rw_verifier.remote_wait(self.topic)
+            raise
 
         # We will start an upgrade halfway through the test: this ensures
         # that a single-version cluster existed for long enough to actually
@@ -847,11 +773,15 @@ def test_shadow_indexing(self, num_to_upgrade):
 
             self._move_and_verify()
 
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_verifier.has_cleared(self.topic, records, 45)
+        self.logger.info(f"stopping the workload")
+        rw_verifier.remote_stop(self.topic)
+        rw_verifier.remote_wait(self.topic)
 
-    @cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
+    @cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(num_to_upgrade=[0, 2])
     @skip_debug_mode  # rolling restarts require more reliable recovery that a slow debug mode cluster can provide
     def test_cross_shard(self, num_to_upgrade):
@@ -866,17 +796,21 @@ def test_cross_shard(self, num_to_upgrade):
             install_previous_version=test_mixed_versions)
         self.start_redpanda(num_nodes=3, install_opts=install_opts)
 
-        spec = TopicSpec(name="topic",
-                         partition_count=partitions,
-                         replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        self.topic = "topic"
+        self.rpk_client().create_topic(self.topic, partitions, 3)
+
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+        rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, partitions)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        try:
+            rw_verifier.ensure_progress(self.topic, 100, 30)
+        except TimeoutError:
+            rw_verifier.remote_stop(self.topic)
+            rw_verifier.remote_wait(self.topic)
+            raise
 
-        admin = Admin(self.redpanda)
-        topic = self.topic
         partition = 0
 
         # We will start an upgrade halfway through the test: this ensures
@@ -893,9 +827,14 @@ def test_cross_shard(self, num_to_upgrade):
         for a in assignments:
             # Bounce between core 0 and 1
             a['core'] = (a['core'] + 1) % 2
-        admin.set_partition_replicas(topic, partition, assignments)
-        self._wait_post_move(topic, partition, assignments, 360)
-
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+        self.admin_client().set_partition_replicas(self.topic, partition,
+                                                   assignments)
+        self._wait_post_move(self.topic, partition, assignments, 360)
+
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_verifier.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_verifier.has_cleared(self.topic, records, 45)
+        self.logger.info(f"stopping the workload")
+        rw_verifier.remote_stop(self.topic)
+        rw_verifier.remote_wait(self.topic)
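Note on the workload pattern introduced above: every migrated test repeats the same RWVerifier sequence (start the service, remote_start the workload, ensure_progress to warm up, move partitions, verify with ensure_progress/has_cleared, then remote_stop/remote_wait). A minimal sketch of a helper that captures that sequence is below; the helper name, its parameters, and the context-manager shape are illustrative only and are not part of this patch, while the RWVerifier calls it wraps are exactly the ones used in the diff.

    from contextlib import contextmanager

    from rptest.services.rw_verifier import RWVerifier


    @contextmanager
    def rw_workload(test_context, redpanda, topic, partition_count, warmup_ops=100):
        # Hypothetical helper (not part of this patch): wraps the start /
        # warm-up / stop sequence the migrated tests repeat.
        verifier = RWVerifier(test_context, redpanda)
        verifier.start()
        verifier.remote_start(topic, redpanda.brokers(), topic, partition_count)
        try:
            # warm up: wait for the first warmup_ops read/write operations
            verifier.ensure_progress(topic, warmup_ops, 10)
            yield verifier
        finally:
            # always tear the remote workload down, mirroring the
            # except/raise handling in test_shadow_indexing and test_cross_shard
            verifier.remote_stop(topic)
            verifier.remote_wait(topic)

A test body would then reduce to "with rw_workload(self.test_context, self.redpanda, topic, 3) as v:", the partition moves, and a final v.ensure_progress(...) / v.has_cleared(...) check.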