tests: validate kvstore cleanup in topic_delete_test
Partitions have various kvstore items, stored by
the raft + storage layers.  When the topic is
deleted, they should be deleted too.

Use our offline log decoder tool to inspect
the kvstore out-of-band and validate that the keys
go away when the topic is deleted.
jcsp committed Jul 4, 2022
1 parent a30d5d3 commit 959c779
Showing 1 changed file with 90 additions and 21 deletions: tests/rptest/tests/topic_delete_test.py
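
The helper added below (get_kvstore_topic_key_counts) shells out to the offline log viewer over SSH on every node and filters the decoded kvstore keys. For orientation, here is a rough sketch of the JSON shape it expects back, written as a Python literal; the field names are inferred from the parsing code in this diff, so treat the exact layout of viewer.py's real output as an assumption:

    # Hypothetical example of one node's kvstore dump, as the new helper parses it.
    example_kvstore_dump = {
        "0": [  # top-level keys are shard IDs
            {
                "key": {
                    # Keys in the "cluster" keyspace are not per-partition and are skipped.
                    "keyspace": "storage",  # example value; the test only special-cases "cluster"
                    "data": {
                        # Partition identity; topic == "controller" is skipped as controller storage.
                        "ntp": {"topic": "my_topic"},
                        # Raft group ID; groups 0, 1 and 2 are treated as internal topics.
                        "group": 17,
                    },
                },
            },
        ],
    }

Anything that survives those filters is counted as a leftover per-topic key, and the new topic_storage_purged helper requires that count to be zero on every node before it declares the topic's storage gone.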
@@ -9,9 +9,8 @@

from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until

import time
import re
from ducktape.mark import parametrize
import json

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
@@ -20,6 +19,75 @@
from rptest.services.metrics_check import MetricCheck


def get_kvstore_topic_key_counts(redpanda):
    """
    Count the keys in KVStore that relate to Kafka topics: this excludes all
    internal topic items: if no Kafka topics exist, this should be zero for
    all nodes.

    :returns: dict of Node to integer
    """

    viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
    viewer_cmd = f"{viewer_path} --type kvstore --path {redpanda.DATA_DIR}"
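    # The viewer emits one JSON document per node: a map of shard ID to the
    # list of decoded kvstore items on that shard.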

    result = {}
    for n in redpanda.nodes:
        kvstore_json = n.account.ssh_output(viewer_cmd, combine_stderr=False)
        assert len(kvstore_json) > 10

        kvstore_data = json.loads(kvstore_json)

        excess_keys = []
        for shard, items in kvstore_data.items():
            keys = [i['key'] for i in items]

            for k in keys:
                if k['keyspace'] == "cluster":
                    # Not a per-partition key
                    continue

                if k['data'].get('group', None) in [0, 1, 2]:
                    # One of the internal topics
                    continue

                if k['data'].get('ntp', {}).get('topic', None) == 'controller':
                    # Controller storage item
                    continue

                excess_keys.append(k)

            redpanda.logger.info(
                f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}"
            )

        key_count = len(excess_keys)
        result[n] = key_count

    return result


def topic_storage_purged(redpanda, topic_name):
    storage = redpanda.storage()
    logs_removed = all(
        map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes))

    if not logs_removed:
        return False

    # Once logs are removed, also do more expensive inspection of
    # kvstore to check that per-partition kvstore contents are
    # gone. The user doesn't care about this, but it is important
    # to avoid bugs that would cause kvstore to bloat through
    # topic creation/destruction cycles.

    topic_key_counts = get_kvstore_topic_key_counts(redpanda)
    if any([v > 0 for v in topic_key_counts.values()]):
        return False

    return True
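
# topic_storage_purged() is written as a predicate to be polled; the test
# changes further down in this diff drive it via ducktape's wait_until, e.g.
# (the topic name here is a placeholder):
#
#   wait_until(lambda: topic_storage_purged(redpanda, "my_topic"),
#              timeout_sec=30,
#              backoff_sec=2,
#              err_msg="Topic storage was not removed")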


class TopicDeleteTest(RedpandaTest):
    """
    Verify that topic deletion cleans up storage.
@@ -37,7 +105,9 @@ def __init__(self, test_context):
        self.kafka_tools = KafkaCliTools(self.redpanda)

    @cluster(num_nodes=3)
    def topic_delete_test(self):
    @parametrize(with_restart=False)
    @parametrize(with_restart=True)
    def topic_delete_test(self, with_restart):
        def produce_until_partitions():
            self.kafka_tools.produce(self.topic, 1024, 1024)
            storage = self.redpanda.storage()
@@ -48,16 +118,20 @@ def produce_until_partitions():
                   backoff_sec=2,
                   err_msg="Expected partition did not materialize")

        self.kafka_tools.delete_topic(self.topic)
        if with_restart:
            # Do a restart to encourage writes and flushes, especially to
            # the kvstore.
            self.redpanda.restart_nodes(self.redpanda.nodes)

        def topic_storage_purged():
            storage = self.redpanda.storage()
            return all(
                map(lambda n: self.topic not in n.ns["kafka"].topics,
                    storage.nodes))
        # Sanity check the kvstore checks: there should be at least one kvstore entry
        # per partition while the topic exists.
        assert sum(get_kvstore_topic_key_counts(
            self.redpanda).values()) >= self.topics[0].partition_count

        self.kafka_tools.delete_topic(self.topic)

        try:
            wait_until(lambda: topic_storage_purged(),
            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
                       timeout_sec=30,
                       backoff_sec=2,
                       err_msg="Topic storage was not removed")
@@ -136,17 +210,12 @@ def check_compaction():
            pass
        producer.free()

        def topic_storage_purged():
            storage = self.redpanda.storage()
            return all(
                map(lambda n: topic_name not in n.ns["kafka"].topics,
                    storage.nodes))

        try:
            wait_until(lambda: topic_storage_purged(),
                       timeout_sec=60,
                       backoff_sec=2,
                       err_msg="Topic storage was not removed")
            wait_until(
                lambda: topic_storage_purged(self.redpanda, topic_name),
                timeout_sec=60,
                backoff_sec=2,
                err_msg="Topic storage was not removed")

        except:
            # On errors, dump listing of the storage location
