From 024eab8db137026b0edeced0c0eab9490146f228 Mon Sep 17 00:00:00 2001 From: Aaron Fabbri Date: Sun, 5 Jun 2022 21:35:01 -0700 Subject: [PATCH] rptest/utils: factor out rate testing util Used here for asserting that producer rate goes to zero when we block writes due to low space. Turns out this is also a handy utility for testing other features like quotas. Generalize and factor out a new test helper class ExpectRate. See the docstring for implementation details. --- tests/rptest/tests/full_disk_test.py | 97 ++--------------------- tests/rptest/utils/expect_rate.py | 111 +++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 91 deletions(-) create mode 100644 tests/rptest/utils/expect_rate.py diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py index bbf42994e5f8..e2837fccdca9 100644 --- a/tests/rptest/tests/full_disk_test.py +++ b/tests/rptest/tests/full_disk_test.py @@ -9,7 +9,6 @@ import random from pickletools import long1 from time import sleep, time, time_ns -from typing import NamedTuple from ducktape.cluster.cluster import ClusterNode from ducktape.tests.test import TestContext @@ -26,6 +25,7 @@ from rptest.tests.redpanda_test import RedpandaTest from rptest.utils.full_disk import FullDiskHelper from rptest.utils.partition_metrics import PartitionMetrics +from rptest.utils.expect_rate import ExpectRate, RateTarget # reduce this? MAX_MSG: int = 600 @@ -37,14 +37,6 @@ import sys -def time_ms() -> int: - return time_ns() // 10**6 - - -def sleep_ms(msec: int) -> None: - sleep(msec / 1000) - - # XXX This test really needs a raw protocol client (i.e. not librdkafka # based) # to test cleanly. That, or figure out how to make kafka-python act like one. class WriteRejectTest(RedpandaTest): @@ -139,19 +131,6 @@ def test_write_rejection(self): self.full_disk.clear_low_space() -class RateTarget(NamedTuple): - max_total_sec: int - target_sec: int - # rates in bytes/sec - target_min_rate: int - target_max_rate: int - - -class BytesMeasurement(NamedTuple): - bytes: int - timestamp: int - - class FullDiskTest(EndToEndTest): def __init__(self, test_ctx): extra_rp_conf = { @@ -164,72 +143,8 @@ def __init__(self, test_ctx): assert self.redpanda self.pm = PartitionMetrics(self.redpanda) self.full_disk = FullDiskHelper(self.logger, self.redpanda) - - # - # Measure cluster-observed produce rate, until either - # - Fail: We exceed max total duration: Throw an exception. - # - Success: We achieve target produce rate for at least the min - # measured duration. - # --- time ---> - # .''''''''''''''''''''''''''''''''''''. - # | total duration | - # :'''''''''''''''''''''''''''''''''''': - # | | measured duration | - # :....................................: - # ' ' target_sec ' - # Accumulate samples every `t_delta` msec into a vector. - # As total time accumulated exceeds `target sec`, check to see - # if the average rate over the last `target_sec` is within - # target range. - - # Samples Vector - # --- time ---> - # :'''''''''''''''''''''''''''''''''''': - # |t0 | | | | | | | | | | | | - # :....................................: - # ' ^ ^ ^ ' - # | |<- target sec ->| - # | | | - # 0 interval_start newest sample - - def _expect_produce_rate(self, target: RateTarget) -> None: - - sample_interval_ms = 1000 - t_0 = time_ms() - total_msec = 0 - i = 0 - interval_start = -1 - samples: list[BytesMeasurement] = [] - while total_msec < target.max_total_sec * 1000: - m = BytesMeasurement(bytes=self.pm.bytes_produced(), - timestamp=time_ms()) - samples.append(m) - elapsed_ms = m.timestamp - t_0 - - # find latest index that is at least target_sec old - idx = 0 - while m.timestamp - samples[ - idx].timestamp >= target.target_sec * 1000: - interval_start = idx - idx = idx + 1 - - if interval_start >= 0: - elapsed_bytes = m.bytes - samples[interval_start].bytes - elapsed_msec = m.timestamp - samples[interval_start].timestamp - rate = elapsed_bytes / (elapsed_msec / 1000.0) - if target.target_min_rate <= rate and rate <= target.target_max_rate: - self.logger.info( - f"Rate target met: {rate:.1f} bytes/sec " + - f"over the last {elapsed_msec/1000:.3f} sec.") - return - else: - self.logger.debug( - f"rate: {rate:.1f} for {elapsed_msec/1000:.3f}") - - sleep_ms(sample_interval_ms) - total_msec = time_ms() - t_0 - - raise RuntimeError("Unable to attain target rate in time.") + self.bytes_prod = ExpectRate(lambda: self.pm.bytes_produced(), + self.logger) @cluster(num_nodes=5, log_allow_list=FDT_LOG_ALLOW_LIST) def test_full_disk_no_produce(self): @@ -248,10 +163,10 @@ def test_full_disk_no_produce(self): # assert can produce to topic, by total partition bytes metric producing = RateTarget(max_total_sec=40, - target_sec=10, + target_sec=5, target_min_rate=100, target_max_rate=2**40) - self._expect_produce_rate(producing) + self.bytes_prod.expect_rate(producing) self.full_disk.trigger_low_space() @@ -260,6 +175,6 @@ def test_full_disk_no_produce(self): target_sec=5, target_min_rate=0, target_max_rate=1) - self._expect_produce_rate(not_producing) + self.bytes_prod.expect_rate(not_producing) self.full_disk.clear_low_space() diff --git a/tests/rptest/utils/expect_rate.py b/tests/rptest/utils/expect_rate.py new file mode 100644 index 000000000000..37e13cb4a5e2 --- /dev/null +++ b/tests/rptest/utils/expect_rate.py @@ -0,0 +1,111 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from logging import Logger +from time import sleep, time_ns +from typing import Callable, NamedTuple + + +class RateTarget(NamedTuple): + max_total_sec: int + target_sec: int + # rates in counter/sec + target_min_rate: int + target_max_rate: int + + +class CounterMeasurement(NamedTuple): + count: int + timestamp: int + + +def time_ms() -> int: + return time_ns() // 10**6 + + +def sleep_ms(msec: int) -> None: + sleep(msec / 1000) + + +class ExpectRate: + """Measure observed counter (e.g. bytes) rate, until: + - Fail: We exceed max total duration: Throw an exception. + - Success: We achieve target rate for at least the min measured duration. + + --- time ---> + .''''''''''''''''''''''''''''''''''''. + | total duration | + :'''''''''''''''''''''''''''''''''''': + | | measured duration | + :....................................: + ' ' target_sec ' + + Accumulate samples every `sample_interval_ms` msec into a vector. As total + time accumulated exceeds `target sec`, check to see if the average rate over + the last `target_sec` is within target range. + + Samples Vector + --- time ---> + :'''''''''''''''''''''''''''''''''''': + |t0 | | | | | | | | | | | | + :....................................: + ' ^ ^ ^ ' + | |<- target sec ->| + | | | + 0 interval_start newest sample""" + def __init__(self, + sample: Callable[[], int], + logger: Logger, + units: str = "bytes"): + self.logger = logger + self.sample = sample + self.units = units + + def expect_rate(self, target: RateTarget) -> None: + + sample_interval_ms = 1000 + ts = lambda x: x.timestamp + total_msec = 0 + interval_start = -1 + samples: list[CounterMeasurement] = [] + t_0 = time_ms() + delta_t = 0 + while total_msec < target.max_total_sec * 1000: + m = CounterMeasurement(count=self.sample(), timestamp=time_ms()) + samples.append(m) + elapsed_msec = ts(m) - t_0 + + # find latest index that is at least target_sec old + for i, sample in enumerate(samples): + delta_t = ts(m) - ts(sample) + if delta_t < target.target_sec * 1000: + break + interval_start = i + + if interval_start >= 0: + elapsed_count = m.count - samples[interval_start].count + elapsed_msec = m.timestamp - samples[interval_start].timestamp + rate = elapsed_count / (elapsed_msec / 1000.0) + if target.target_min_rate <= rate and rate <= target.target_max_rate: + self.logger.info( + f"Rate target met: {rate:.1f} {self.units}/sec " + + f"over the last {elapsed_msec/1000:.3f} sec.") + return + else: + self.logger.debug( + f"rate: {rate:.1f} for {elapsed_msec/1000:.3f}") + else: + self.logger.debug( + f"(waiting, {delta_t/1000:.1f}/{target.target_sec} sec) " + + f"{self.units}: {m.count}") + + sleep_ms(sample_interval_ms) + total_msec = time_ms() - t_0 + + raise RuntimeError("Unable to attain target rate in time.") \ No newline at end of file