diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index b8e6a342a64a..75bd9bbbae4a 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -16,6 +16,7 @@ #include "cloud_storage/types.h" #include "storage/parser_errc.h" #include "storage/types.h" +#include "utils/gate_guard.h" #include "utils/retry_chain_node.h" #include "utils/stream_utils.h" @@ -501,27 +502,29 @@ remote_partition::get_term_last_offset(model::term_id term) const { ss::future> remote_partition::aborted_transactions(offset_range offsets) { + gate_guard guard(_gate); // Here we have to use kafka offsets to locate the segments and // redpanda offsets to extract aborted transactions metadata because // tx-manifests contains redpanda offsets. std::vector result; - auto first_it = _segments.upper_bound(offsets.begin); - if (first_it != _segments.begin()) { + + // that's a stable btree iterator that makes key lookup on increment + auto first_it = upper_bound(offsets.begin); + if (first_it != begin()) { first_it = std::prev(first_it); } - for (auto it = first_it; it != _segments.end(); it++) { + for (auto it = first_it; it != end(); it++) { if (it->first > offsets.end) { break; } - auto& st = it->second; auto tx = co_await ss::visit( - st, - [this, &st, offsets, offset_key = it->first]( + it->second, + [this, offsets, offset_key = it->first]( offloaded_segment_state& off_state) { auto tmp = off_state->materialize(*this, offset_key); auto res = tmp->segment->aborted_transactions( offsets.begin_rp, offsets.end_rp); - st = std::move(tmp); + _segments.insert_or_assign(offset_key, std::move(tmp)); return res; }, [offsets](materialized_segment_ptr& m_state) { @@ -540,10 +543,10 @@ remote_partition::aborted_transactions(offset_range offsets) { "found {} aborted transactions for {}-{} offset range ({}-{} before " "offset translaction)", result.size(), - offsets.begin_rp, offsets.begin, - offsets.end_rp, - offsets.end); + 
offsets.end, + offsets.begin_rp, + offsets.end_rp); co_return result; } @@ -560,9 +563,9 @@ ss::future<> remote_partition::stop() { co_await std::visit([](auto&& rs) { return rs->stop(); }, rs); } - for (auto& [offset, seg] : _segments) { - vlog(_ctxlog.debug, "remote partition stop {}", offset); - co_await std::visit([](auto&& st) { return st->stop(); }, seg); + for (auto it = begin(); it != end(); it++) { + vlog(_ctxlog.debug, "remote partition stop {}", it->first); + co_await std::visit([](auto&& st) { return st->stop(); }, it->second); } } diff --git a/src/v/cloud_storage/remote_partition.h b/src/v/cloud_storage/remote_partition.h index 3d14fce35729..8878d2326f09 100644 --- a/src/v/cloud_storage/remote_partition.h +++ b/src/v/cloud_storage/remote_partition.h @@ -48,8 +48,7 @@ namespace details { /// iterator stability guarantee by caching the key and /// doing a lookup on every increment. /// This turns iterator increment into O(logN) operation -/// but this is OK since the underlying btree_map has high -/// fan-out ratio so the logarithm base is relatively large. +/// Deleting from underlying btree_map is not supported. template class btree_map_stable_iterator : public boost::iterator_facade< @@ -93,6 +92,10 @@ class btree_map_stable_iterator vassert( _key.has_value(), "btree_map_stable_iterator can't be incremented"); auto it = _map.get().find(*_key); + // _key should be present since deletions are not supported + vassert( + it != _map.get().end(), + "btree_map_stable_iterator can't be incremented"); ++it; if (it == _map.get().end()) { set_end(); @@ -326,6 +329,10 @@ class remote_partition cache& _cache; const partition_manifest& _manifest; s3::bucket_name _bucket; + + // Deleting from _segments is not supported. + // absl::btree_map doesn't provide pointer stability. We are + // using remote_partition::btree_map_stable_iterator to work around this. segment_map_t _segments; eviction_list_t _eviction_list; intrusive_list<