diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py
index 3c45abae3ce78..d82654fc216c5 100644
--- a/tests/rptest/tests/topic_delete_test.py
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -9,15 +9,105 @@
 from rptest.services.cluster import cluster
 from ducktape.utils.util import wait_until
-
-import time
-import re
+from ducktape.mark import parametrize
+from requests.exceptions import HTTPError
+import json
 from rptest.clients.types import TopicSpec
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.clients.kafka_cli_tools import KafkaCliTools
 from rptest.services.rpk_producer import RpkProducer
 from rptest.services.metrics_check import MetricCheck
+from rptest.services.admin import Admin
+
+
+def get_kvstore_topic_key_counts(redpanda):
+    """
+    Count the keys in KVStore that relate to Kafka topics: this excludes all
+    internal topic items. If no Kafka topics exist, the count should be zero
+    for all nodes.
+
+    :returns: dict of Node to integer
+    """
+
+    viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
+    viewer_cmd = f"{viewer_path} --type kvstore --path {redpanda.DATA_DIR}"
+
+    # Find the raft group IDs of internal topics
+    admin = Admin(redpanda)
+    internal_group_ids = set()
+    for ntp in [
+        ('redpanda', 'controller', 0),
+        ('kafka_internal', 'id_allocator', 0),
+    ]:
+        namespace, topic, partition = ntp
+        try:
+            p = admin.get_partitions(topic=topic,
+                                     namespace=namespace,
+                                     partition=partition)
+        except HTTPError as e:
+            # OK if internal topic doesn't exist (e.g. id_allocator
+            # doesn't have to exist)
+            if e.response.status_code != 404:
+                raise
+        else:
+            internal_group_ids.add(p['raft_group_id'])
+
+    result = {}
+    for n in redpanda.nodes:
+        kvstore_json = n.account.ssh_output(viewer_cmd, combine_stderr=False)
+        assert len(kvstore_json) > 10
+
+        kvstore_data = json.loads(kvstore_json)
+
+        excess_keys = []
+        for items in kvstore_data.values():
+            keys = [i['key'] for i in items]
+
+            for k in keys:
+                if k['keyspace'] == "cluster":
+                    # Not a per-partition key
+                    continue
+
+                if k['data'].get('group', None) in internal_group_ids:
+                    # One of the internal topics
+                    continue
+
+                if k['data'].get('ntp', {}).get('topic', None) == 'controller':
+                    # Controller storage item
+                    continue
+
+                excess_keys.append(k)
+
+        # Log once per node: excess_keys is accumulated across all shards
+        redpanda.logger.info(
+            f"{n.name} Excess Keys {json.dumps(excess_keys, indent=2)}")
+
+        result[n] = len(excess_keys)
+
+    return result
+
+
+def topic_storage_purged(redpanda, topic_name):
+    storage = redpanda.storage()
+    logs_removed = all(
+        map(lambda n: topic_name not in n.ns["kafka"].topics, storage.nodes))
+
+    if not logs_removed:
+        return False
+
+    # Once logs are removed, also do more expensive inspection of
+    # kvstore to check that per-partition kvstore contents are
+    # gone. The user doesn't care about this, but it is important
+    # to avoid bugs that would cause kvstore to bloat through
+    # topic creation/destruction cycles.
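+    # (Which keys exist per partition is an implementation detail; this
+    # check only relies on the count dropping to zero once the topic is
+    # deleted.)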
+
+    topic_key_counts = get_kvstore_topic_key_counts(redpanda)
+    if any(v > 0 for v in topic_key_counts.values()):
+        return False
+
+    return True
 
 
 class TopicDeleteTest(RedpandaTest):
@@ -37,7 +127,9 @@ def __init__(self, test_context):
         self.kafka_tools = KafkaCliTools(self.redpanda)
 
     @cluster(num_nodes=3)
-    def topic_delete_test(self):
+    @parametrize(with_restart=False)
+    @parametrize(with_restart=True)
+    def topic_delete_test(self, with_restart):
         def produce_until_partitions():
             self.kafka_tools.produce(self.topic, 1024, 1024)
             storage = self.redpanda.storage()
@@ -48,16 +140,20 @@ def produce_until_partitions():
                    backoff_sec=2,
                    err_msg="Expected partition did not materialize")
 
-        self.kafka_tools.delete_topic(self.topic)
+        if with_restart:
+            # Do a restart to encourage writes and flushes, especially to
+            # the kvstore.
+            self.redpanda.restart_nodes(self.redpanda.nodes)
 
-        def topic_storage_purged():
-            storage = self.redpanda.storage()
-            return all(
-                map(lambda n: self.topic not in n.ns["kafka"].topics,
-                    storage.nodes))
+        # Sanity-check the kvstore inspection: while the topic exists,
+        # there should be at least one kvstore entry per partition.
+        assert sum(get_kvstore_topic_key_counts(
+            self.redpanda).values()) >= self.topics[0].partition_count
+
+        self.kafka_tools.delete_topic(self.topic)
 
         try:
-            wait_until(lambda: topic_storage_purged(),
+            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
                        timeout_sec=30,
                        backoff_sec=2,
                        err_msg="Topic storage was not removed")
@@ -136,17 +232,12 @@ def check_compaction():
                 pass
         producer.free()
 
-        def topic_storage_purged():
-            storage = self.redpanda.storage()
-            return all(
-                map(lambda n: topic_name not in n.ns["kafka"].topics,
-                    storage.nodes))
-
         try:
-            wait_until(lambda: topic_storage_purged(),
-                       timeout_sec=60,
-                       backoff_sec=2,
-                       err_msg="Topic storage was not removed")
+            wait_until(
+                lambda: topic_storage_purged(self.redpanda, topic_name),
+                timeout_sec=60,
+                backoff_sec=2,
+                err_msg="Topic storage was not removed")
         except:
             # On errors, dump listing of the storage location