Merge pull request redpanda-data#7460 from jcsp/issue-7433-test
tests: add topic_delete_unavailable_test, tweak tiered storage topic deletion order
jcsp committed Jan 4, 2023
2 parents 6199646 + e2d5652 commit 1a3b9c0
Showing 3 changed files with 129 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/partition_manager.cc
@@ -286,14 +286,14 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
.then([this, ntp] { _unmanage_watchers.notify(ntp, ntp.tp.partition); })
.then([partition] { return partition->stop(); })
.then([partition] { return partition->remove_persistent_state(); })
.then([this, ntp] { return _storage.log_mgr().remove(ntp); })
.then([partition, mode] {
if (mode == partition_removal_mode::global) {
return partition->remove_remote_persistent_state();
} else {
return ss::now();
}
})
.then([this, ntp] { return _storage.log_mgr().remove(ntp); })
.finally([partition] {}); // in the end remove partition
}

29 changes: 27 additions & 2 deletions tests/rptest/archival/s3_client.py
@@ -1,11 +1,12 @@
import boto3
import re
from botocore.config import Config
from botocore.exceptions import ClientError
from time import sleep
from functools import wraps
import time
import datetime
from typing import Iterator, NamedTuple, Union
from typing import Iterator, NamedTuple, Union, Optional


class SlowDown(Exception):
@@ -289,7 +290,24 @@ def _list_objects(self, bucket, token=None, limit=1000):
else:
raise

def list_objects(self, bucket) -> Iterator[S3ObjectMetadata]:
def _key_to_topic(self, key: str):
# Segment objects: <hash>/<ns>/<topic>/<partition>_<revision>/...
# Manifest objects: <hash>/meta/<ns>/<topic>/<partition>_<revision>/...
# Topic manifest objects: <hash>/meta/<ns>/<topic>/topic_manifest.json
m = re.search(".+/(.+)/(.+)/(\d+_\d+/|topic_manifest.json)", key)
if m is None:
return None
else:
return m.group(2)

def list_objects(
self,
bucket,
topic: Optional[str] = None) -> Iterator[S3ObjectMetadata]:
"""
:param bucket: S3 bucket name
:param topic: Optional, if set then only return objects belonging to this topic
"""
token = None
truncated = True
while truncated:
@@ -298,6 +316,13 @@ def list_objects(self, bucket) -> Iterator[S3ObjectMetadata]:
truncated = bool(res['IsTruncated'])
if 'Contents' in res:
for item in res['Contents']:

# Apply optional topic filtering
if topic is not None and self._key_to_topic(
item['Key']) != topic:
self.logger.debug(f"Skip {item['Key']} for {topic}")
continue

yield S3ObjectMetadata(Bucket=bucket,
Key=item['Key'],
ETag=item['ETag'][1:-1],
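
To make the new topic filter concrete, here is a small self-contained sketch (hypothetical key values and a local copy of the pattern used by _key_to_topic above) showing which topic name is extracted from each of the three documented key layouts:

import re

# Local copy of the pattern from _key_to_topic; the keys below are made-up
# examples that follow the layouts documented in the code comments above.
KEY_TO_TOPIC = re.compile(r".+/(.+)/(.+)/(\d+_\d+/|topic_manifest.json)")

def key_to_topic(key: str):
    m = KEY_TO_TOPIC.search(key)
    return None if m is None else m.group(2)

# Segment object: <hash>/<ns>/<topic>/<partition>_<revision>/...
assert key_to_topic("390fd297/kafka/mytopic/0_24/500-1-v1.log") == "mytopic"
# Partition manifest object: <hash>/meta/<ns>/<topic>/<partition>_<revision>/...
assert key_to_topic("a1b2c3d4/meta/kafka/mytopic/0_24/manifest.json") == "mytopic"
# Topic manifest object: <hash>/meta/<ns>/<topic>/topic_manifest.json
assert key_to_topic("a1b2c3d4/meta/kafka/mytopic/topic_manifest.json") == "mytopic"
# Keys that match none of these layouts yield None, so they are skipped
# whenever list_objects(bucket, topic=...) is called with a topic filter.
assert key_to_topic("not/a/partition/key") is None
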
135 changes: 101 additions & 34 deletions tests/rptest/tests/topic_delete_test.py
@@ -22,7 +22,7 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import SISettings
from rptest.util import wait_for_segments_removal
from rptest.util import wait_for_segments_removal, firewall_blocked
from rptest.services.admin import Admin


@@ -171,7 +171,8 @@ def produce_until_partitions():


class TopicDeleteCloudStorageTest(RedpandaTest):
topics = (TopicSpec(partition_count=3,
partition_count = 3
topics = (TopicSpec(partition_count=partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE), )

def __init__(self, test_context):
@@ -183,30 +184,113 @@ def __init__(self, test_context):
num_brokers=test_context.cluster.available().size(),
si_settings=self.si_settings)

self._s3_port = self.si_settings.cloud_storage_api_endpoint_port

self.kafka_tools = KafkaCliTools(self.redpanda)

def _populate_topic(self):
def _populate_topic(self, topic_name):
"""
Get system into state where there is data in both local
and remote storage for the topic.
"""
# Set retention to 5MB
self.kafka_tools.alter_topic_config(
self.topic, {'retention.local.target.bytes': 5 * 1024 * 1024})
topic_name, {'retention.local.target.bytes': 5 * 1024 * 1024})

# Write out 10MB
self.kafka_tools.produce(self.topic,
# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560)
num_records=2560 * self.partition_count)

# Wait for segments evicted from local storage
for i in range(0, 3):
wait_for_segments_removal(self.redpanda, self.topic, i, 5)
for i in range(0, self.partition_count):
wait_for_segments_removal(self.redpanda, topic_name, i, 5)

# Confirm objects in remote storage
before_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket)
assert sum(1 for _ in before_objects) > 0
objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)
assert sum(1 for _ in objects) > 0

@cluster(
num_nodes=3,
log_allow_list=[
'exception while executing partition operation: {type: deletion'
])
def topic_delete_unavailable_test(self):
"""
Test deleting while the S3 backend is unavailable: we should see
that local deletion proceeds, and remote deletion eventually
gives up.
"""
self._populate_topic(self.topic)
objects_before = set(
self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=self.topic))
assert len(objects_before) > 0

with firewall_blocked(self.redpanda.nodes, self._s3_port):
self.kafka_tools.delete_topic(self.topic)

# From user's point of view, deletion succeeds
assert self.topic not in self.kafka_tools.list_topics()

# Local storage deletion should proceed even if remote can't
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
timeout_sec=30,
backoff_sec=1)

# Erase timeout is hardcoded 60 seconds, wait long enough
# for it to give up.
time.sleep(90)

# Confirm our firewall block is really working, nothing was deleted
objects_after = set(
self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket))
assert len(objects_after) >= len(objects_before)

# Check that after the controller backend experiences errors trying
# to execute partition deletion, it is still happily able to execute
# other operations on unrelated topics, i.e. has not stalled applying.
next_topic = "next_topic"
self.kafka_tools.create_topic(
TopicSpec(name=next_topic,
partition_count=self.partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE))
self._populate_topic(next_topic)
after_keys = set(o.Key for o in self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=next_topic))
assert len(after_keys) > 0

self.kafka_tools.delete_topic(next_topic)
wait_until(lambda: topic_storage_purged(self.redpanda, next_topic),
timeout_sec=30,
backoff_sec=1)

wait_until(lambda: self._topic_remote_deleted(next_topic),
timeout_sec=30,
backoff_sec=1)

# The controller gave up on deleting the original topic, objects
# are left behind in the object store. This condition can be updated
# if we ever implement a mechanism for automatically GCing objects after
# a drop in the object storage backend.
final_objects = set(
self.s3_client.list_objects(self.si_settings.cloud_storage_bucket,
topic=self.topic))
assert len(final_objects) >= len(objects_before)

def _topic_remote_deleted(self, topic_name: str):
"""Return true if all objects removed from cloud storage"""
after_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)
self.logger.debug(f"Objects after topic {topic_name} deletion:")
empty = True
for i in after_objects:
self.logger.debug(f" {i}")
empty = False

return empty
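
The tests above gate local cleanup on topic_storage_purged, whose definition is not part of the hunks shown here. Purely as an illustrative sketch of the kind of check such a helper performs (not the actual implementation, and assuming a redpanda.storage() accessor that exposes per-node namespace and topic directories), it could look like this:

# Illustrative sketch only, not the real helper: assumes redpanda.storage()
# returns per-node directory listings with an ns["kafka"].topics mapping.
def topic_storage_purged(redpanda, topic_name: str) -> bool:
    storage = redpanda.storage()
    # Local deletion is complete once no node still holds a data directory
    # for any partition of the topic under the "kafka" namespace.
    return all(topic_name not in node.ns["kafka"].topics
               for node in storage.nodes)

The unavailability test polls this check with a 30 second timeout while S3 is blocked, so local deletion has to finish without waiting on the remote erase attempt (which, per the comment in the test, has a hardcoded 60 second timeout).
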

@cluster(num_nodes=3)
@parametrize(disable_delete=False)
@@ -218,7 +302,7 @@ def topic_delete_cloud_storage_test(self, disable_delete):
self.kafka_tools.alter_topic_config(
self.topic, {'redpanda.remote.delete': 'false'})

self._populate_topic()
self._populate_topic(self.topic)

objects_before = set(
self.redpanda.s3_client.list_objects(
@@ -232,18 +316,6 @@ def topic_delete_cloud_storage_test(self, disable_delete):
timeout_sec=30,
backoff_sec=1)

def remote_empty():
"""Return true if all objects removed from cloud storage"""
after_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket)
self.logger.debug("Objects after topic deletion:")
empty = True
for i in after_objects:
self.logger.debug(f" {i}")
empty = False

return empty

if disable_delete:
# Unfortunately there is no alternative to sleeping here:
# we need to confirm not only that objects aren't deleted
@@ -260,7 +332,9 @@ def remote_empty():
else:
# The counter-test that deletion _doesn't_ happen in read replicas
# is done as part of read_replica_e2e_test
wait_until(remote_empty, timeout_sec=30, backoff_sec=1)
wait_until(lambda: self._topic_remote_deleted(self.topic),
timeout_sec=30,
backoff_sec=1)

# TODO: include transactional data so that we verify that .txrange
# objects are deleted.
@@ -269,13 +343,6 @@ def remote_empty():
# catch the case where there are segments in S3 not reflected in the
# manifest.

# TODO: test making the S3 backend unavailable during the topic
# delete. The delete action should be acked, but internally
# redpanda should keep retrying the S3 part until it succeeds.
- When we bring the S3 backend back it should succeed
# - If we restart redpanda before bringing the S3 backend back
# it should also succeed.

@cluster(num_nodes=4)
def partition_movement_test(self):
"""
@@ -290,7 +357,7 @@ def partition_movement_test(self):

admin = Admin(self.redpanda)

self._populate_topic()
self._populate_topic(self.topic)

objects_before = set(o.Key
for o in self.redpanda.s3_client.list_objects(
