diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml new file mode 100644 index 00000000..a14076ef --- /dev/null +++ b/.github/workflows/slo.yml @@ -0,0 +1,55 @@ +name: SLO + +on: + pull_request: + branches: [main] + workflow_dispatch: + +jobs: + test-slo: + concurrency: + group: slo-${{ github.ref }} + if: (!contains(github.event.pull_request.labels.*.name, 'no slo')) + + runs-on: ubuntu-latest + name: SLO test + permissions: + checks: write + pull-requests: write + contents: read + issues: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Run SLO + uses: ydb-platform/slo-tests@js-version + with: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }} + AWS_CREDENTIALS_B64: ${{ secrets.SLO_AWS_CREDENTIALS }} + AWS_CONFIG_B64: ${{ secrets.SLO_AWS_CONFIG }} + DOCKER_USERNAME: ${{ secrets.SLO_DOCKER_USERNAME }} + DOCKER_PASSWORD: ${{ secrets.SLO_DOCKER_PASSWORD }} + DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }} + DOCKER_FOLDER: ${{ secrets.SLO_DOCKER_FOLDER }} + s3_endpoint: ${{ secrets.SLO_S3_ENDPOINT }} + s3_images_folder: ${{ vars.SLO_S3_IMAGES_FOLDER }} + grafana_domain: ${{ vars.SLO_GRAFANA_DOMAIN }} + grafana_dashboard: ${{ vars.SLO_GRAFANA_DASHBOARD }} + ydb_version: 'newest' + timeBetweenPhases: 30 + shutdownTime: 30 + + language_id0: sync + language0: python-sync + workload_path0: tests/slo + workload_build_context0: ../.. + workload_build_options0: -f Dockerfile + + - uses: actions/upload-artifact@v3 + if: always() + with: + name: slo-logs + path: logs/ diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile new file mode 100644 index 00000000..bcb01d72 --- /dev/null +++ b/tests/slo/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.8 +COPY . /src +WORKDIR /src +RUN python -m pip install --upgrade pip && python -m pip install -e . 
&& python -m pip install -r tests/slo/requirements.txt +WORKDIR tests/slo + +ENTRYPOINT ["python", "src"] diff --git a/tests/slo/README.md b/tests/slo/README.md new file mode 100644 index 00000000..486cbe99 --- /dev/null +++ b/tests/slo/README.md @@ -0,0 +1,133 @@ +# SLO workload + +SLO is a type of test where an app based on the ydb-sdk is tested while YDB cluster nodes, tablets and the network are being taken down +(situations that are possible for distributed DBs with hundreds of nodes) + +### Implementations: + +There are two implementations: + +- `sync` +- `async` (not implemented yet) + +### Usage: + +It has 3 commands: + +- `create` - creates the table in the database +- `cleanup` - drops the table in the database +- `run` - runs the workload (reads from and writes to the table at the configured RPS) + +### Run examples with all arguments: + +create: +`python tests/slo/src/ create localhost:2136 /local -t tableName +--min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -c 1000 +--write-timeout 10000` + +cleanup: +`python tests/slo/src/ cleanup localhost:2136 /local -t tableName` + +run: +`python tests/slo/src/ run localhost:2136 /local -t tableName +--prom-pgw http://prometheus-pushgateway:9091 --report-period 250 +--read-rps 1000 --read-timeout 10000 +--write-rps 100 --write-timeout 10000 +--time 600 --shutdown-time 30` + +## Arguments for commands: + +### create +`python tests/slo/src/ create [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + -t --table-name table name to create + + -p-min --min-partitions-count minimum amount of partitions in table + -p-max --max-partitions-count maximum amount of partitions in table + -p-size --partition-size partition size in MB + + -c --initial-data-count amount of initially created rows + + --write-timeout write timeout in milliseconds + + --batch-size amount of new records in each create request + --threads number of threads to use + +``` + +### cleanup +`python tests/slo/src/ cleanup [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + -t --table-name table name to drop +``` + +### run +`python tests/slo/src/ run [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + -t --table-name table name to use + + --prom-pgw Prometheus push gateway address + --report-period Prometheus push period in milliseconds + + --read-rps read RPS + --read-timeout read timeout in milliseconds + + --write-rps write RPS + --write-timeout write timeout in milliseconds + + --time run time in seconds + --shutdown-time graceful shutdown time in seconds + + --read-threads number of threads to use for read requests + --write-threads number of threads to use for write requests +``` + +## Authentication + +The workload uses [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication.
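+
+A minimal connection sketch matching the auth-env recipe above (the endpoint and database values are placeholders; this mirrors how `tests/slo/src/runner.py` builds its driver):
+
+```python
+import ydb
+
+# Credentials are resolved from environment variables (token, static,
+# service-account key or anonymous), as described in the auth-env recipe.
+driver = ydb.Driver(
+    ydb.DriverConfig(
+        "localhost:2136",   # placeholder endpoint
+        database="/local",  # placeholder database
+        credentials=ydb.credentials_from_env_variables(),
+    )
+)
+driver.wait(timeout=5)
+```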
+ +## What's inside +When running the `run` command, the program creates three jobs: `readJob`, `writeJob` and `metricsJob`. + +- `readJob` reads rows from the table one by one, using random identifiers generated by `writeJob` +- `writeJob` generates and inserts rows +- `metricsJob` periodically sends metrics to Prometheus + +The table has these fields: +- `object_id Uint64` +- `object_hash Uint64 Digest::NumericHash(id)` +- `payload_str UTF8` +- `payload_double Double` +- `payload_timestamp Timestamp` + +Primary key: `("object_hash", "object_id")` + +## Collected metrics +- `oks` - amount of OK requests +- `not_oks` - amount of not OK requests +- `inflight` - amount of requests in flight +- `latency` - summary of latencies in ms +- `attempts` - histogram of retry attempts per request + +> Metrics must be reset so they are `0` in Prometheus and Grafana before the jobs start and after they finish + +## Look at metrics in Grafana +The dashboard used in this test is available [here](https://github.com/ydb-platform/slo-tests/blob/main/k8s/helms/grafana.yaml#L69) - you will need to import the JSON into Grafana. diff --git a/tests/slo/generator.py b/tests/slo/generator.py deleted file mode 100644 index e14701c2..00000000 --- a/tests/slo/generator.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding: utf-8 -*- -import dataclasses -import logging -import random -import string -from datetime import datetime - -logger = logging.getLogger(__name__) - - -MaxUi32 = 2**32 - 1 - - -def hash_ui32(value): - return abs(hash(str(value))) % MaxUi32 - - -def generate_random_string(min_len, max_len): - strlen = random.randint(min_len, max_len) - return "".join(random.choices(string.ascii_lowercase, k=strlen)) - - -@dataclasses.dataclass(slots=True) -class Row: - object_id_key: int - object_id: int - payload_str: str - payload_double: float - payload_timestamp: datetime - - # First id in current shard - def get_shard_id(self, partitions_count): - shard_size = int((MaxUi32 + 1) / partitions_count) - return self.object_id_key / shard_size - - -@dataclasses.dataclass -class RowGenerator: - id_counter: int = 0 - - def get(self): - self.id_counter += 1 - if self.id_counter >= MaxUi32: - self.id_counter = 0 - logger.warning("RowGenerator: maxint reached") - - return Row( - object_id_key=hash_ui32(self.id_counter), - object_id=self.id_counter, - payload_str=generate_random_string(20, 40), - payload_double=random.random(), - payload_timestamp=datetime.now(), - ) - - -class PackGenerator: - def __init__(self, args, start_id=0): - self._row_generator = RowGenerator(start_id) - - self._remain = args.initial_data_count - self._pack_size = args.pack_size - self._partitions_count = args.partitions_count - - self._packs = {} - - def get_next_pack(self): - while self._remain: - new_record = self._row_generator.get() - shard_id = new_record.get_shard_id(self._partitions_count) - - self._remain -= 1 - - if shard_id in self._packs: - existing_pack = self._packs[shard_id] - existing_pack.append(new_record) - if len(existing_pack) >= self._pack_size: - return self._packs.pop(shard_id) - else: - self._packs[shard_id] = [new_record] - - for shard_id, pack in self._packs.items(): - if pack: - return self._packs.pop(shard_id) diff --git a/tests/slo/options.py b/tests/slo/options.py deleted file mode 100644 index 4117f1d4..00000000 --- a/tests/slo/options.py +++ /dev/null @@ -1,63 +0,0 @@ -import argparse - - -def add_common_options(parser): - parser.add_argument("endpoint", help="YDB endpoint") - parser.add_argument("db", help="YDB database name") - parser.add_argument("-t", "--table_name", default="key_value", help="Table name") - parser.add_argument("--write_timeout", default=20000,
type=int, help="Read requests execution timeout [ms]") - - -def make_create_parser(subparsers): - create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content") - add_common_options(create_parser) - create_parser.add_argument("-p", "--partitions_count", default=64, type=int, help="Number of partition in table") - create_parser.add_argument( - "-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate" - ) - create_parser.add_argument( - "--pack_size", default="100", type=int, help="Number of new records in each create request" - ) - - -def make_run_parser(subparsers, name="run"): - run_parser = subparsers.add_parser(name, help="Run measurable workload") - add_common_options(run_parser) - run_parser.add_argument("--write_rps", default=10, type=int, help="Write request rps") - run_parser.add_argument("--read_rps", default=100, type=int, help="Read request rps") - run_parser.add_argument("--no_write", default=False, action="store_true") - run_parser.add_argument("--no_read", default=False, action="store_true") - run_parser.add_argument("--time", default=10, type=int, help="Time to run (Seconds)") - run_parser.add_argument("--read_timeout", default=70, type=int, help="Read requests execution timeout [ms]") - run_parser.add_argument("--save_result", default=False, action="store_true", help="Save result to file") - run_parser.add_argument("--result_file_name", default="slo_result.json", help="Result json file name") - run_parser.add_argument("--no_prepare", default=False, action="store_true", help="Do not prepare requests") - - -def make_cleanup_parser(subparsers): - cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables") - add_common_options(cleanup_parser) - - -def get_root_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, - description="YDB Python SLO application", - ) - - subparsers = parser.add_subparsers( - title="subcommands", - dest="subcommand", - help="List of subcommands", - ) - - make_create_parser(subparsers) - make_run_parser(subparsers) - make_cleanup_parser(subparsers) - - return parser - - -def parse_options(): - parser = get_root_parser() - return parser.parse_args() diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt new file mode 100644 index 00000000..877870cc --- /dev/null +++ b/tests/slo/requirements.txt @@ -0,0 +1,4 @@ +requests==2.28.2 +ratelimiter==1.2.0.post0 +prometheus-client==0.17.0 +quantile-estimator==0.1.2 diff --git a/tests/slo/runner.py b/tests/slo/runner.py deleted file mode 100644 index 97f2aa50..00000000 --- a/tests/slo/runner.py +++ /dev/null @@ -1,72 +0,0 @@ -import ydb - -from os import path -from generator import PackGenerator - -from prometheus_client import Summary, Counter - - -def run_create(args, driver): - session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) - tb_name = path.join(args.db, args.table_name) - session.create_table( - tb_name, - ydb.TableDescription() - .with_column(ydb.Column("object_id_key", ydb.OptionalType(ydb.PrimitiveType.Uint32))) - .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint32))) - .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) - .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) - .with_primary_keys("object_id_key", "object_id") - .with_profile( - 
ydb.TableProfile().with_partitioning_policy( - ydb.PartitioningPolicy().with_uniform_partitions(args.partitions_count) - ) - ), - ) - - prepare_q = """ - DECLARE $items AS List>; - UPSERT INTO `{}` SELECT * FROM AS_TABLE($items); - """ - prepared = session.prepare(prepare_q.format(tb_name)) - - generator = PackGenerator(args) - while data := generator.get_next_pack(): - tx = session.transaction() - tx.execute(prepared, {"$items": data}) - tx.commit() - - -def run_cleanup(args, driver): - session = driver.table_client.session().create() - session.drop_table(path.join(args.db, args.table_name)) - - -def run_from_args(args): - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - credentials=ydb.credentials_from_env_variables(), - grpc_keep_alive_timeout=5000, - ) - - with ydb.Driver(driver_config) as driver: - driver.wait(timeout=5) - - try: - if args.subcommand == "create": - run_create(args, driver) - elif args.subcommand == "run": - pass - elif args.subcommand == "cleanup": - run_cleanup(args, driver) - else: - raise RuntimeError(f"Unknown command {args.subcommand}") - finally: - driver.stop() diff --git a/tests/slo/__init__.py b/tests/slo/src/__init__.py similarity index 100% rename from tests/slo/__init__.py rename to tests/slo/src/__init__.py diff --git a/tests/slo/__main__.py b/tests/slo/src/__main__.py similarity index 100% rename from tests/slo/__main__.py rename to tests/slo/src/__main__.py diff --git a/tests/slo/src/generator.py b/tests/slo/src/generator.py new file mode 100644 index 00000000..87be82d5 --- /dev/null +++ b/tests/slo/src/generator.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +import dataclasses +import logging +import random +import string +from datetime import datetime +from threading import Lock + +logger = logging.getLogger(__name__) + + +MAX_UINT64 = 2**64 - 1 + + +def generate_random_string(min_len, max_len): + strlen = random.randint(min_len, max_len) + return "".join(random.choices(string.ascii_lowercase, k=strlen)) + + +@dataclasses.dataclass +class Row: + object_id: int + payload_str: str + payload_double: float + payload_timestamp: datetime + + +@dataclasses.dataclass +class RowGenerator: + id_counter: int = 0 + lock = Lock() + + def get(self): + with self.lock: + self.id_counter += 1 + if self.id_counter >= MAX_UINT64: + self.id_counter = 0 + logger.warning("RowGenerator: maxint reached") + + return Row( + object_id=self.id_counter, + payload_str=generate_random_string(20, 40), + payload_double=random.random(), + payload_timestamp=datetime.now(), + ) + + +def batch_generator(args, start_id=0): + row_generator = RowGenerator(start_id) + remain = args.initial_data_count + + while True: + size = min(remain, args.batch_size) + if size < 1: + return + yield [row_generator.get() for _ in range(size)] + remain -= size diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py new file mode 100644 index 00000000..db7a1c3f --- /dev/null +++ b/tests/slo/src/jobs.py @@ -0,0 +1,137 @@ +import ydb +import time +import logging +import dataclasses +from random import randint +from ratelimiter import RateLimiter + +import threading + +from metrics import JOB_WRITE_LABEL, JOB_READ_LABEL + +from generator import RowGenerator + + +READ_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); +""" + +WRITE_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +DECLARE $payload_str AS Utf8; +DECLARE $payload_double AS Double; +DECLARE $payload_timestamp AS 
Timestamp; + +UPSERT INTO `{}` ( + object_id, object_hash, payload_str, payload_double, payload_timestamp +) VALUES ( + $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp +); +""" + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class RequestContext: + query: str + params: dict + session: ydb.Session + timeout: int + + +def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout): + start_time = time.time() + retry_setting = ydb.RetrySettings(max_session_acquire_timeout=timeout) + + with ydb.SessionPool(driver) as pool: + while time.time() - start_time < runtime: + object_id = randint(1, max_id) + with limiter, metrics.measure(JOB_READ_LABEL): + + def transaction(session): + session.transaction().execute( + query, + {"$object_id": object_id}, + commit_tx=True, + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + pool.retry_operation_sync(transaction, retry_settings=retry_setting) + + +def run_read_jobs(args, driver, tb_name, max_id, metrics): + session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) + read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name)) + + read_limiter = RateLimiter(max_calls=args.read_rps, period=1) + futures = [] + for _ in range(args.read_threads): + future = threading.Thread( + name="slo_run_read", + target=run_reads, + args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), + ) + future.start() + futures.append(future) + return futures + + +def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): + start_time = time.time() + with ydb.SessionPool(driver) as pool: + while time.time() - start_time < runtime: + row = row_generator.get() + with limiter, metrics.measure(JOB_WRITE_LABEL): + + def transaction(session): + session.transaction().execute( + query, + { + "$object_id": row.object_id, + "$payload_str": row.payload_str, + "$payload_double": row.payload_double, + "$payload_timestamp": row.payload_timestamp, + }, + commit_tx=True, + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + + pool.retry_operation_sync(transaction) + + +def run_write_jobs(args, driver, tb_name, max_id, metrics): + session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) + write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name)) + + write_limiter = RateLimiter(max_calls=args.write_rps, period=1) + row_generator = RowGenerator(max_id) + + futures = [] + for _ in range(args.write_threads): + future = threading.Thread( + name="slo_run_write", + target=run_writes, + args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), + ) + future.start() + futures.append(future) + return futures + + +def push_metric(limiter, runtime, metrics): + start_time = time.time() + while time.time() - start_time < runtime: + with limiter: + metrics.push() + + +def run_metric_job(args, metrics): + limiter = RateLimiter(max_calls=10**6 // args.report_period, period=1) + future = threading.Thread( + name="slo_run_metrics", + target=push_metric, + args=(limiter, args.time, metrics), + ) + future.start() + return future diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py new file mode 100644 index 00000000..0d09ec61 --- /dev/null +++ b/tests/slo/src/metrics.py @@ -0,0 +1,115 @@ +import time +from contextlib import contextmanager +from importlib.metadata import version + +from os import environ + +environ["PROMETHEUS_DISABLE_CREATED_SERIES"] = 
"True" + +from prometheus_client import CollectorRegistry, Gauge, Histogram, push_to_gateway # noqa: E402 +from summary import Summary # noqa: E402 + +JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write" +JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" + + +class Metrics: + def __init__(self, push_gateway): + self._push_gtw = push_gateway + self._registry = CollectorRegistry() + self._metrics = dict( + oks=Gauge( + "oks", + "amount of OK requests", + labelnames=("jobName",), + registry=self._registry, + ), + not_oks=Gauge( + "not_oks", + "amount of not OK requests", + labelnames=("jobName",), + registry=self._registry, + ), + inflight=Gauge( + "inflight", + "amount of requests in flight", + labelnames=("jobName",), + registry=self._registry, + ), + latency=Summary( + "latency", + "summary of latencies in ms", + labelnames=("jobName", "status"), + registry=self._registry, + objectives=( + (0.5, 0.01), + (0.99, 0.001), + (1.0, 0.0), + ), + ), + attempts=Histogram( + "attempts", + "histogram of amount of requests", + labelnames=("jobName", "status"), + registry=self._registry, + buckets=tuple(range(1, 11)), + ), + ) + self.reset() + + def __getattr__(self, item): + try: + return self._metrics[item] + except KeyError: + raise AttributeError(f"'{Metrics.__name__}' object has no attribute '{item}'") + + @contextmanager + def measure(self, labels): + start_ts = self.start(labels) + error = None + try: + yield self + except Exception as err: + error = err + finally: + self.stop(labels, start_ts, error=error) + + def start(self, labels): + self.inflight.labels(labels).inc() + return time.time() + + def stop(self, labels, start_time, attempts=1, error=None): + runtime_ms = 1000 * (time.time() - start_time) + + self.inflight.labels(labels).dec() + + if error: + self.not_oks.labels(labels).inc() + self.latency.labels(labels, JOB_STATUS_ERR).observe(runtime_ms) + return + + self.oks.labels(labels).inc() + self.latency.labels(labels, JOB_STATUS_OK).observe(runtime_ms) + self.attempts.labels(labels, JOB_STATUS_OK).observe(attempts) + + def push(self): + push_to_gateway( + self._push_gtw, + job="workload-sync", + registry=self._registry, + grouping_key={ + "sdk": "python-sync", + "sdkVersion": version("ydb"), + }, + ) + + def reset(self): + for label in (JOB_READ_LABEL, JOB_WRITE_LABEL): + self.oks.labels(label).set(0) + self.not_oks.labels(label).set(0) + self.inflight.labels(label).set(0) + + self.latency.clear() + self.attempts.clear() + + self.push() diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py new file mode 100644 index 00000000..8b709ba4 --- /dev/null +++ b/tests/slo/src/options.py @@ -0,0 +1,79 @@ +import argparse + + +def add_common_options(parser): + parser.add_argument("endpoint", help="YDB endpoint") + parser.add_argument("db", help="YDB database name") + parser.add_argument("-t", "--table-name", default="key_value", help="Table name") + + +def make_create_parser(subparsers): + create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content") + add_common_options(create_parser) + + create_parser.add_argument( + "-p-min", "--min-partitions-count", default=6, type=int, help="Minimum amount of partitions in table" + ) + create_parser.add_argument( + "-p-max", "--max-partitions-count", default=1000, type=int, help="Maximum amount of partitions in table" + ) + create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") + create_parser.add_argument( + "-c", "--initial-data-count", default=1000, type=int, 
help="Total number of records to generate" + ) + + create_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + + create_parser.add_argument( + "--batch-size", default=100, type=int, help="Number of new records in each create request" + ) + create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") + + +def make_run_parser(subparsers, name="run"): + run_parser = subparsers.add_parser(name, help="Run measurable workload") + add_common_options(run_parser) + + run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") + run_parser.add_argument("--read-timeout", default=70, type=int, help="Read requests execution timeout [ms]") + + run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") + run_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + + run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") + run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") + + run_parser.add_argument("--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway") + run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") + + run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") + run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") + + +def make_cleanup_parser(subparsers): + cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables") + add_common_options(cleanup_parser) + + +def get_root_parser(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="YDB Python SLO application", + ) + + subparsers = parser.add_subparsers( + title="subcommands", + dest="subcommand", + help="List of subcommands", + ) + + make_create_parser(subparsers) + make_run_parser(subparsers) + make_cleanup_parser(subparsers) + + return parser + + +def parse_options(): + parser = get_root_parser() + return parser.parse_args() diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py new file mode 100644 index 00000000..e63c8a56 --- /dev/null +++ b/tests/slo/src/runner.py @@ -0,0 +1,125 @@ +import ydb +import logging + +from os import path +from generator import batch_generator + +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor + +from jobs import run_read_jobs, run_write_jobs, run_metric_job +from metrics import Metrics + +logger = logging.getLogger(__name__) + + +INSERT_ROWS_TEMPLATE = """ +DECLARE $items AS List>; +UPSERT INTO `{}` +SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp +FROM AS_TABLE($items); +""" + + +def insert_rows(pool, prepared, data, timeout): + def transaction(session: ydb.table.Session): + tx = session.transaction() + tx.execute(prepared, {"$items": data}, settings=ydb.BaseRequestSettings().with_timeout(timeout)) + tx.commit() + + pool.retry_operation_sync(transaction) + logger.info("Insert %s rows", len(data)) + + +def run_create(args, driver, tb_name): + def create_table(session): + session.create_table( + tb_name, + ydb.TableDescription() + .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("object_id", 
ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) + .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) + .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) + .with_primary_keys("object_hash", "object_id") + .with_uniform_partitions(args.min_partitions_count) + .with_partitioning_settings( + ydb.PartitioningSettings() + .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) + .with_min_partitions_count(args.min_partitions_count) + .with_max_partitions_count(args.max_partitions_count) + .with_partition_size_mb(args.partition_size) + ), + ) + + return session.prepare(INSERT_ROWS_TEMPLATE.format(tb_name)) + + with ydb.SessionPool(driver) as pool: + prepared = pool.retry_operation_sync(create_table) + + futures = set() + with ThreadPoolExecutor(max_workers=args.threads, thread_name_prefix="slo_create") as executor: + for batch in batch_generator(args): + futures.add(executor.submit(insert_rows, pool, prepared, batch, args.write_timeout / 1000)) + for f in concurrent.futures.as_completed(futures): + f.result() + + +def run_slo(args, driver, tb_name): + session = driver.table_client.session().create() + result = session.transaction().execute( + "SELECT MAX(`object_id`) as max_id FROM `{}`".format(tb_name), + commit_tx=True, + ) + max_id = result[0].rows[0]["max_id"] + logger.info("Max ID: %s", max_id) + + metrics = Metrics(args.prom_pgw) + + futures = ( + *run_read_jobs(args, driver, tb_name, max_id, metrics), + *run_write_jobs(args, driver, tb_name, max_id, metrics), + run_metric_job(args, metrics), + ) + + for future in futures: + future.join() + + metrics.reset() + + +def run_cleanup(args, driver, tb_name): + session = driver.table_client.session().create() + session.drop_table(tb_name) + + +def run_from_args(args): + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + credentials=ydb.credentials_from_env_variables(), + grpc_keep_alive_timeout=5000, + ) + + table_name = path.join(args.db, args.table_name) + + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=5) + try: + if args.subcommand == "create": + run_create(args, driver, table_name) + elif args.subcommand == "run": + run_slo(args, driver, table_name) + elif args.subcommand == "cleanup": + run_cleanup(args, driver, table_name) + else: + raise RuntimeError(f"Unknown command {args.subcommand}") + except BaseException: + logger.exception("Something went wrong") + raise + finally: + driver.stop() diff --git a/tests/slo/src/summary.py b/tests/slo/src/summary.py new file mode 100644 index 00000000..47c29657 --- /dev/null +++ b/tests/slo/src/summary.py @@ -0,0 +1,64 @@ +import time +from typing import Iterable, Optional, Sequence, Tuple +from threading import Lock + +from prometheus_client import Summary as _Summary, CollectorRegistry, REGISTRY +from prometheus_client.utils import floatToGoString +from prometheus_client.samples import Sample + +from quantile_estimator import TimeWindowEstimator + + +class Summary(_Summary): + DEFAULT_OBJECTIVES = ( + (0.5, 0.01), + (0.99, 0.001), + (1.0, 0.0001), + ) + + def __init__( + self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = "", + subsystem: str = "", + unit: str = "", + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + objectives: Sequence[Tuple[float, float]] = DEFAULT_OBJECTIVES, + ): + self._objectives = objectives + 
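+        # Objectives are stored before calling the base constructor because
+        # prometheus_client may invoke _metric_init() during __init__ (for
+        # metrics created without labels), and the _metric_init() override
+        # below reads self._objectives.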
super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + unit=unit, + registry=registry, + _labelvalues=_labelvalues, + ) + self._kwargs["objectives"] = objectives + self._lock = Lock() + + def _metric_init(self) -> None: + super()._metric_init() + + self._estimator = TimeWindowEstimator(*self._objectives) + self._created = time.time() + + def observe(self, amount: float) -> None: + super().observe(amount) + with self._lock: + self._estimator.observe(amount) + + def _child_samples(self) -> Iterable[Sample]: + samples = [] + for q, _ in self._objectives: + with self._lock: + value = self._estimator.query(q) + samples.append(Sample("", {"quantile": floatToGoString(q)}, value, None, None)) + + samples.extend(super()._child_samples()) + return tuple(samples)
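
For reference, a minimal usage sketch of the new `Metrics` helper, condensed from `run_reads`/`run_writes` in `tests/slo/src/jobs.py` (the push-gateway address is a placeholder):

```python
from metrics import Metrics, JOB_READ_LABEL

metrics = Metrics("localhost:9091")  # placeholder push-gateway address

# measure() increments `inflight`, times the wrapped block, and then records
# `oks` or `not_oks` plus the `latency` summary depending on whether it raised.
with metrics.measure(JOB_READ_LABEL):
    pass  # a single read transaction goes here, as in run_reads

metrics.push()  # run_metric_job calls this periodically
```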