Skip to content

Commit

Permalink
Merge pull request #5704 from LenaAn/iter
Browse files Browse the repository at this point in the history
cloud_storage: use stable iterator for absl::btree
  • Loading branch information
Lena Anyusheva committed Aug 2, 2022
2 parents 0d8ca88 + 28aab3f commit 4b9875b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
29 changes: 16 additions & 13 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/types.h"
#include "storage/parser_errc.h"
#include "storage/types.h"
#include "utils/gate_guard.h"
#include "utils/retry_chain_node.h"
#include "utils/stream_utils.h"

Expand Down Expand Up @@ -501,27 +502,29 @@ remote_partition::get_term_last_offset(model::term_id term) const {

ss::future<std::vector<cluster::rm_stm::tx_range>>
remote_partition::aborted_transactions(offset_range offsets) {
gate_guard guard(_gate);
// Here we have to use kafka offsets to locate the segments and
// redpanda offsets to extract aborted transactions metadata because
// tx-manifests contains redpanda offsets.
std::vector<cluster::rm_stm::tx_range> result;
auto first_it = _segments.upper_bound(offsets.begin);
if (first_it != _segments.begin()) {

// first_it is a stable btree iterator that performs a key lookup on every increment
auto first_it = upper_bound(offsets.begin);
if (first_it != begin()) {
first_it = std::prev(first_it);
}
for (auto it = first_it; it != _segments.end(); it++) {
for (auto it = first_it; it != end(); it++) {
if (it->first > offsets.end) {
break;
}
auto& st = it->second;
auto tx = co_await ss::visit(
st,
[this, &st, offsets, offset_key = it->first](
it->second,
[this, offsets, offset_key = it->first](
offloaded_segment_state& off_state) {
auto tmp = off_state->materialize(*this, offset_key);
auto res = tmp->segment->aborted_transactions(
offsets.begin_rp, offsets.end_rp);
st = std::move(tmp);
_segments.insert_or_assign(offset_key, std::move(tmp));
return res;
},
[offsets](materialized_segment_ptr& m_state) {
Expand All @@ -540,10 +543,10 @@ remote_partition::aborted_transactions(offset_range offsets) {
"found {} aborted transactions for {}-{} offset range ({}-{} before "
"offset translaction)",
result.size(),
offsets.begin_rp,
offsets.begin,
offsets.end_rp,
offsets.end);
offsets.end,
offsets.begin_rp,
offsets.end_rp);
co_return result;
}

Expand All @@ -560,9 +563,9 @@ ss::future<> remote_partition::stop() {
co_await std::visit([](auto&& rs) { return rs->stop(); }, rs);
}

for (auto& [offset, seg] : _segments) {
vlog(_ctxlog.debug, "remote partition stop {}", offset);
co_await std::visit([](auto&& st) { return st->stop(); }, seg);
for (auto it = begin(); it != end(); it++) {
vlog(_ctxlog.debug, "remote partition stop {}", it->first);
co_await std::visit([](auto&& st) { return st->stop(); }, it->second);
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ namespace details {
/// iterator stability guarantee by caching the key and
/// doing a lookup on every increment.
/// This turns iterator increment into O(logN) operation
/// but this is OK since the underlying btree_map has high
/// fan-out ratio so the logarithm base is relatively large.
/// Deleting from underlying btree_map is not supported.
template<class TKey, class TVal>
class btree_map_stable_iterator
: public boost::iterator_facade<
Expand Down Expand Up @@ -93,6 +92,10 @@ class btree_map_stable_iterator
vassert(
_key.has_value(), "btree_map_stable_iterator can't be incremented");
auto it = _map.get().find(*_key);
// _key should be present since deletions are not supported
vassert(
it != _map.get().end(),
"btree_map_stable_iterator can't be incremented");
++it;
if (it == _map.get().end()) {
set_end();
Expand Down Expand Up @@ -326,6 +329,10 @@ class remote_partition
cache& _cache;
const partition_manifest& _manifest;
s3::bucket_name _bucket;

// Deleting from _segments is not supported.
// absl::btree_map doesn't provide pointer stability. We are
// using remote_partition::btree_map_stable_iterator to work around this.
segment_map_t _segments;
eviction_list_t _eviction_list;
intrusive_list<
Expand Down

0 comments on commit 4b9875b

Please sign in to comment.