From 9ac4f04044a5495025939534f3f479c037088f47 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Sun, 26 Jun 2022 12:24:45 -0700
Subject: [PATCH] partition_movement_test: use new workloads

The duration of partition_movement_test.py drops from 12 minutes 39.137
seconds to 8 minutes 53.753 seconds.
---
 tests/rptest/tests/end_to_end.py              |   2 +
 tests/rptest/tests/partition_movement_test.py | 312 ++++++++----------
 2 files changed, 146 insertions(+), 168 deletions(-)

diff --git a/tests/rptest/tests/end_to_end.py b/tests/rptest/tests/end_to_end.py
index 144ea3e62b7a3..54c4552a58e87 100644
--- a/tests/rptest/tests/end_to_end.py
+++ b/tests/rptest/tests/end_to_end.py
@@ -24,6 +24,7 @@
 from rptest.util import wait_until
 from rptest.services.redpanda import RedpandaService
 from rptest.clients.default import DefaultClient
+from rptest.clients.rpk import RpkTool
 from rptest.services.admin import Admin
 from rptest.services.verifiable_consumer import VerifiableConsumer
 from rptest.services.verifiable_producer import VerifiableProducer, is_int_with_prefix
@@ -91,6 +92,7 @@ def start_redpanda(self,
                                si_settings=self.si_settings)
         self.redpanda.start()
         self._client = DefaultClient(self.redpanda)
+        self._rpk_client = RpkTool(self.redpanda)
         self._admin_client = Admin(self.redpanda)
 
     @property
diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 07ce9509ced51..1cd1e045a5728 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -9,13 +9,11 @@
 
 import random
 import time
-from numpy import record
 import requests
 
 from rptest.services.cluster import cluster
-from ducktape.utils.util import wait_until
-from rptest.clients.kafka_cat import KafkaCat
-from ducktape.mark import ok_to_fail
+from rptest.util import wait_until
+from rptest.services.rw_workload import RWWorkload
 
 from rptest.clients.types import TopicSpec
 from rptest.clients.rpk import RpkTool
@@ -23,9 +21,6 @@
 from rptest.services.admin import Admin
 from rptest.tests.partition_movement import PartitionMovementMixin
 from rptest.services.honey_badger import HoneyBadger
-from rptest.services.rpk_producer import RpkProducer
-from rptest.services.kaf_producer import KafProducer
-from rptest.services.rpk_consumer import RpkConsumer
 from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
 
 # Errors we should tolerate when moving partitions around
@@ -78,19 +73,18 @@ def test_moving_not_fully_initialized_partition(self):
             hb.set_exception(n, 'raftgen_service::failure_probes', 'vote')
         topic = "topic-1"
         partition = 0
-        spec = TopicSpec(name=topic, partition_count=1, replication_factor=3)
-        self.client().create_topic(spec)
-        admin = Admin(self.redpanda)
+        self._rpk_client.create_topic(topic, 1, 3)
 
         # choose a random topic-partition
         self.logger.info(f"selected topic-partition: {topic}-{partition}")
 
         # get the partition's replica set, including core assignments. the kafka
         # api doesn't expose core information, so we use the redpanda admin api.
-        assignments = self._get_assignments(admin, topic, partition)
+        assignments = self._get_assignments(self._admin_client, topic,
+                                            partition)
         self.logger.info(f"assignments for {topic}-{partition}: {assignments}")
 
-        brokers = admin.get_brokers()
+        brokers = self._admin_client.get_brokers()
         # replace all node cores in assignment
         for assignment in assignments:
             for broker in brokers:
@@ -100,10 +94,11 @@ def test_moving_not_fully_initialized_partition(self):
         self.logger.info(
             f"new assignments for {topic}-{partition}: {assignments}")
 
-        r = admin.set_partition_replicas(topic, partition, assignments)
+        self._admin_client.set_partition_replicas(topic, partition,
+                                                  assignments)
 
         def status_done():
-            info = admin.get_partitions(topic, partition)
+            info = self._admin_client.get_partitions(topic, partition)
             self.logger.info(
                 f"current assignments for {topic}-{partition}: {info}")
             converged = self._equal_assignments(info["replicas"], assignments)
@@ -116,7 +111,8 @@ def status_done():
         wait_until(status_done, timeout_sec=60, backoff_sec=2)
 
         def derived_done():
-            info = self._get_current_partitions(admin, topic, partition)
+            info = self._get_current_partitions(self._admin_client, topic,
+                                                partition)
             self.logger.info(
                 f"derived assignments for {topic}-{partition}: {info}")
             return self._equal_assignments(info, assignments)
@@ -133,14 +129,9 @@ def test_empty(self):
         topics = []
         for partition_count in range(1, 5):
             for replication_factor in (1, 3):
-                name = f"topic{len(topics)}"
-                spec = TopicSpec(name=name,
-                                 partition_count=partition_count,
-                                 replication_factor=replication_factor)
-                topics.append(spec)
-
-        for spec in topics:
-            self.client().create_topic(spec)
+                name = f"topic_{partition_count}_{replication_factor}"
+                self._rpk_client.create_topic(name, partition_count,
+                                              replication_factor)
 
         for _ in range(25):
             self._move_and_verify()
@@ -164,65 +155,37 @@ def test_static(self):
 
         self.logger.info(f"Creating topics...")
         for spec in topics:
-            self.client().create_topic(spec)
+            self._rpk_client.create_topic(spec.name, spec.partition_count,
+                                          spec.replication_factor)
 
         num_records = 1000
-        produced = set(
-            ((f"key-{i:08d}", f"record-{i:08d}") for i in range(num_records)))
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
 
         for spec in topics:
-            self.logger.info(f"Producing to {spec}")
-            producer = KafProducer(self.test_context, self.redpanda, spec.name,
-                                   num_records)
-            producer.start()
-            self.logger.info(
-                f"Finished producing to {spec}, waiting for producer...")
-            producer.wait()
-            producer.free()
-            self.logger.info(f"Producer stop complete.")
+            self.logger.info(f"Start producing to {spec}")
+            rw_workload.remote_start(spec.name, self.redpanda.brokers(),
+                                     spec.name, spec.partition_count)
+
+        for spec in topics:
+            self.logger.info(f"waiting for 100 r&w operations to warm up")
+            rw_workload.ensure_progress(spec.name, 100, 10)
+        self.logger.info(f"initiate movement")
 
         for _ in range(25):
             self._move_and_verify()
 
         for spec in topics:
-            self.logger.info(f"Verifying records in {spec}")
-
-            consumer = RpkConsumer(self.test_context,
-                                   self.redpanda,
-                                   spec.name,
-                                   ignore_errors=False,
-                                   retries=0)
-            consumer.start()
-            timeout = 30
-            t1 = time.time()
-            consumed = set()
-            while consumed != produced:
-                if time.time() > t1 + timeout:
-                    self.logger.error(
-                        f"Validation failed for topic {spec.name}. Produced {len(produced)}, consumed {len(consumed)}"
-                    )
-                    self.logger.error(
-                        f"Messages consumed but not produced: {sorted(consumed - produced)}"
-                    )
-                    self.logger.error(
-                        f"Messages produced but not consumed: {sorted(produced - consumed)}"
-                    )
-                    assert set(consumed) == produced
-                else:
-                    time.sleep(5)
-                    for m in consumer.messages:
-                        self.logger.info(f"message: {m}")
-                    consumed = set([(m['key'], m['value'])
-                                    for m in consumer.messages])
-
-            self.logger.info(f"Stopping consumer...")
-            consumer.stop()
-            self.logger.info(f"Awaiting consumer...")
-            consumer.wait()
-            self.logger.info(f"Freeing consumer...")
-            consumer.free()
-
-            self.logger.info(f"Finished verifying records in {spec}")
+            self.logger.info(
+                f"waiting for 100 r&w operations after the movement")
+            rw_workload.ensure_progress(spec.name, 100, 10)
+
+        for spec in topics:
+            self.logger.info(
+                f"waiting until the workload has cleared {num_records} operations")
+            rw_workload.has_cleared(spec.name, num_records, 30)
+            self.logger.info(f"stopping the {spec.name} workload")
+            rw_workload.remote_stop(spec.name)
+            rw_workload.remote_wait(spec.name)
 
     def _get_scale_params(self):
         """
@@ -235,27 +198,36 @@ def _get_scale_params(self):
         return throughput, records, moves
 
-    @cluster(num_nodes=5, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
+    @cluster(num_nodes=4, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
     def test_dynamic(self):
         """
         Move partitions with active consumer / producer
         """
-        throughput, records, moves = self._get_scale_params()
+        _, records, moves = self._get_scale_params()
 
         self.start_redpanda(num_nodes=3,
                             extra_rp_conf={"default_topic_replications": 3})
-        spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
+        topic = "topic"
+        self._rpk_client.create_topic(topic, 3, 3)
 
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+        rw_workload.remote_start(topic, self.redpanda.brokers(), topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_workload.ensure_progress(topic, 100, 10)
+
+        self.logger.info(f"initiate movement")
         for _ in range(moves):
             self._move_and_verify()
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_workload.ensure_progress(topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_workload.has_cleared(topic, records, 45)
+
+        self.logger.info(f"stopping the workload")
+        rw_workload.remote_stop(topic)
+        rw_workload.remote_wait(topic)
 
     @cluster(num_nodes=5, log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS)
     def test_move_consumer_offsets_intranode(self):
@@ -291,7 +263,7 @@ def test_move_consumer_offsets_intranode(self):
                             consumer_timeout_sec=45,
                             min_records=records)
 
-    @cluster(num_nodes=5,
+    @cluster(num_nodes=4,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
              RESTART_LOG_ALLOW_LIST)
     def test_bootstrapping_after_move(self):
@@ -299,19 +271,24 @@ def test_bootstrapping_after_move(self):
         """
         Move partitions with active consumer / producer
         """
         self.start_redpanda(num_nodes=3)
-        spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1)
-        self.start_consumer(1)
-        self.await_startup()
+        topic = "topic"
+        self._rpk_client.create_topic(topic, 3, 3)
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+        rw_workload.remote_start(topic, self.redpanda.brokers(), topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_workload.ensure_progress(topic, 100, 10)
 
         # execute single move
         self._move_and_verify()
 
-        self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_workload.ensure_progress(topic, 100, 10)
+        self.logger.info(f"checking if cleared 1000 operations")
+        rw_workload.has_cleared(topic, 1000, 45)
+        rw_workload.remote_stop(topic)
+        rw_workload.remote_wait(topic)
 
         # snapshot offsets
         rpk = RpkTool(self.redpanda)
-        partitions = rpk.describe_topic(spec.name)
+        partitions = rpk.describe_topic(topic)
         offset_map = {}
         for p in partitions:
             offset_map[p.id] = p.high_watermark
@@ -320,10 +297,9 @@ def test_bootstrapping_after_move(self):
         self.redpanda.restart_nodes(self.redpanda.nodes)
 
         def offsets_are_recovered():
-
             return all([
                 offset_map[p.id] == p.high_watermark
-                for p in rpk.describe_topic(spec.name)
+                for p in rpk.describe_topic(topic)
             ])
 
         wait_until(offsets_are_recovered, 30, 2)
@@ -335,9 +311,8 @@ def test_invalid_destination(self):
         """
         self.start_redpanda(num_nodes=3)
 
-        spec = TopicSpec(name="topic", partition_count=1, replication_factor=1)
-        self.client().create_topic(spec)
-        topic = spec.name
+        topic = "topic"
+        self._rpk_client.create_topic(topic, 1, 1)
         partition = 0
 
         admin = Admin(self.redpanda)
@@ -423,38 +398,27 @@ def test_overlapping_changes(self):
         # Create topic with enough data that inter-node movement
         # will take a while.
         name = f"movetest"
-        spec = TopicSpec(name=name, partition_count=1, replication_factor=3)
-        self.client().create_topic(spec)
-
-        # Wait for the partition to have a leader (`rpk produce` errors
-        # out if it tries to write data before this)
-        def partition_ready():
-            return KafkaCat(self.redpanda).get_partition_leader(
-                name, 0)[0] is not None
-
-        wait_until(partition_ready, timeout_sec=10, backoff_sec=0.5)
-
-        # Write a substantial amount of data to the topic
-        msg_size = 512 * 1024
-        write_bytes = 512 * 1024 * 1024
-        producer = RpkProducer(self._ctx,
-                               self.redpanda,
-                               name,
-                               msg_size=msg_size,
-                               msg_count=int(write_bytes / msg_size))
+        partition_count = 1
+        self._rpk_client.create_topic(name, partition_count, 3)
+
+        self._admin_client.await_stable_leader(name,
+                                               partition=0,
+                                               replication=3,
+                                               timeout_s=10,
+                                               backoff_s=0.5)
+
+        msgs = 1024
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+
         self.logger.info(f"Producing to {name}")
         t1 = time.time()
-        producer.start()
-
-        # This is an absurdly low expected throughput, but necessarily
-        # so to run reliably on current test runners, which share an EBS
-        # backend among many parallel tests. 10MB/s has been empirically
-        # shown to be too high an expectation.
-        expect_bps = 1 * 1024 * 1024
-        expect_runtime = write_bytes / expect_bps
-        producer.wait(timeout_sec=expect_runtime)
-
+        rw_workload.remote_start(name, self.redpanda.brokers(), name,
+                                 partition_count)
+        rw_workload.ensure_progress(name, msgs, 45)
+        rw_workload.remote_stop(name)
+        rw_workload.remote_wait(name)
        self.logger.info(
-            f"Write complete {write_bytes} in {time.time() - t1} seconds")
+            f"Completed {msgs} operations in {time.time() - t1} seconds")
 
         # - Admin API redirects writes but not reads.  Because we want synchronous
         #   status after submitting operations, send all operations to the controller
@@ -489,15 +453,14 @@ def partition_ready():
 
         # An update to partition properties should succeed
         # (issue https://github.com/redpanda-data/redpanda/issues/2300)
-        rpk = RpkTool(self.redpanda)
         assert admin.get_partitions(name, 0)['status'] == "in_progress"
-        rpk.alter_topic_config(name, "retention.ms", "3600000")
+        self._rpk_client.alter_topic_config(name, "retention.ms", "3600000")
 
         # A deletion should succeed
-        assert name in rpk.list_topics()
+        assert name in self._rpk_client.list_topics()
         assert admin.get_partitions(name, 0)['status'] == "in_progress"
-        rpk.delete_topic(name)
-        assert name not in rpk.list_topics()
+        self._rpk_client.delete_topic(name)
+        assert name not in self._rpk_client.list_topics()
 
     @cluster(num_nodes=4)
     def test_deletion_stops_move(self):
@@ -510,19 +473,20 @@ def test_deletion_stops_move(self):
 
         # create a single topic with replication factor of 1
        topic = 'test-topic'
-        rpk = RpkTool(self.redpanda)
-        rpk.create_topic(topic, 1, 1)
+        partition_count = 1
+        self._rpk_client.create_topic(topic, partition_count, 1)
         partition = 0
         num_records = 1000
 
         self.logger.info(f"Producing to {topic}")
-        producer = KafProducer(self.test_context, self.redpanda, topic,
-                               num_records)
-        producer.start()
-        self.logger.info(
-            f"Finished producing to {topic}, waiting for producer...")
-        producer.wait()
-        producer.free()
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+        rw_workload.remote_start(topic, self.redpanda.brokers(), topic,
+                                 partition_count)
+        rw_workload.ensure_progress(topic, num_records, 45)
+        rw_workload.remote_stop(topic)
+        rw_workload.remote_wait(topic)
+
         self.logger.info(f"Producer stop complete.")
 
         admin = Admin(self.redpanda)
@@ -573,11 +537,11 @@ def get_status():
         wait_until(lambda: get_status() == 'in_progress', 10, 1)
 
         # delete the topic
-        rpk.delete_topic(topic)
+        self._rpk_client.delete_topic(topic)
         # start the node back up
         self.redpanda.start_node(node)
         # create topic again
-        rpk.create_topic(topic, 1, 1)
+        self._rpk_client.create_topic(topic, 1, 1)
         wait_until(lambda: get_status() == 'done', 10, 1)
 
     @cluster(num_nodes=5)
@@ -644,7 +608,7 @@ def _get_scale_params(self):
         partitions = 1 if self.debug_mode else 10
         return throughput, records, moves, partitions
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=4)
     def test_shadow_indexing(self):
         """
         Test interaction between the shadow indexing and the partition movement.
@@ -655,21 +619,28 @@ def test_shadow_indexing(self):
         throughput, records, moves, partitions = self._get_scale_params()
         self.start_redpanda(num_nodes=3)
 
-        spec = TopicSpec(name="topic",
-                         partition_count=partitions,
-                         replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        self.topic = "topic"
+        self._rpk_client.create_topic(self.topic, partitions, 3)
+
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+        rw_workload.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_workload.ensure_progress(self.topic, 100, 10)
+
         for _ in range(moves):
             self._move_and_verify()
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
-
-    @cluster(num_nodes=5)
+
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_workload.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_workload.has_cleared(self.topic, records, 45)
+        self.logger.info(f"stopping the workload")
+        rw_workload.remote_stop(self.topic)
+        rw_workload.remote_wait(self.topic)
+
+    @cluster(num_nodes=4)
     def test_cross_shard(self):
         """
         Test interaction between the shadow indexing and the partition movement.
@@ -678,14 +649,15 @@ def test_cross_shard(self):
         throughput, records, moves, partitions = self._get_scale_params()
         self.start_redpanda(num_nodes=3)
 
-        spec = TopicSpec(name="topic",
-                         partition_count=partitions,
-                         replication_factor=3)
-        self.client().create_topic(spec)
-        self.topic = spec.name
-        self.start_producer(1, throughput=throughput)
-        self.start_consumer(1)
-        self.await_startup()
+        self.topic = "topic"
+        self._rpk_client.create_topic(self.topic, partitions, 3)
+
+        rw_workload = RWWorkload(self.test_context, self.redpanda)
+        rw_workload.start()
+        rw_workload.remote_start(self.topic, self.redpanda.brokers(),
+                                 self.topic, 3)
+        self.logger.info(f"waiting for 100 r&w operations to warm up")
+        rw_workload.ensure_progress(self.topic, 100, 10)
 
         admin = Admin(self.redpanda)
         topic = self.topic
@@ -699,6 +671,10 @@ def test_cross_shard(self):
             admin.set_partition_replicas(topic, partition, assignments)
             self._wait_post_move(topic, partition, assignments, 360)
 
-        self.run_validation(enable_idempotence=False,
-                            consumer_timeout_sec=45,
-                            min_records=records)
+        self.logger.info(f"waiting for 100 r&w operations")
+        rw_workload.ensure_progress(self.topic, 100, 10)
+        self.logger.info(f"checking if cleared {records} operations")
+        rw_workload.has_cleared(self.topic, records, 45)
+        self.logger.info(f"stopping the workload")
+        rw_workload.remote_stop(self.topic)
+        rw_workload.remote_wait(self.topic)
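
Note on the RWWorkload lifecycle used throughout this patch: the snippet below is a minimal sketch of the start / warm-up / verify / stop sequence the converted tests follow. The parameter meanings (name, broker list, topic, partition count for remote_start; an operation count and a timeout in seconds for ensure_progress and has_cleared) are assumed from the calls in this diff, not taken from rw_workload's own documentation, and the helper function name is hypothetical.

    from rptest.services.rw_workload import RWWorkload

    def exercise_topic(test_context, redpanda, topic, partition_count):
        # start the workload service, then begin reads & writes against the topic
        workload = RWWorkload(test_context, redpanda)
        workload.start()
        workload.remote_start(topic, redpanda.brokers(), topic, partition_count)

        # warm up: require at least 100 successful r&w operations within 10 seconds
        workload.ensure_progress(topic, 100, 10)

        # ... trigger partition movement here, e.g. self._move_and_verify() ...

        # require fresh progress after the movement, then a larger cleared total
        workload.ensure_progress(topic, 100, 10)
        workload.has_cleared(topic, 1000, 45)

        # shut the remote workload down and wait for it to exit
        workload.remote_stop(topic)
        workload.remote_wait(topic)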