Skip to content

Commit

Permalink
rptest/utils: factor out rate testing util
Browse files Browse the repository at this point in the history
Used here for asserting that producer rate goes to zero when we block
writes due to low space. Turns out this is also a handy utility for
testing other features like quotas.

Generalize and factor out a new test helper class ExpectRate. See the
docstring for implementation details.
  • Loading branch information
Aaron Fabbri committed Jun 23, 2022
1 parent f5aec8b commit 024eab8
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 91 deletions.
97 changes: 6 additions & 91 deletions tests/rptest/tests/full_disk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import random
from pickletools import long1
from time import sleep, time, time_ns
from typing import NamedTuple

from ducktape.cluster.cluster import ClusterNode
from ducktape.tests.test import TestContext
Expand All @@ -26,6 +25,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.full_disk import FullDiskHelper
from rptest.utils.partition_metrics import PartitionMetrics
from rptest.utils.expect_rate import ExpectRate, RateTarget

# reduce this?
MAX_MSG: int = 600
Expand All @@ -37,14 +37,6 @@
import sys


def time_ms() -> int:
    """Return the current epoch time as a whole number of milliseconds."""
    # time_ns() yields integer nanoseconds; floor-divide down to milliseconds.
    nanos_per_milli = 1_000_000
    return time_ns() // nanos_per_milli


def sleep_ms(msec: int) -> None:
    """Suspend the calling thread for `msec` milliseconds."""
    # time.sleep() takes seconds, so convert before delegating.
    seconds = msec / 1000
    sleep(seconds)


# XXX This test really needs a raw protocol client (i.e. not librdkafka-based)
# to test cleanly. That, or figure out how to make kafka-python act like one.
class WriteRejectTest(RedpandaTest):
Expand Down Expand Up @@ -139,19 +131,6 @@ def test_write_rejection(self):
self.full_disk.clear_low_space()


class RateTarget(NamedTuple):
    """Success criteria and time budget for a produce-rate expectation.

    Consumed by `FullDiskTest._expect_produce_rate`, which keeps sampling
    until the averaged rate falls inside the [min, max] range or the total
    time budget is used up.
    """
    # Overall time budget in seconds; once exceeded the expectation fails.
    max_total_sec: int
    # Minimum age, in seconds, of the older sample used to average the rate.
    target_sec: int
    # rates in bytes/sec
    # Inclusive lower and upper bounds for the averaged rate.
    target_min_rate: int
    target_max_rate: int


class BytesMeasurement(NamedTuple):
    """One sample of the produced-bytes counter and when it was taken."""
    # Value returned by PartitionMetrics.bytes_produced() at sampling time.
    bytes: int
    # Sampling time in milliseconds, as returned by time_ms().
    timestamp: int


class FullDiskTest(EndToEndTest):
def __init__(self, test_ctx):
extra_rp_conf = {
Expand All @@ -164,72 +143,8 @@ def __init__(self, test_ctx):
assert self.redpanda
self.pm = PartitionMetrics(self.redpanda)
self.full_disk = FullDiskHelper(self.logger, self.redpanda)

#
# Measure cluster-observed produce rate, until either
# - Fail: We exceed max total duration: Throw an exception.
# - Success: We achieve target produce rate for at least the min
# measured duration.
#                      --- time --->
# .''''''''''''''''''''''''''''''''''''.
# | total duration                     |
# :'''''''''''''''''''''''''''''''''''':
# |              | measured duration   |
# :....................................:
# '              '     target_sec      '
# Accumulate samples every `sample_interval_ms` msec into a vector.
# Once the total time accumulated exceeds `target_sec`, check to see
# if the average rate over the last `target_sec` is within the
# target range.

#                    Samples Vector
#                    --- time --->
# :'''''''''''''''''''''''''''''''''''':
# |t0 |  |  |  |  |  |  |  |  |  |  |  |
# :....................................:
# '                 ^                ^ '
# |                 |<- target sec ->|
# |                 |                |
# 0          interval_start    newest sample

def _expect_produce_rate(self, target: RateTarget) -> None:
    """Block until the observed produce rate falls within `target`.

    Samples `self.pm.bytes_produced()` about once per second. As soon as
    a sample that is at least `target.target_sec` old exists, the average
    rate between that sample and the newest one is computed and compared
    against the inclusive range
    [`target.target_min_rate`, `target.target_max_rate`].

    Args:
        target: Rate bounds, averaging window and overall time budget.

    Raises:
        RuntimeError: if no in-range rate was observed before
            `target.max_total_sec` seconds elapsed.
    """
    sample_interval_ms = 1000
    window_ms = target.target_sec * 1000
    budget_ms = target.max_total_sec * 1000

    t_0 = time_ms()
    total_msec = 0
    # Index of the newest sample known to be at least `window_ms` old;
    # -1 until such a sample exists. Deliberately kept across iterations.
    interval_start = -1
    samples: list[BytesMeasurement] = []
    while total_msec < budget_ms:
        m = BytesMeasurement(bytes=self.pm.bytes_produced(),
                             timestamp=time_ms())
        samples.append(m)

        # Find the latest index that is at least target_sec old. The scan
        # never reaches the newest sample itself: comparing it with itself
        # gives a zero-length interval, and an unbounded scan would run off
        # the end of the list when target_sec <= 0.
        newest = len(samples) - 1
        idx = 0
        while (idx < newest
               and m.timestamp - samples[idx].timestamp >= window_ms):
            interval_start = idx
            idx += 1

        if interval_start >= 0:
            start = samples[interval_start]
            elapsed_bytes = m.bytes - start.bytes
            elapsed_msec = m.timestamp - start.timestamp
            # A non-positive interval (e.g. the wall clock stepped back)
            # cannot yield a meaningful rate; wait for the next sample.
            if elapsed_msec > 0:
                rate = elapsed_bytes / (elapsed_msec / 1000.0)
                if target.target_min_rate <= rate <= target.target_max_rate:
                    self.logger.info(
                        f"Rate target met: {rate:.1f} bytes/sec " +
                        f"over the last {elapsed_msec/1000:.3f} sec.")
                    return
                self.logger.debug(
                    f"rate: {rate:.1f} for {elapsed_msec/1000:.3f}")

        sleep_ms(sample_interval_ms)
        total_msec = time_ms() - t_0

    raise RuntimeError("Unable to attain target rate in time.")
self.bytes_prod = ExpectRate(lambda: self.pm.bytes_produced(),
self.logger)

@cluster(num_nodes=5, log_allow_list=FDT_LOG_ALLOW_LIST)
def test_full_disk_no_produce(self):
Expand All @@ -248,10 +163,10 @@ def test_full_disk_no_produce(self):

# assert can produce to topic, by total partition bytes metric
producing = RateTarget(max_total_sec=40,
target_sec=10,
target_sec=5,
target_min_rate=100,
target_max_rate=2**40)
self._expect_produce_rate(producing)
self.bytes_prod.expect_rate(producing)

self.full_disk.trigger_low_space()

Expand All @@ -260,6 +175,6 @@ def test_full_disk_no_produce(self):
target_sec=5,
target_min_rate=0,
target_max_rate=1)
self._expect_produce_rate(not_producing)
self.bytes_prod.expect_rate(not_producing)

self.full_disk.clear_low_space()
111 changes: 111 additions & 0 deletions tests/rptest/utils/expect_rate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from logging import Logger
from time import sleep, time_ns
from typing import Callable, NamedTuple


class RateTarget(NamedTuple):
    """Success criteria and time budget for a rate expectation.

    NOTE(review): the consumer (ExpectRate.expect_rate) is not visible
    here; the field notes below follow the ExpectRate docstring and the
    field names -- confirm against the implementation.
    """
    # Presumably the total duration, in seconds, after which the
    # expectation fails.
    max_total_sec: int
    # Presumably the minimum measured duration, in seconds, over which the
    # rate must be within range.
    target_sec: int
    # rates in counter/sec
    # Lower and upper bounds for the acceptable rate.
    target_min_rate: int
    target_max_rate: int


class CounterMeasurement(NamedTuple):
    """One sample of a counter together with the time it was taken."""
    # Counter value observed at sampling time.
    count: int
    # Sampling time; presumably milliseconds from time_ms() -- TODO confirm.
    timestamp: int


def time_ms() -> int:
    """Return the current epoch time as a whole number of milliseconds."""
    # time_ns() yields integer nanoseconds; floor-divide down to milliseconds.
    nanos_per_milli = 1_000_000
    return time_ns() // nanos_per_milli


def sleep_ms(msec: int) -> None:
    """Suspend the calling thread for `msec` milliseconds."""
    # time.sleep() takes seconds, so convert before delegating.
    seconds = msec / 1000
    sleep(seconds)


class ExpectRate:
"""Measure observed counter (e.g. bytes) rate, until:
- Fail: We exceed max total duration: Throw an exception.
- Success: We achieve target rate for at least the min measured duration.
--- time --->
.''''''''''''''''''''''''''''''''''''.
| total duration |
:'''''''''''''''''''''''''''''''''''':