Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic recovery download with capped size. Download not more than retention.policy #6797

Merged
merged 2 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/cloud_storage/partition_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ partition_downloader::download_log_with_capped_size(
model::offset_delta start_delta{0};
for (auto it = offset_map.rbegin(); it != offset_map.rend(); it++) {
const auto& meta = it->second.meta;
if (total_size > max_size) {
if (total_size != 0 && total_size + meta.size_bytes > max_size) {
vlog(
_ctxlog.debug,
"Max size {} reached, skipping {}",
Expand Down
9 changes: 5 additions & 4 deletions tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from typing import NamedTuple, Optional, Callable, Sequence, Tuple

from ducktape.cluster.cluster import ClusterNode
from ducktape.mark import ok_to_fail
from ducktape.tests.test import TestContext
from ducktape.utils.util import wait_until

Expand All @@ -36,6 +35,7 @@
verify_file_layout,
gen_manifest_path,
get_on_disk_size_per_ntp,
get_expected_ntp_restored_size,
is_close_size,
EMPTY_SEGMENT_SIZE,
default_log_segment_size,
Expand Down Expand Up @@ -721,7 +721,7 @@ class SizeBasedRetention(BaseCase):
The test generates 20MB of data per ntp, than after the restart it recovers
the topic with size limit set to 10MB per ntp.
The verification takes into account individual segment size. The recovery process
should restore at least 10MB but not more than 10MB + segment size."""
should restore at not more than 10MB but not less than 10MB - oldest segment size."""
def __init__(self, s3_client, kafka_tools, rpk_client, s3_bucket, logger,
rpk_producer_maker, topics):
self.topics = topics
Expand Down Expand Up @@ -779,12 +779,14 @@ def validate_cluster(self, baseline, restored):
]

size_bytes_per_ntp = get_on_disk_size_per_ntp(restored)
expected_restored_size_per_ntp = get_expected_ntp_restored_size(
baseline, self.restored_size_bytes)

for ntp, size_bytes in size_bytes_per_ntp.items():
self.logger.info(
f"Partition {ntp} had size {size_bytes} on disk after recovery"
)
assert is_close_size(size_bytes, self.restored_size_bytes), \
assert is_close_size(size_bytes, expected_restored_size_per_ntp[ntp]), \
f"Too much or not enough data restored, expected {self.restored_size_bytes} got {size_bytes}"

for topic in self.topics:
Expand Down Expand Up @@ -1364,7 +1366,6 @@ def test_fast3(self):
self.rpk_producer_maker, topics)
self.do_run(test_case)

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/4887
@cluster(num_nodes=3, log_allow_list=TRANSIENT_ERRORS)
def test_size_based_retention(self):
"""Test topic recovery with size based retention policy.
Expand Down
37 changes: 37 additions & 0 deletions tests/rptest/utils/si_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,43 @@ def get_on_disk_size_per_ntp(chk):
return size_bytes_per_ntp


def get_expected_ntp_restored_size(nodes_segments_report: dict[str,
dict[str,
(str,
int)]],
retention_policy: int):
""" Get expected retestored ntp disk size
We expect that redpanda will restore max
amount of segments with total size less
than retention_policy
"""
size_bytes_per_ntp = {}
segments_sizes_per_ntp = {}
for _node, report in nodes_segments_report.items():
tmp_partition_size = defaultdict(int)
tmp_segments_sizes = defaultdict(dict)
for path, summary in report.items():
segment = _parse_checksum_entry(path, summary, True)
ntp = segment.ntp
size = summary[1]
tmp_partition_size[ntp] += size
tmp_segments_sizes[ntp][segment.base_offset] = size
for ntp, size in tmp_partition_size.items():
if not ntp in size_bytes_per_ntp or size_bytes_per_ntp[ntp] < size:
size_bytes_per_ntp[ntp] = size
segments_sizes_per_ntp[ntp] = tmp_segments_sizes[ntp]
expected_restored_sizes = {}
for ntp, segments in segments_sizes_per_ntp.items():
expected_restored_sizes[ntp] = 0
for segment in sorted(segments.keys(), reverse=True):
if expected_restored_sizes[ntp] + segments_sizes_per_ntp[ntp][
segment] > retention_policy:
break
expected_restored_sizes[ntp] += segments_sizes_per_ntp[ntp][
segment]
return expected_restored_sizes