Skip to content

Commit

Permalink
Merge pull request #5315 from jcsp/test-kvstore-deletion
Browse files Browse the repository at this point in the history
tests: update offline_log_viewer and use in ducktape tests
  • Loading branch information
jcsp committed Dec 16, 2022
2 parents 37026d5 + 189e475 commit f12e562
Show file tree
Hide file tree
Showing 11 changed files with 712 additions and 63 deletions.
3 changes: 3 additions & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,7 @@ RUN mkdir -p /opt/scripts && \
curl https://raw.githubusercontent.com/redpanda-data/seastar/2a9504b3238cba4150be59353bf8d0b3a01fe39c/scripts/seastar-addr2line -o /opt/scripts/seastar-addr2line && \
chmod +x /opt/scripts/seastar-addr2line

RUN mkdir -p /opt/scripts/offline_log_viewer
COPY --chown=0:0 tools/offline_log_viewer /opt/scripts/offline_log_viewer

CMD service ssh start && tail -f /dev/null
3 changes: 2 additions & 1 deletion tests/docker/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
!tests/docker/ssh
!tests/setup.py
!tests/python
!tests/go
!tests/go
!tools/offline_log_viewer
41 changes: 41 additions & 0 deletions tests/rptest/clients/offline_log_viewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import json


class OfflineLogViewer:
    """
    Wrap tools/offline_log_viewer for use in tests: this is for tests that
    want to peek at the structures, but also for validating the tool itself.

    The viewer script is invoked over SSH on a node, pointed at the
    service's DATA_DIR, and its stdout is parsed as JSON.
    """
    def __init__(self, redpanda):
        """
        :param redpanda: service object; this class reads its `DATA_DIR`
                         attribute and uses its `logger`.
        """
        self._redpanda = redpanda

    def _cmd(self, suffix):
        """
        Build the viewer command line.

        :param suffix: extra arguments appended after `--path <DATA_DIR>`
        :returns: the full shell command as a string
        """
        viewer_path = "python3 /opt/scripts/offline_log_viewer/viewer.py"
        return f"{viewer_path} --path {self._redpanda.DATA_DIR} {suffix}"

    def _read_json(self, node, log_type):
        """
        Run the viewer for one `--type` on `node` and decode its output.

        :param node: object exposing `account.ssh_output(cmd, combine_stderr=...)`
        :param log_type: value passed to the viewer's `--type` flag
        :returns: the decoded JSON document
        :raises json.JSONDecodeError: if the viewer output is not valid JSON
        """
        cmd = self._cmd(f"--type {log_type}")
        # combine_stderr=False: presumably keeps stderr chatter out of the
        # captured output so that only the JSON document is parsed.
        output = node.account.ssh_output(cmd, combine_stderr=False)
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            # Log the bad output before re-raising, so that the failure can
            # be diagnosed from the test log. Fail immediately: do not stall
            # the test run here.
            self._redpanda.logger.error(f"Invalid JSON output: {output}")
            raise

    def read_kvstore(self, node):
        """
        :returns: decoded JSON output of the viewer run with `--type kvstore`
        :raises json.JSONDecodeError: if the viewer output is not valid JSON
        """
        return self._read_json(node, "kvstore")

    def read_controller(self, node):
        """
        :returns: decoded JSON output of the viewer run with `--type controller`
        :raises json.JSONDecodeError: if the viewer output is not valid JSON
        """
        return self._read_json(node, "controller")
125 changes: 108 additions & 17 deletions tests/rptest/tests/topic_delete_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,108 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from ducktape.utils.util import wait_until
import time
import json

from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from requests.exceptions import HTTPError

from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import SISettings
from rptest.util import wait_for_segments_removal
from ducktape.mark import parametrize
from rptest.services.admin import Admin


def topic_storage_purged(redpanda, topic: str):
def get_kvstore_topic_key_counts(redpanda):
    """
    Tally, per node, the kvstore keys that relate to Kafka topics.

    Keys in the "cluster" keyspace, keys whose raft group belongs to one of
    the internal partitions, and keys whose NTP topic is 'controller' are
    left out of the tally: if no Kafka topics exist, every node should
    therefore report zero.

    :returns: dict of Node to integer
    """
    internal_ntps = (
        ('redpanda', 'controller', 0),
        ('kafka_internal', 'id_allocator', 0),
    )

    viewer = OfflineLogViewer(redpanda)
    admin = Admin(redpanda)

    # Resolve the raft group IDs of the internal partitions
    internal_group_ids = set()
    for namespace, topic, partition in internal_ntps:
        try:
            partition_info = admin.get_partitions(topic=topic,
                                                  namespace=namespace,
                                                  partition=partition)
        except HTTPError as e:
            if e.response.status_code == 404:
                # An internal topic is allowed to be absent (e.g.
                # id_allocator doesn't have to exist)
                continue
            raise
        internal_group_ids.add(partition_info['raft_group_id'])

    def relates_to_kafka_topic(k):
        # Not a per-partition key
        if k['keyspace'] == "cluster":
            return False
        # One of the internal topics
        if k['data'].get('group', None) in internal_group_ids:
            return False
        # Anything that is not a controller storage item counts
        return k['data'].get('ntp', {}).get('topic', None) != 'controller'

    result = {}
    for n in redpanda.nodes:
        excess_keys = []
        for shard, items in viewer.read_kvstore(node=n).items():
            shard_keys = [i['key'] for i in items]
            excess_keys += [k for k in shard_keys if relates_to_kafka_topic(k)]

            # The list is accumulated across shards, so each message shows
            # everything found on this node so far.
            redpanda.logger.info(
                f"{n.name}.{shard} Excess Keys {json.dumps(excess_keys,indent=2)}"
            )

        result[n] = len(excess_keys)

    return result


def topic_storage_purged(redpanda, topic_name):
    """
    Check whether a deleted topic has been fully cleaned out of storage.

    :param redpanda: service object providing `storage()`; it is also
                     passed on to get_kvstore_topic_key_counts
    :param topic_name: name of the topic, looked up in each storage node's
                       "kafka" namespace
    :returns: True only if no storage node still lists the topic AND no
              node has any Kafka-topic keys left in its kvstore. Note that
              the kvstore check counts keys for all Kafka topics, not just
              `topic_name`.
    """
    storage = redpanda.storage()
    logs_removed = all(topic_name not in n.ns["kafka"].topics
                       for n in storage.nodes)

    if not logs_removed:
        return False

    # Once logs are removed, also do more expensive inspection of
    # kvstore to check that per-partition kvstore contents are
    # gone.  The user doesn't care about this, but it is important
    # to avoid bugs that would cause kvstore to bloat through
    # topic creation/destruction cycles.
    topic_key_counts = get_kvstore_topic_key_counts(redpanda)
    return not any(v > 0 for v in topic_key_counts.values())


class TopicDeleteTest(RedpandaTest):
Expand All @@ -44,7 +128,9 @@ def __init__(self, test_context):
self.kafka_tools = KafkaCliTools(self.redpanda)

@cluster(num_nodes=3)
def topic_delete_test(self):
@parametrize(with_restart=False)
@parametrize(with_restart=True)
def topic_delete_test(self, with_restart):
def produce_until_partitions():
self.kafka_tools.produce(self.topic, 1024, 1024)
storage = self.redpanda.storage()
Expand All @@ -55,6 +141,16 @@ def produce_until_partitions():
backoff_sec=2,
err_msg="Expected partition did not materialize")

if with_restart:
# Do a restart to encourage writes and flushes, especially to
# the kvstore.
self.redpanda.restart_nodes(self.redpanda.nodes)

# Sanity check the kvstore checks: there should be at least one kvstore entry
# per partition while the topic exists.
assert sum(get_kvstore_topic_key_counts(
self.redpanda).values()) >= self.topics[0].partition_count

self.kafka_tools.delete_topic(self.topic)

try:
Expand Down Expand Up @@ -294,17 +390,12 @@ def check_compaction():
pass
producer.free()

def topic_storage_purged():
storage = self.redpanda.storage()
return all(
map(lambda n: topic_name not in n.ns["kafka"].topics,
storage.nodes))

try:
wait_until(lambda: topic_storage_purged(),
timeout_sec=60,
backoff_sec=2,
err_msg="Topic storage was not removed")
wait_until(
lambda: topic_storage_purged(self.redpanda, topic_name),
timeout_sec=60,
backoff_sec=2,
err_msg="Topic storage was not removed")

except:
# On errors, dump listing of the storage location
Expand Down
10 changes: 10 additions & 0 deletions tests/rptest/tests/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.end_to_end import EndToEndTest
Expand Down Expand Up @@ -259,6 +260,15 @@ def _consumer_offsets_present():
assert self._rand_consumer.consumer_status.validator.total_reads >= self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL
assert self._cg_consumer.consumer_status.validator.valid_reads >= wrote_at_least

# Validate that the data structures written by a mixture of historical
# versions remain readable by our current debug tools
log_viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.nodes:
controller_records = log_viewer.read_controller(node=node)
self.logger.info(
f"Read {len(controller_records)} controller records from node {node.name} successfully"
)


class UpgradeWithWorkloadTest(EndToEndTest):
"""
Expand Down
Loading

0 comments on commit f12e562

Please sign in to comment.