Skip to content

Commit

Permalink
Merge pull request #5812 from VladLazar/fix-5390
Browse files Browse the repository at this point in the history
test: use snapshots for detecting segment removal
  • Loading branch information
Vlad Lazar committed Aug 31, 2022
2 parents 09b9533 + 3f1cb7f commit befdb01
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 11 deletions.
2 changes: 1 addition & 1 deletion tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ def safe_isdir(path):

return [p[0] for p in paths if safe_isdir(p[1])]

store = NodeStorage(RedpandaService.DATA_DIR)
store = NodeStorage(node.name, RedpandaService.DATA_DIR)
for ns in listdir(store.data_dir, True):
if ns == '.coprocessor_offset_checkpoints':
continue
Expand Down
26 changes: 25 additions & 1 deletion tests/rptest/services/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import re
import itertools
from typing import Optional


class Segment:
Expand Down Expand Up @@ -117,10 +118,15 @@ def __repr__(self):
return self.name


class PartitionNotFoundError(Exception):
pass


class NodeStorage:
def __init__(self, data_dir):
def __init__(self, name, data_dir):
self.data_dir = data_dir
self.ns = dict()
self.name = name

def add_namespace(self, ns, path):
n = Namespace(ns, path)
Expand All @@ -134,6 +140,17 @@ def partitions(self, ns, topic):
return [p[1] for p in parts.items()]
return []

def segments(self, ns: str, topic: str,
partition_idx: int) -> Optional[list[Segment]]:
partitions = self.partitions(ns, topic)
if len(partitions) <= partition_idx:
# Segments for unkown partition requested
raise PartitionNotFoundError(
f"Partition {partition_idx} of topic {topic} is not present on node {self.name}"
)

return partitions[partition_idx].segments.values()


class ClusterStorage:
def __init__(self):
Expand All @@ -145,3 +162,10 @@ def add_node(self, node_storage):
def partitions(self, ns, topic):
return itertools.chain(
*map(lambda n: n.partitions(ns, topic), self.nodes))

def segments_by_node(self, ns: str, topic: str,
partition_idx: int) -> dict[str, list[Segment]]:
return {
node.name: node.segments(ns, topic, partition_idx)
for node in self.nodes
}
43 changes: 34 additions & 9 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from rptest.util import Scale
from rptest.util import (
produce_until_segments,
wait_for_segments_removal,
wait_for_removal_of_n_segments,
)


Expand Down Expand Up @@ -91,16 +91,29 @@ def test_write(self):
count=10,
)

# Get a snapshot of the current segments, before tightening the
# retention policy.
original_snapshot = self.redpanda.storage(
all_nodes=True).segments_by_node("kafka", self.topic, 0)

for node, node_segments in original_snapshot.items():
assert len(
node_segments
) >= 10, f"Expected at least 10 segments, but got {len(node_segments)} on {node}"

self.kafka_tools.alter_topic_config(
self.topic,
{
TopicSpec.PROPERTY_RETENTION_BYTES: 5 * self.segment_size,
},
)
wait_for_segments_removal(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
count=6)

wait_for_removal_of_n_segments(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
n=6,
original_snapshot=original_snapshot)

self.start_consumer()
self.run_validation()

Expand All @@ -124,17 +137,29 @@ def test_write_with_node_failures(self):
count=10,
)

# Get a snapshot of the current segments, before tightening the
# retention policy.
original_snapshot = self.redpanda.storage(
all_nodes=True).segments_by_node("kafka", self.topic, 0)

for node, node_segments in original_snapshot.items():
assert len(
node_segments
) >= 10, f"Expected at least 10 segments, but got {len(node_segments)} on {node}"

self.kafka_tools.alter_topic_config(
self.topic,
{TopicSpec.PROPERTY_RETENTION_BYTES: 5 * self.segment_size},
)

assert self.redpanda
with random_process_kills(self.redpanda) as ctx:
wait_for_segments_removal(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
count=6)
wait_for_removal_of_n_segments(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
n=6,
original_snapshot=original_snapshot)

self.start_consumer()
self.run_validation()
ctx.assert_actions_triggered()
Expand Down
45 changes: 45 additions & 0 deletions tests/rptest/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from requests.exceptions import HTTPError

from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.services.storage import Segment

from ducktape.errors import TimeoutError
import time

Expand Down Expand Up @@ -141,6 +143,49 @@ def done():
err_msg="Segments were not created")


def wait_for_removal_of_n_segments(redpanda, topic: str, partition_idx: int,
n: int,
original_snapshot: dict[str,
list[Segment]]):
"""
Wait until 'n' segments of a partition that are present in the
provided snapshot are removed by all brokers.
:param redpanda: redpanda service used by the test
:param topic: topic to wait on
:param partition_idx: index of partition to wait on
:param n: number of removed segments to wait for
:param original_snapshot: snapshot of segments to compare against
"""
def segments_removed():
current_snapshot = redpanda.storage(all_nodes=True).segments_by_node(
"kafka", topic, 0)

redpanda.logger.debug(
f"Current segment snapshot for topic {topic}: {current_snapshot}")

# Check how many of the original segments were removed
# for each of the nodes in the provided snapshot.
for node, original_segs in original_snapshot.items():
assert node in current_snapshot
current_segs_names = [s.name for s in current_snapshot[node]]

removed_segments = 0
for s in original_segs:
if s.name not in current_segs_names:
removed_segments += 1

if removed_segments < n:
return False

return True

wait_until(segments_removed,
timeout_sec=180,
backoff_sec=5,
err_msg="Segments were not removed from all nodes")


def wait_for_segments_removal(redpanda, topic, partition_idx, count):
"""
Wait until only given number of segments will left in a partitions
Expand Down

0 comments on commit befdb01

Please sign in to comment.