Skip to content

Commit

Permalink
Merge pull request #5556 from rystsov/truncate-persisted-stm
Browse files Browse the repository at this point in the history
Transactions: remove expired abort indexes
  • Loading branch information
rystsov committed Aug 19, 2022
2 parents df1b532 + adcc440 commit 2fbdd52
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/persisted_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class persisted_stm
void make_snapshot_in_background() final;
ss::future<> ensure_snapshot_exists(model::offset) final;
model::offset max_collectible_offset() override;
ss::future<> remove_persistent_state();
virtual ss::future<> remove_persistent_state();

ss::future<> make_snapshot();
/*
Expand Down
171 changes: 138 additions & 33 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
namespace cluster {
using namespace std::chrono_literals;

static ss::sstring abort_idx_name(model::offset first, model::offset last) {
return fmt::format("abort.idx.{}.{}", first, last);
}

static const model::violation_recovery_policy crash{
model::violation_recovery_policy::crash};
static const model::violation_recovery_policy best_effort{
Expand Down Expand Up @@ -1408,30 +1412,43 @@ static void filter_intersecting(

ss::future<std::vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
return _state_lock.hold_read_lock().then(
[from, to, this](ss::basic_rwlock<>::holder unit) mutable {
return do_aborted_transactions(from, to).finally(
[u = std::move(unit)] {});
});
}

ss::future<std::vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
std::vector<rm_stm::tx_range> result;
if (!_is_tx_enabled) {
co_return result;
}
for (auto& idx : _log_state.abort_indexes) {
std::vector<abort_index> intersecting_idxes;
for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < from) {
continue;
}
if (idx.first > to) {
continue;
}
std::optional<abort_snapshot> opt;
if (_log_state.last_abort_snapshot.match(idx)) {
opt = _log_state.last_abort_snapshot;
auto opt = _log_state.last_abort_snapshot;
filter_intersecting(result, opt.aborted, from, to);
} else {
opt = co_await load_abort_snapshot(idx);
}
if (opt) {
filter_intersecting(result, opt->aborted, from, to);
intersecting_idxes.push_back(idx);
}
}

filter_intersecting(result, _log_state.aborted, from, to);

for (const auto& idx : intersecting_idxes) {
auto opt = co_await load_abort_snapshot(idx);
if (opt) {
filter_intersecting(result, opt->aborted, from, to);
}
}
co_return result;
}

Expand Down Expand Up @@ -1788,8 +1805,11 @@ ss::future<> rm_stm::apply_control(

_mem_state.forget(pid);

if (_log_state.aborted.size() > _abort_index_segment_size) {
co_await make_snapshot();
if (
_log_state.aborted.size() > _abort_index_segment_size
&& !_is_abort_idx_reduction_requested) {
ssx::spawn_with_gate(
_gate, [this] { return reduce_aborted_list(); });
}
} else if (crt == model::control_record_type::tx_commit) {
_log_state.prepared.erase(pid);
Expand All @@ -1805,6 +1825,18 @@ ss::future<> rm_stm::apply_control(
co_return;
}

ss::future<> rm_stm::reduce_aborted_list() {
if (_is_abort_idx_reduction_requested) {
return ss::now();
}
if (_log_state.aborted.size() <= _abort_index_segment_size) {
return ss::now();
}
_is_abort_idx_reduction_requested = true;
return make_snapshot().finally(
[this] { _is_abort_idx_reduction_requested = false; });
}

void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) {
if (bid.has_idempotent()) {
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
Expand Down Expand Up @@ -1985,29 +2017,84 @@ void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {
}
}

ss::future<> rm_stm::offload_aborted_txns() {
// This method iterates through _log_state.aborted collection
// and the loop's body contains sync points (co_await) so w/o
// further assumptions it's impossible to guarantee that the
// collection isn't updated by another coroutine while the
// current coroutine waits.

// We should prevent it because an update invalidates the ite-
// rator and may lead to undefined behavior. In order to avoid
// this situation, offload_aborted_txns should be invoked only
// under _state_lock's write lock because all the other updators
// use the read lock.
std::sort(
std::begin(_log_state.aborted),
std::end(_log_state.aborted),
[](tx_range a, tx_range b) { return a.first < b.first; });

abort_snapshot snapshot{
.first = model::offset::max(), .last = model::offset::min()};
for (auto const& entry : _log_state.aborted) {
snapshot.first = std::min(snapshot.first, entry.first);
snapshot.last = std::max(snapshot.last, entry.last);
snapshot.aborted.push_back(entry);
if (snapshot.aborted.size() == _abort_index_segment_size) {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
}
_log_state.aborted = snapshot.aborted;
}

ss::future<stm_snapshot> rm_stm::take_snapshot() {
if (_log_state.aborted.size() > _abort_index_segment_size) {
std::sort(
std::begin(_log_state.aborted),
std::end(_log_state.aborted),
[](tx_range a, tx_range b) { return a.first < b.first; });

abort_snapshot snapshot{
.first = model::offset::max(), .last = model::offset::min()};
for (auto const& entry : _log_state.aborted) {
snapshot.first = std::min(snapshot.first, entry.first);
snapshot.last = std::max(snapshot.last, entry.last);
snapshot.aborted.push_back(entry);
if (snapshot.aborted.size() == _abort_index_segment_size) {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
auto start_offset = _raft->start_offset();

std::vector<abort_index> abort_indexes;
std::vector<abort_index> expired_abort_indexes;
abort_indexes.reserve(_log_state.abort_indexes.size());

for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < start_offset) {
// caching expired indexes instead of removing them as we go
// to avoid giving control to another coroutine and managing
// concurrent access to _log_state.abort_indexes
expired_abort_indexes.push_back(idx);
} else {
abort_indexes.push_back(idx);
}
_log_state.aborted = snapshot.aborted;
}
_log_state.abort_indexes = std::move(abort_indexes);

for (const auto& idx : expired_abort_indexes) {
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}

std::vector<tx_range> aborted;
aborted.reserve(_log_state.aborted.size());
std::copy_if(
_log_state.aborted.begin(),
_log_state.aborted.end(),
std::back_inserter(aborted),
[start_offset](tx_range range) { return range.last >= start_offset; });
_log_state.aborted = std::move(aborted);

if (_log_state.aborted.size() > _abort_index_segment_size) {
co_await _state_lock.hold_write_lock().then(
[this](ss::basic_rwlock<>::holder unit) {
// software engineer be careful and do not cause a deadlock.
// take_snapshot is invoked under the persisted_stm::_op_lock
// and here here we take write lock (_state_lock). most rm_stm
// operations require its read lock. however they don't depend
// of _op_lock so things are safe now
return offload_aborted_txns().finally([u = std::move(unit)] {});
});
}

iobuf tx_ss_buf;
Expand Down Expand Up @@ -2062,8 +2149,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
reflection::adl<abort_snapshot>{}.to(snapshot_data, snapshot);
int32_t snapshot_size = snapshot_data.size_bytes();

auto filename = fmt::format(
"abort.idx.{}.{}", snapshot.first, snapshot.last);
auto filename = abort_idx_name(snapshot.first, snapshot.last);
auto writer = co_await _abort_snapshot_mgr.start_snapshot(filename);

iobuf metadata_buf;
Expand All @@ -2077,7 +2163,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {

ss::future<std::optional<rm_stm::abort_snapshot>>
rm_stm::load_abort_snapshot(abort_index index) {
auto filename = fmt::format("abort.idx.{}.{}", index.first, index.last);
auto filename = abort_idx_name(index.first, index.last);

auto reader = co_await _abort_snapshot_mgr.open_snapshot(filename);
if (!reader) {
Expand Down Expand Up @@ -2112,6 +2198,25 @@ rm_stm::load_abort_snapshot(abort_index index) {
co_return data;
}

ss::future<> rm_stm::remove_persistent_state() {
// the write lock drains all ongoing operations and prevents
// modification of _log_state.abort_indexes while we iterate
// through it
return _state_lock.hold_write_lock().then(
[this](ss::basic_rwlock<>::holder unit) {
return do_remove_persistent_state().finally([u = std::move(unit)] {});
});
}

ss::future<> rm_stm::do_remove_persistent_state() {
for (const auto& idx : _log_state.abort_indexes) {
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}
co_await _abort_snapshot_mgr.remove_partial_snapshots();
co_return co_await persisted_stm::remove_persistent_state();
}

ss::future<> rm_stm::handle_eviction() {
return _state_lock.hold_write_lock().then(
[this]([[maybe_unused]] ss::basic_rwlock<>::holder unit) {
Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,15 @@ class rm_stm final : public persisted_stm {

ss::future<std::error_code> mark_expired(model::producer_identity pid);

ss::future<> remove_persistent_state() override;

protected:
ss::future<> handle_eviction() override;

private:
ss::future<> do_remove_persistent_state();
ss::future<std::vector<rm_stm::tx_range>>
do_aborted_transactions(model::offset, model::offset);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
ss::future<tx_errc> do_prepare_tx(
Expand Down Expand Up @@ -337,6 +342,9 @@ class rm_stm final : public persisted_stm {
apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, model::offset);

ss::future<> reduce_aborted_list();
ss::future<> offload_aborted_txns();

// The state of this state machine maybe change via two paths
//
// - by reading the already replicated commands from raft and
Expand Down Expand Up @@ -505,6 +513,7 @@ class rm_stm final : public persisted_stm {
void fill_snapshot_wo_seqs(T&);

ss::basic_rwlock<> _state_lock;
bool _is_abort_idx_reduction_requested{false};
absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
absl::flat_hash_map<
model::producer_identity,
Expand Down
Loading

0 comments on commit 2fbdd52

Please sign in to comment.