tests: validate kvstore cleanup in topic_delete_test
Partitions have various kvstore items, stored by
the raft + storage layers.  When the topic is
deleted, they should be deleted too.

Use our offline log decoder tool to inspect
the kvstore out-of-band and validate the keys
go away when the topic is deleted.
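
For context, this is roughly the inspection the new test helper automates: run the offline log viewer against a node's data directory and count the kvstore keys that belong to Kafka topic partitions. A minimal sketch, assuming the viewer flags used in the test below and that the tool prints a JSON map of shard to kvstore items; the data directory path here is illustrative, not something this commit defines.

    import json
    import subprocess

    DATA_DIR = "/var/lib/redpanda/data"  # illustrative; the test uses redpanda.DATA_DIR

    # Decode the kvstore out-of-band with the offline log viewer (same flags
    # as the test helper below).
    out = subprocess.check_output([
        "python3", "/opt/scripts/offline_log_viewer/viewer.py",
        "--type", "kvstore", "--path", DATA_DIR,
    ])
    kvstore = json.loads(out)  # assumed shape: {shard: [{"key": {...}}, ...]}

    # Keys outside the "cluster" keyspace are per-partition state; after all
    # Kafka topics are deleted, only internal-topic entries should remain.
    per_partition_keys = [
        item["key"] for items in kvstore.values() for item in items
        if item["key"]["keyspace"] != "cluster"
    ]
    print(f"{len(per_partition_keys)} per-partition kvstore keys")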
jcsp committed Jul 4, 2022
1 parent 0d993d2 commit 52ad639
Showing 1 changed file with 112 additions and 21 deletions.
133 changes: 112 additions & 21 deletions tests/rptest/tests/topic_delete_test.py
@@ -9,15 +9,105 @@

from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until

import time
import re
from ducktape.mark import parametrize
from requests.exceptions import HTTPError
import json

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.admin import Admin


def get_kvstore_topic_key_counts(redpanda):
"""
Count the keys in KVStore that relate to Kafka topics: this excludes all
internal topic items: if no Kafka topics exist, this should be zero for
all nodes.
:returns: dict of Node to integer
"""

viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
viewer_cmd = f"{viewer_path} --type kvstore --path {redpanda.DATA_DIR}"

# Find the raft group IDs of internal topics
admin = Admin(redpanda)
internal_group_ids = set()
for ntp in [
('redpanda', 'controller', 0),
('kafka_internal', 'id_allocator', 0),
]:
namespace, topic, partition = ntp
try:
p = admin.get_partitions(topic=topic,
namespace=namespace,
partition=partition)
except HTTPError as e:
# OK if internal topic doesn't exist (e.g. id_allocator
# doesn't have to exist)
if e.response.status_code != 404:
raise
else:
internal_group_ids.add(p['raft_group_id'])

result = {}
for n in redpanda.nodes:
kvstore_json = n.account.ssh_output(viewer_cmd, combine_stderr=False)
assert len(kvstore_json) > 10

kvstore_data = json.loads(kvstore_json)

excess_keys = []
for shard, items in kvstore_data.items():
keys = [i['key'] for i in items]

for k in keys:
if k['keyspace'] == "cluster":
# Not a per-partition key
continue

if k['data'].get('group', None) in internal_group_ids:
# One of the internal topics
continue

if k['data'].get('ntp', {}).get('topic', None) == 'controller':
# Controller storage item
continue

excess_keys.append(k)

redpanda.logger.info(
f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}"
)

key_count = len(excess_keys)
result[n] = key_count

return result


def topic_storage_purged(redpanda, topic_name):
    storage = redpanda.storage()
    logs_removed = all(
        map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes))

    if not logs_removed:
        return False

    # Once logs are removed, also do more expensive inspection of
    # kvstore to check that per-partition kvstore contents are
    # gone. The user doesn't care about this, but it is important
    # to avoid bugs that would cause kvstore to bloat through
    # topic creation/destruction cycles.

    topic_key_counts = get_kvstore_topic_key_counts(redpanda)
    if any([v > 0 for v in topic_key_counts.values()]):
        return False

    return True


class TopicDeleteTest(RedpandaTest):
@@ -37,7 +127,9 @@ def __init__(self, test_context):
        self.kafka_tools = KafkaCliTools(self.redpanda)

    @cluster(num_nodes=3)
    def topic_delete_test(self):
    @parametrize(with_restart=False)
    @parametrize(with_restart=True)
    def topic_delete_test(self, with_restart):
        def produce_until_partitions():
            self.kafka_tools.produce(self.topic, 1024, 1024)
            storage = self.redpanda.storage()
@@ -48,16 +140,20 @@ def produce_until_partitions():
                   backoff_sec=2,
                   err_msg="Expected partition did not materialize")

        self.kafka_tools.delete_topic(self.topic)
        if with_restart:
            # Do a restart to encourage writes and flushes, especially to
            # the kvstore.
            self.redpanda.restart_nodes(self.redpanda.nodes)

        def topic_storage_purged():
            storage = self.redpanda.storage()
            return all(
                map(lambda n: self.topic not in n.ns["kafka"].topics,
                    storage.nodes))
        # Sanity check the kvstore checks: there should be at least one kvstore entry
        # per partition while the topic exists.
        assert sum(get_kvstore_topic_key_counts(
            self.redpanda).values()) >= self.topics[0].partition_count

        self.kafka_tools.delete_topic(self.topic)

        try:
            wait_until(lambda: topic_storage_purged(),
            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
                       timeout_sec=30,
                       backoff_sec=2,
                       err_msg="Topic storage was not removed")
@@ -136,17 +232,12 @@ def check_compaction():
            pass
        producer.free()

        def topic_storage_purged():
            storage = self.redpanda.storage()
            return all(
                map(lambda n: topic_name not in n.ns["kafka"].topics,
                    storage.nodes))

        try:
            wait_until(lambda: topic_storage_purged(),
                       timeout_sec=60,
                       backoff_sec=2,
                       err_msg="Topic storage was not removed")
            wait_until(
                lambda: topic_storage_purged(self.redpanda, topic_name),
                timeout_sec=60,
                backoff_sec=2,
                err_msg="Topic storage was not removed")

        except:
            # On errors, dump listing of the storage location
