Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions: remove expired abort indexes #5556

Merged
merged 3 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/cluster/persisted_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class persisted_stm
void make_snapshot_in_background() final;
ss::future<> ensure_snapshot_exists(model::offset) final;
model::offset max_collectible_offset() override;
ss::future<> remove_persistent_state();
virtual ss::future<> remove_persistent_state();

ss::future<> make_snapshot();
/*
Expand Down
171 changes: 138 additions & 33 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
namespace cluster {
using namespace std::chrono_literals;

// Builds the snapshot file name used for the abort index that covers the
// offset range [first, last]. Keeping the pattern in one place ensures that
// code saving, loading and removing abort snapshots agrees on the name.
static ss::sstring abort_idx_name(model::offset first, model::offset last) {
    ss::sstring filename = fmt::format("abort.idx.{}.{}", first, last);
    return filename;
}

static const model::violation_recovery_policy crash{
model::violation_recovery_policy::crash};
static const model::violation_recovery_policy best_effort{
Expand Down Expand Up @@ -1408,30 +1412,43 @@ static void filter_intersecting(

// Public entry point for querying aborted transaction ranges.
//
// Acquires the read side of _state_lock and, once it is held, delegates to
// do_aborted_transactions(from, to), whose result is returned unchanged.
ss::future<std::vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
return _state_lock.hold_read_lock().then(
rystsov marked this conversation as resolved.
Show resolved Hide resolved
[from, to, this](ss::basic_rwlock<>::holder unit) mutable {
// The lock holder is moved into the finally() continuation so that it is
// only destroyed after the delegated future has resolved.
return do_aborted_transactions(from, to).finally(
[u = std::move(unit)] {});
});
}

ss::future<std::vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
std::vector<rm_stm::tx_range> result;
if (!_is_tx_enabled) {
co_return result;
}
for (auto& idx : _log_state.abort_indexes) {
std::vector<abort_index> intersecting_idxes;
for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < from) {
continue;
}
if (idx.first > to) {
continue;
}
std::optional<abort_snapshot> opt;
if (_log_state.last_abort_snapshot.match(idx)) {
opt = _log_state.last_abort_snapshot;
auto opt = _log_state.last_abort_snapshot;
filter_intersecting(result, opt.aborted, from, to);
} else {
opt = co_await load_abort_snapshot(idx);
}
if (opt) {
filter_intersecting(result, opt->aborted, from, to);
intersecting_idxes.push_back(idx);
}
}

filter_intersecting(result, _log_state.aborted, from, to);

for (const auto& idx : intersecting_idxes) {
rystsov marked this conversation as resolved.
Show resolved Hide resolved
auto opt = co_await load_abort_snapshot(idx);
if (opt) {
filter_intersecting(result, opt->aborted, from, to);
}
}
co_return result;
}

Expand Down Expand Up @@ -1788,8 +1805,11 @@ ss::future<> rm_stm::apply_control(

_mem_state.forget(pid);

if (_log_state.aborted.size() > _abort_index_segment_size) {
co_await make_snapshot();
if (
_log_state.aborted.size() > _abort_index_segment_size
&& !_is_abort_idx_reduction_requested) {
ssx::spawn_with_gate(
_gate, [this] { return reduce_aborted_list(); });
rystsov marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (crt == model::control_record_type::tx_commit) {
_log_state.prepared.erase(pid);
Expand All @@ -1805,6 +1825,18 @@ ss::future<> rm_stm::apply_control(
co_return;
}

// Triggers a snapshot when the in-memory list of aborted transactions has
// grown past _abort_index_segment_size.
//
// _is_abort_idx_reduction_requested acts as a "reduction in flight" flag:
// it is raised before make_snapshot() starts and lowered again when the
// returned future resolves, so overlapping calls become no-ops.
ss::future<> rm_stm::reduce_aborted_list() {
    const bool reduction_in_flight = _is_abort_idx_reduction_requested;
    const bool list_is_small = _log_state.aborted.size()
                               <= _abort_index_segment_size;
    if (reduction_in_flight || list_is_small) {
        // nothing to do: either somebody is already on it or the list
        // hasn't outgrown a single segment yet
        return ss::now();
    }

    _is_abort_idx_reduction_requested = true;
    auto lower_flag = [this] { _is_abort_idx_reduction_requested = false; };
    return make_snapshot().finally(lower_flag);
}

void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) {
if (bid.has_idempotent()) {
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
Expand Down Expand Up @@ -1985,29 +2017,84 @@ void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {
}
}

// Moves full segments of _log_state.aborted out of memory into abort
// snapshots.
//
// The aborted ranges are sorted by their first offset and then cut into
// chunks of exactly _abort_index_segment_size entries. For every full chunk
// an abort_index spanning [min first, max last] of the chunk is appended to
// _log_state.abort_indexes and the chunk is handed to save_abort_snapshot.
// The trailing, incomplete chunk becomes the new _log_state.aborted.
//
// The loop iterates over _log_state.aborted and contains sync points
// (co_await), so without further assumptions another coroutine could update
// the collection while this one is suspended. Such an update would
// invalidate the iterator and lead to undefined behavior. To avoid it this
// method must be invoked only under _state_lock's write lock, because all
// the other updaters use the read lock.
ss::future<> rm_stm::offload_aborted_txns() {
    std::sort(
      std::begin(_log_state.aborted),
      std::end(_log_state.aborted),
      [](const tx_range& a, const tx_range& b) { return a.first < b.first; });

    // An empty chunk starts with an inverted range so that the first
    // std::min / std::max below adopts the entry's own offsets.
    const auto empty_chunk = [] {
        return abort_snapshot{
          .first = model::offset::max(), .last = model::offset::min()};
    };

    auto chunk = empty_chunk();
    for (const auto& entry : _log_state.aborted) {
        chunk.first = std::min(chunk.first, entry.first);
        chunk.last = std::max(chunk.last, entry.last);
        chunk.aborted.push_back(entry);
        if (chunk.aborted.size() == _abort_index_segment_size) {
            _log_state.abort_indexes.push_back(
              abort_index{.first = chunk.first, .last = chunk.last});
            // The chunk is reset right after the save, so hand it over by
            // move instead of copying the whole vector of ranges.
            co_await save_abort_snapshot(std::move(chunk));
            chunk = empty_chunk();
        }
    }

    // Only the incomplete tail stays in memory; steal its storage rather
    // than copying it.
    _log_state.aborted = std::move(chunk.aborted);
}

ss::future<stm_snapshot> rm_stm::take_snapshot() {
if (_log_state.aborted.size() > _abort_index_segment_size) {
std::sort(
std::begin(_log_state.aborted),
std::end(_log_state.aborted),
[](tx_range a, tx_range b) { return a.first < b.first; });

abort_snapshot snapshot{
.first = model::offset::max(), .last = model::offset::min()};
for (auto const& entry : _log_state.aborted) {
snapshot.first = std::min(snapshot.first, entry.first);
snapshot.last = std::max(snapshot.last, entry.last);
snapshot.aborted.push_back(entry);
if (snapshot.aborted.size() == _abort_index_segment_size) {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
auto start_offset = _raft->start_offset();

std::vector<abort_index> abort_indexes;
std::vector<abort_index> expired_abort_indexes;
abort_indexes.reserve(_log_state.abort_indexes.size());
rystsov marked this conversation as resolved.
Show resolved Hide resolved

for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < start_offset) {
// caching expired indexes instead of removing them as we go
// to avoid giving control to another coroutine and managing
// concurrent access to _log_state.abort_indexes
expired_abort_indexes.push_back(idx);
} else {
abort_indexes.push_back(idx);
}
_log_state.aborted = snapshot.aborted;
}
_log_state.abort_indexes = std::move(abort_indexes);
rystsov marked this conversation as resolved.
Show resolved Hide resolved

for (const auto& idx : expired_abort_indexes) {
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}

std::vector<tx_range> aborted;
aborted.reserve(_log_state.aborted.size());
std::copy_if(
_log_state.aborted.begin(),
_log_state.aborted.end(),
std::back_inserter(aborted),
[start_offset](tx_range range) { return range.last >= start_offset; });
_log_state.aborted = std::move(aborted);

rystsov marked this conversation as resolved.
Show resolved Hide resolved
if (_log_state.aborted.size() > _abort_index_segment_size) {
co_await _state_lock.hold_write_lock().then(
rystsov marked this conversation as resolved.
Show resolved Hide resolved
[this](ss::basic_rwlock<>::holder unit) {
// Be careful not to cause a deadlock here: take_snapshot is
// invoked under persisted_stm::_op_lock, and here we take the
// write lock (_state_lock). Most rm_stm operations require its
// read lock; however, they don't depend on _op_lock, so things
// are safe for now.
return offload_aborted_txns().finally([u = std::move(unit)] {});
});
}

iobuf tx_ss_buf;
Expand Down Expand Up @@ -2062,8 +2149,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
reflection::adl<abort_snapshot>{}.to(snapshot_data, snapshot);
int32_t snapshot_size = snapshot_data.size_bytes();

auto filename = fmt::format(
"abort.idx.{}.{}", snapshot.first, snapshot.last);
auto filename = abort_idx_name(snapshot.first, snapshot.last);
auto writer = co_await _abort_snapshot_mgr.start_snapshot(filename);

iobuf metadata_buf;
Expand All @@ -2077,7 +2163,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {

ss::future<std::optional<rm_stm::abort_snapshot>>
rm_stm::load_abort_snapshot(abort_index index) {
auto filename = fmt::format("abort.idx.{}.{}", index.first, index.last);
auto filename = abort_idx_name(index.first, index.last);

auto reader = co_await _abort_snapshot_mgr.open_snapshot(filename);
if (!reader) {
Expand Down Expand Up @@ -2112,6 +2198,25 @@ rm_stm::load_abort_snapshot(abort_index index) {
co_return data;
}

// Removes the persistent state of this stm while holding _state_lock's
// write lock.
//
// Taking the write lock drains all ongoing operations and guarantees that
// _log_state.abort_indexes isn't modified while
// do_remove_persistent_state() walks through it.
ss::future<> rm_stm::remove_persistent_state() {
    auto remove_under_lock = [this](ss::basic_rwlock<>::holder write_holder) {
        // park the holder in the continuation: the lock is released only
        // once the removal has finished (successfully or not)
        return do_remove_persistent_state().finally(
          [held = std::move(write_holder)] {});
    };
    return _state_lock.hold_write_lock().then(std::move(remove_under_lock));
}

// Performs the actual removal behind remove_persistent_state(), in order:
//   1. the snapshot file of every entry in _log_state.abort_indexes;
//   2. any partial snapshots known to _abort_snapshot_mgr;
//   3. whatever persisted_stm::remove_persistent_state() removes.
//
// The loop suspends (co_await) while iterating _log_state.abort_indexes, so
// the collection must not be modified concurrently; remove_persistent_state()
// calls this under _state_lock's write lock for that reason.
ss::future<> rm_stm::do_remove_persistent_state() {
for (const auto& idx : _log_state.abort_indexes) {
// the file name is derived from the index's offset range
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}
co_await _abort_snapshot_mgr.remove_partial_snapshots();
// finally let the base class remove its own state
co_return co_await persisted_stm::remove_persistent_state();
rystsov marked this conversation as resolved.
Show resolved Hide resolved
}

ss::future<> rm_stm::handle_eviction() {
return _state_lock.hold_write_lock().then(
[this]([[maybe_unused]] ss::basic_rwlock<>::holder unit) {
Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,15 @@ class rm_stm final : public persisted_stm {

ss::future<std::error_code> mark_expired(model::producer_identity pid);

ss::future<> remove_persistent_state() override;

protected:
ss::future<> handle_eviction() override;

private:
ss::future<> do_remove_persistent_state();
ss::future<std::vector<rm_stm::tx_range>>
do_aborted_transactions(model::offset, model::offset);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
ss::future<tx_errc> do_prepare_tx(
Expand Down Expand Up @@ -337,6 +342,9 @@ class rm_stm final : public persisted_stm {
apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, model::offset);

ss::future<> reduce_aborted_list();
ss::future<> offload_aborted_txns();

// The state of this state machine maybe change via two paths
//
// - by reading the already replicated commands from raft and
Expand Down Expand Up @@ -505,6 +513,7 @@ class rm_stm final : public persisted_stm {
void fill_snapshot_wo_seqs(T&);

ss::basic_rwlock<> _state_lock;
bool _is_abort_idx_reduction_requested{false};
absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
absl::flat_hash_map<
model::producer_identity,
Expand Down
Loading