Merge pull request #5201 from ajfabbri/write-rej-followup
Factor out rate testing util for quota testing, etc.
dotnwat committed Jun 28, 2022
2 parents b683792 + 024eab8 commit 9da14b5
Showing 2 changed files with 117 additions and 91 deletions.
97 changes: 6 additions & 91 deletions tests/rptest/tests/full_disk_test.py
@@ -9,7 +9,6 @@
import random
from pickletools import long1
from time import sleep, time, time_ns
from typing import NamedTuple

from ducktape.cluster.cluster import ClusterNode
from ducktape.tests.test import TestContext
@@ -26,6 +25,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.full_disk import FullDiskHelper
from rptest.utils.partition_metrics import PartitionMetrics
from rptest.utils.expect_rate import ExpectRate, RateTarget

# reduce this?
MAX_MSG: int = 600
@@ -37,14 +37,6 @@
import sys


def time_ms() -> int:
return time_ns() // 10**6


def sleep_ms(msec: int) -> None:
sleep(msec / 1000)


# XXX This test really needs a raw protocol client (i.e. not librdkafka based)
# to test cleanly. That, or figure out how to make kafka-python act like one.
class WriteRejectTest(RedpandaTest):
@@ -139,19 +131,6 @@ def test_write_rejection(self):
self.full_disk.clear_low_space()


class RateTarget(NamedTuple):
max_total_sec: int
target_sec: int
# rates in bytes/sec
target_min_rate: int
target_max_rate: int


class BytesMeasurement(NamedTuple):
bytes: int
timestamp: int


class FullDiskTest(EndToEndTest):
def __init__(self, test_ctx):
extra_rp_conf = {
@@ -164,72 +143,8 @@ def __init__(self, test_ctx):
assert self.redpanda
self.pm = PartitionMetrics(self.redpanda)
self.full_disk = FullDiskHelper(self.logger, self.redpanda)

#
# Measure cluster-observed produce rate, until either
# - Fail: We exceed max total duration: Throw an exception.
# - Success: We achieve target produce rate for at least the min
# measured duration.
# --- time --->
# .''''''''''''''''''''''''''''''''''''.
# | total duration |
# :'''''''''''''''''''''''''''''''''''':
# | | measured duration |
# :....................................:
# ' ' target_sec '
# Accumulate samples every `t_delta` msec into a vector.
# As total time accumulated exceeds `target sec`, check to see
# if the average rate over the last `target_sec` is within
# target range.

# Samples Vector
# --- time --->
# :'''''''''''''''''''''''''''''''''''':
# |t0 | | | | | | | | | | | |
# :....................................:
# ' ^ ^ ^ '
# | |<- target sec ->|
# | | |
# 0 interval_start newest sample

def _expect_produce_rate(self, target: RateTarget) -> None:

sample_interval_ms = 1000
t_0 = time_ms()
total_msec = 0
i = 0
interval_start = -1
samples: list[BytesMeasurement] = []
while total_msec < target.max_total_sec * 1000:
m = BytesMeasurement(bytes=self.pm.bytes_produced(),
timestamp=time_ms())
samples.append(m)
elapsed_ms = m.timestamp - t_0

# find latest index that is at least target_sec old
idx = 0
while m.timestamp - samples[
idx].timestamp >= target.target_sec * 1000:
interval_start = idx
idx = idx + 1

if interval_start >= 0:
elapsed_bytes = m.bytes - samples[interval_start].bytes
elapsed_msec = m.timestamp - samples[interval_start].timestamp
rate = elapsed_bytes / (elapsed_msec / 1000.0)
if target.target_min_rate <= rate and rate <= target.target_max_rate:
self.logger.info(
f"Rate target met: {rate:.1f} bytes/sec " +
f"over the last {elapsed_msec/1000:.3f} sec.")
return
else:
self.logger.debug(
f"rate: {rate:.1f} for {elapsed_msec/1000:.3f}")

sleep_ms(sample_interval_ms)
total_msec = time_ms() - t_0

raise RuntimeError("Unable to attain target rate in time.")
self.bytes_prod = ExpectRate(lambda: self.pm.bytes_produced(),
self.logger)

@cluster(num_nodes=5, log_allow_list=FDT_LOG_ALLOW_LIST)
def test_full_disk_no_produce(self):
@@ -248,10 +163,10 @@ def test_full_disk_no_produce(self):

# assert can produce to topic, by total partition bytes metric
producing = RateTarget(max_total_sec=40,
target_sec=10,
target_sec=5,
target_min_rate=100,
target_max_rate=2**40)
self._expect_produce_rate(producing)
self.bytes_prod.expect_rate(producing)

self.full_disk.trigger_low_space()

@@ -260,6 +175,6 @@ def test_full_disk_no_produce(self):
target_sec=5,
target_min_rate=0,
target_max_rate=1)
self._expect_produce_rate(not_producing)
self.bytes_prod.expect_rate(not_producing)

self.full_disk.clear_low_space()
111 changes: 111 additions & 0 deletions tests/rptest/utils/expect_rate.py
@@ -0,0 +1,111 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from logging import Logger
from time import sleep, time_ns
from typing import Callable, NamedTuple


class RateTarget(NamedTuple):
max_total_sec: int
target_sec: int
# rates in counter/sec
target_min_rate: int
target_max_rate: int


class CounterMeasurement(NamedTuple):
count: int
timestamp: int


def time_ms() -> int:
return time_ns() // 10**6


def sleep_ms(msec: int) -> None:
sleep(msec / 1000)


class ExpectRate:
"""Measure observed counter (e.g. bytes) rate, until:
- Fail: We exceed max total duration: Throw an exception.
- Success: We achieve target rate for at least the min measured duration.
--- time --->
.''''''''''''''''''''''''''''''''''''.
| total duration |
:'''''''''''''''''''''''''''''''''''':
| | measured duration |
:....................................:
' ' target_sec '
Accumulate samples every `sample_interval_ms` msec into a vector. As total
time accumulated exceeds `target_sec`, check to see if the average rate over
the last `target_sec` is within target range.
Samples Vector
--- time --->
:'''''''''''''''''''''''''''''''''''':
|t0 | | | | | | | | | | | |
:....................................:
' ^ ^ ^ '
| |<- target sec ->|
| | |
0 interval_start newest sample"""
def __init__(self,
sample: Callable[[], int],
logger: Logger,
units: str = "bytes"):
self.logger = logger
self.sample = sample
self.units = units

def expect_rate(self, target: RateTarget) -> None:

sample_interval_ms = 1000
ts = lambda x: x.timestamp
total_msec = 0
interval_start = -1
samples: list[CounterMeasurement] = []
t_0 = time_ms()
delta_t = 0
while total_msec < target.max_total_sec * 1000:
m = CounterMeasurement(count=self.sample(), timestamp=time_ms())
samples.append(m)
elapsed_msec = ts(m) - t_0

# find latest index that is at least target_sec old
for i, sample in enumerate(samples):
delta_t = ts(m) - ts(sample)
if delta_t < target.target_sec * 1000:
break
interval_start = i

if interval_start >= 0:
elapsed_count = m.count - samples[interval_start].count
elapsed_msec = m.timestamp - samples[interval_start].timestamp
rate = elapsed_count / (elapsed_msec / 1000.0)
if target.target_min_rate <= rate and rate <= target.target_max_rate:
self.logger.info(
f"Rate target met: {rate:.1f} {self.units}/sec " +
f"over the last {elapsed_msec/1000:.3f} sec.")
return
else:
self.logger.debug(
f"rate: {rate:.1f} for {elapsed_msec/1000:.3f}")
else:
self.logger.debug(
f"(waiting, {delta_t/1000:.1f}/{target.target_sec} sec) " +
f"{self.units}: {m.count}")

sleep_ms(sample_interval_ms)
total_msec = time_ms() - t_0

raise RuntimeError("Unable to attain target rate in time.")
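
For reference, a minimal sketch of how the factored-out utility might be reused outside of FullDiskTest (e.g. for the quota testing mentioned in the commit message). Only ExpectRate and RateTarget come from the new tests/rptest/utils/expect_rate.py module above; the fake_counter callback, the logger name, and the rate bounds are hypothetical stand-ins for a real metric source such as PartitionMetrics.bytes_produced().

import logging
from time import time_ns

from rptest.utils.expect_rate import ExpectRate, RateTarget

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("expect_rate_demo")  # hypothetical logger name

# Hypothetical counter: grows at roughly 500 units per second.
t_start = time_ns()


def fake_counter() -> int:
    return int((time_ns() - t_start) / 10**9 * 500)


meter = ExpectRate(fake_counter, logger, units="bytes")

# Succeed once the average rate over the last 5 sec stays between 100 and
# 1000 bytes/sec; raise RuntimeError if that never happens within 30 sec.
meter.expect_rate(
    RateTarget(max_total_sec=30,
               target_sec=5,
               target_min_rate=100,
               target_max_rate=1000))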
