
Commit

slo: add create/cleanup
Valeriya Popova committed Jun 8, 2023
1 parent 2956229 commit 40fb5a4
Showing 6 changed files with 229 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -1,4 +1,4 @@
-version: "3.9"
+version: "3.3"
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:latest
Empty file added tests/slo/__init__.py
10 changes: 10 additions & 0 deletions tests/slo/__main__.py
@@ -0,0 +1,10 @@
import gc

from options import parse_options
from runner import run_from_args


if __name__ == "__main__":
args = parse_options()
gc.disable()
run_from_args(args)
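
As a usage sketch (not part of this commit), the package directory can be executed directly; the endpoint and database below are placeholder values for a local YDB instance:

python tests/slo create grpc://localhost:2136 /local --initial-data-count 1000
python tests/slo cleanup grpc://localhost:2136 /local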
83 changes: 83 additions & 0 deletions tests/slo/generator.py
@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
import dataclasses
import logging
import random
import string
from datetime import datetime

logger = logging.getLogger(__name__)


MaxUi32 = 2**32 - 1


def hash_ui32(value):
return abs(hash(str(value))) % MaxUi32


def generate_random_string(min_len, max_len):
strlen = random.randint(min_len, max_len)
return "".join(random.choices(string.ascii_lowercase, k=strlen))


@dataclasses.dataclass(slots=True)
class Row:
object_id_key: int
object_id: int
payload_str: str
payload_double: float
payload_timestamp: datetime

    # Index of the shard this row's key hashes into (key space split uniformly)
    def get_shard_id(self, partitions_count):
        shard_size = int((MaxUi32 + 1) / partitions_count)
        return self.object_id_key // shard_size


@dataclasses.dataclass
class RowGenerator:
id_counter: int = 0

def get(self):
self.id_counter += 1
if self.id_counter >= MaxUi32:
self.id_counter = 0
logger.warning("RowGenerator: maxint reached")

return Row(
object_id_key=hash_ui32(self.id_counter),
object_id=self.id_counter,
payload_str=generate_random_string(20, 40),
payload_double=random.random(),
payload_timestamp=datetime.now(),
)


class PackGenerator:
def __init__(self, args, start_id=0):
self._row_generator = RowGenerator(start_id)

self._remain = args.initial_data_count
self._pack_size = args.pack_size
self._partitions_count = args.partitions_count

self._packs = {}

def get_next_pack(self):
while self._remain:
new_record = self._row_generator.get()
shard_id = new_record.get_shard_id(self._partitions_count)

self._remain -= 1

if shard_id in self._packs:
existing_pack = self._packs[shard_id]
existing_pack.append(new_record)
if len(existing_pack) >= self._pack_size:
return self._packs.pop(shard_id)
else:
self._packs[shard_id] = [new_record]

for shard_id, pack in self._packs.items():
if pack:
return self._packs.pop(shard_id)
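
A minimal sketch (not part of this commit) of how PackGenerator batches rows per shard; the args object is a stand-in carrying only the three fields the constructor reads, and its values are illustrative:

from types import SimpleNamespace

from generator import PackGenerator

# Stand-in for the parsed CLI arguments; values are illustrative only.
args = SimpleNamespace(initial_data_count=10, pack_size=4, partitions_count=2)

generator = PackGenerator(args)
while pack := generator.get_next_pack():
    # Every row in a pack hashes into the same uniform shard range.
    shard_id = pack[0].get_shard_id(args.partitions_count)
    print(f"shard {shard_id}: {len(pack)} rows")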
63 changes: 63 additions & 0 deletions tests/slo/options.py
@@ -0,0 +1,63 @@
import argparse


def add_common_options(parser):
parser.add_argument("endpoint", help="YDB endpoint")
parser.add_argument("db", help="YDB database name")
parser.add_argument("-t", "--table_name", default="key_value", help="Table name")
parser.add_argument("--write_timeout", default=20000, type=int, help="Read requests execution timeout [ms]")


def make_create_parser(subparsers):
create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content")
add_common_options(create_parser)
create_parser.add_argument("-p", "--partitions_count", default=64, type=int, help="Number of partition in table")
create_parser.add_argument(
"-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate"
)
create_parser.add_argument(
"--pack_size", default="100", type=int, help="Number of new records in each create request"
)


def make_run_parser(subparsers, name="run"):
run_parser = subparsers.add_parser(name, help="Run measurable workload")
add_common_options(run_parser)
run_parser.add_argument("--write_rps", default=10, type=int, help="Write request rps")
run_parser.add_argument("--read_rps", default=100, type=int, help="Read request rps")
run_parser.add_argument("--no_write", default=False, action="store_true")
run_parser.add_argument("--no_read", default=False, action="store_true")
run_parser.add_argument("--time", default=10, type=int, help="Time to run (Seconds)")
run_parser.add_argument("--read_timeout", default=70, type=int, help="Read requests execution timeout [ms]")
run_parser.add_argument("--save_result", default=False, action="store_true", help="Save result to file")
run_parser.add_argument("--result_file_name", default="slo_result.json", help="Result json file name")
run_parser.add_argument("--no_prepare", default=False, action="store_true", help="Do not prepare requests")


def make_cleanup_parser(subparsers):
cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables")
add_common_options(cleanup_parser)


def get_root_parser():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="YDB Python SLO application",
)

subparsers = parser.add_subparsers(
title="subcommands",
dest="subcommand",
help="List of subcommands",
)

make_create_parser(subparsers)
make_run_parser(subparsers)
make_cleanup_parser(subparsers)

return parser


def parse_options():
parser = get_root_parser()
return parser.parse_args()
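
A small sketch (not part of this commit) of what the parser yields; the endpoint and database arguments are placeholders:

from options import get_root_parser

# Parse an explicit argv instead of sys.argv; values are placeholders.
args = get_root_parser().parse_args(
    ["create", "grpc://localhost:2136", "/local", "--pack_size", "50"]
)
print(args.subcommand, args.table_name, args.partitions_count, args.pack_size)
# -> create key_value 64 50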
72 changes: 72 additions & 0 deletions tests/slo/runner.py
@@ -0,0 +1,72 @@
import ydb

from os import path
from generator import PackGenerator

from prometheus_client import Summary, Counter


def run_create(args, driver):
session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
tb_name = path.join(args.db, args.table_name)
session.create_table(
tb_name,
ydb.TableDescription()
.with_column(ydb.Column("object_id_key", ydb.OptionalType(ydb.PrimitiveType.Uint32)))
.with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint32)))
.with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
.with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double)))
.with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)))
.with_primary_keys("object_id_key", "object_id")
.with_profile(
ydb.TableProfile().with_partitioning_policy(
ydb.PartitioningPolicy().with_uniform_partitions(args.partitions_count)
)
),
)

prepare_q = """
DECLARE $items AS List<Struct<
object_id_key: Uint32,
object_id: Uint32,
payload_str: Utf8,
payload_double: Double,
payload_timestamp: Timestamp>>;
UPSERT INTO `{}` SELECT * FROM AS_TABLE($items);
"""
prepared = session.prepare(prepare_q.format(tb_name))

generator = PackGenerator(args)
while data := generator.get_next_pack():
tx = session.transaction()
tx.execute(prepared, {"$items": data})
tx.commit()


def run_cleanup(args, driver):
session = driver.table_client.session().create()
session.drop_table(path.join(args.db, args.table_name))


def run_from_args(args):
driver_config = ydb.DriverConfig(
args.endpoint,
database=args.db,
credentials=ydb.credentials_from_env_variables(),
grpc_keep_alive_timeout=5000,
)

with ydb.Driver(driver_config) as driver:
driver.wait(timeout=5)

try:
if args.subcommand == "create":
run_create(args, driver)
elif args.subcommand == "run":
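                # measurable "run" workload is not implemented in this commit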
pass
elif args.subcommand == "cleanup":
run_cleanup(args, driver)
else:
raise RuntimeError(f"Unknown command {args.subcommand}")
finally:
driver.stop()
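
The create path above commits each pack with a bare session transaction and no retry; as an illustrative variant (not what this commit does), the same upload can go through ydb.SessionPool so the SDK retries transient failures:

import ydb


def upload_packs(driver, query_text, generator):
    # Sketch only: query_text is the already-formatted UPSERT statement.
    pool = ydb.SessionPool(driver)
    try:
        def upsert_pack(session, pack):
            prepared = session.prepare(query_text)
            session.transaction().execute(prepared, {"$items": pack}, commit_tx=True)

        while pack := generator.get_next_pack():
            pool.retry_operation_sync(upsert_pack, None, pack)
    finally:
        pool.stop()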
