Skip to content

Commit

Permalink
rm_stm: synchronize abort_indexes access
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Aug 18, 2022
1 parent a0e28f5 commit adcc440
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 30 deletions.
121 changes: 91 additions & 30 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1412,30 +1412,43 @@ static void filter_intersecting(

// Public entry point for looking up aborted transactions for the offsets
// from..to. The actual lookup is delegated to do_aborted_transactions and is
// performed while holding _state_lock's read lock.
ss::future<std::vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
    // The holder is kept in the coroutine frame, so the read lock is only
    // released after do_aborted_transactions has finished (successfully or
    // with an exception).
    auto read_guard = co_await _state_lock.hold_read_lock();
    co_return co_await do_aborted_transactions(from, to);
}

// Gathers aborted transaction ranges for the offsets from..to out of three
// sources: the cached last abort snapshot, the in-memory aborted list and the
// abort snapshots referenced by _log_state.abort_indexes.
//
// Returns an empty vector when transactions are disabled.
ss::future<std::vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
    std::vector<rm_stm::tx_range> result;
    if (!_is_tx_enabled) {
        co_return result;
    }

    // Phase 1: scan _log_state.abort_indexes WITHOUT suspending. A co_await
    // inside this loop would let another coroutine modify the collection
    // while we hold an iterator into it, so indexes that need a snapshot
    // load are only copied aside here.
    std::vector<abort_index> intersecting_idxes;
    for (const auto& idx : _log_state.abort_indexes) {
        // skip indexes lying entirely before `from` or entirely after `to`
        if (idx.last < from || idx.first > to) {
            continue;
        }
        if (_log_state.last_abort_snapshot.match(idx)) {
            // the matching snapshot is already cached in memory, no load
            // (and no suspension) is needed
            filter_intersecting(
              result, _log_state.last_abort_snapshot.aborted, from, to);
        } else {
            intersecting_idxes.push_back(idx);
        }
    }

    filter_intersecting(result, _log_state.aborted, from, to);

    // Phase 2: load the remaining snapshots. We iterate over our private
    // copy, so suspending here is safe. Each snapshot is filtered exactly
    // once; a snapshot that fails to load (empty optional) is skipped.
    for (const auto& idx : intersecting_idxes) {
        auto opt = co_await load_abort_snapshot(idx);
        if (opt) {
            filter_intersecting(result, opt->aborted, from, to);
        }
    }
    co_return result;
}

Expand Down Expand Up @@ -1792,8 +1805,11 @@ ss::future<> rm_stm::apply_control(

_mem_state.forget(pid);

if (_log_state.aborted.size() > _abort_index_segment_size) {
co_await make_snapshot();
if (
_log_state.aborted.size() > _abort_index_segment_size
&& !_is_abort_idx_reduction_requested) {
ssx::spawn_with_gate(
_gate, [this] { return reduce_aborted_list(); });
}
} else if (crt == model::control_record_type::tx_commit) {
_log_state.prepared.erase(pid);
Expand All @@ -1809,6 +1825,18 @@ ss::future<> rm_stm::apply_control(
co_return;
}

// Runs make_snapshot() when the in-memory aborted list has grown beyond
// _abort_index_segment_size and no other reduction is currently in flight.
// Otherwise completes immediately.
ss::future<> rm_stm::reduce_aborted_list() {
    const bool already_running = _is_abort_idx_reduction_requested;
    if (
      already_running
      || _log_state.aborted.size() <= _abort_index_segment_size) {
        return ss::now();
    }

    // Mark the reduction as in progress; the flag is cleared once the
    // snapshot finishes, whether it succeeded or failed.
    _is_abort_idx_reduction_requested = true;
    return make_snapshot().finally(
      [this] { _is_abort_idx_reduction_requested = false; });
}

void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) {
if (bid.has_idempotent()) {
auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
Expand Down Expand Up @@ -1989,6 +2017,41 @@ void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {
}
}

// Splits _log_state.aborted into chunks of _abort_index_segment_size entries,
// persists every full chunk via save_abort_snapshot and records its offset
// range in _log_state.abort_indexes. The entries that do not fill a whole
// chunk stay in _log_state.aborted.
ss::future<> rm_stm::offload_aborted_txns() {
    // This method iterates through the _log_state.aborted collection and
    // the loop's body contains sync points (co_await), so without further
    // assumptions it is impossible to guarantee that the collection isn't
    // updated by another coroutine while the current coroutine waits.
    //
    // We must prevent that because an update invalidates the iterator and
    // may lead to undefined behavior. To avoid this situation,
    // offload_aborted_txns should be invoked only under _state_lock's write
    // lock because all the other updaters use the read lock.
    std::sort(
      std::begin(_log_state.aborted),
      std::end(_log_state.aborted),
      [](const tx_range& a, const tx_range& b) { return a.first < b.first; });

    // first/last start at the extreme values so the min/max below pick up
    // the bounds of the first entry added to the chunk
    abort_snapshot snapshot{
      .first = model::offset::max(), .last = model::offset::min()};
    for (auto const& entry : _log_state.aborted) {
        snapshot.first = std::min(snapshot.first, entry.first);
        snapshot.last = std::max(snapshot.last, entry.last);
        snapshot.aborted.push_back(entry);
        if (snapshot.aborted.size() == _abort_index_segment_size) {
            // NOTE(review): the index is published before
            // save_abort_snapshot completes; if the save can fail the index
            // would refer to a snapshot that may not have been written -
            // confirm this ordering is intended.
            auto idx = abort_index{
              .first = snapshot.first, .last = snapshot.last};
            _log_state.abort_indexes.push_back(idx);
            co_await save_abort_snapshot(snapshot);
            // start a fresh chunk
            snapshot = abort_snapshot{
              .first = model::offset::max(), .last = model::offset::min()};
        }
    }
    // Keep only the tail that did not fill a whole chunk. `snapshot` is
    // about to be destroyed, so move its vector instead of copying it.
    _log_state.aborted = std::move(snapshot.aborted);
}

ss::future<stm_snapshot> rm_stm::take_snapshot() {
auto start_offset = _raft->start_offset();

Expand All @@ -1999,7 +2062,7 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < start_offset) {
// caching expired indexes instead of removing them as we go
// to avoid giving a control to another coroutine and managing
// to avoid giving control to another coroutine and managing
// concurrent access to _log_state.abort_indexes
expired_abort_indexes.push_back(idx);
} else {
Expand All @@ -2023,27 +2086,15 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
_log_state.aborted = std::move(aborted);

if (_log_state.aborted.size() > _abort_index_segment_size) {
std::sort(
std::begin(_log_state.aborted),
std::end(_log_state.aborted),
[](tx_range a, tx_range b) { return a.first < b.first; });

abort_snapshot snapshot{
.first = model::offset::max(), .last = model::offset::min()};
for (auto const& entry : _log_state.aborted) {
snapshot.first = std::min(snapshot.first, entry.first);
snapshot.last = std::max(snapshot.last, entry.last);
snapshot.aborted.push_back(entry);
if (snapshot.aborted.size() == _abort_index_segment_size) {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
}
_log_state.aborted = snapshot.aborted;
co_await _state_lock.hold_write_lock().then(
[this](ss::basic_rwlock<>::holder unit) {
// software engineer be careful and do not cause a deadlock.
// take_snapshot is invoked under the persisted_stm::_op_lock
// and here we take the write lock (_state_lock). most rm_stm
// operations require its read lock. however they don't depend
// on _op_lock so things are safe now
return offload_aborted_txns().finally([u = std::move(unit)] {});
});
}

iobuf tx_ss_buf;
Expand Down Expand Up @@ -2148,6 +2199,16 @@ rm_stm::load_abort_snapshot(abort_index index) {
}

// Removes the persistent state by delegating to do_remove_persistent_state
// while holding _state_lock's write lock.
ss::future<> rm_stm::remove_persistent_state() {
    // Taking the write lock drains all ongoing operations and prevents
    // modification of _log_state.abort_indexes while it is being iterated.
    // The holder lives in the coroutine frame, so the lock is released only
    // after do_remove_persistent_state has completed or failed.
    auto write_guard = co_await _state_lock.hold_write_lock();
    co_await do_remove_persistent_state();
}

ss::future<> rm_stm::do_remove_persistent_state() {
for (const auto& idx : _log_state.abort_indexes) {
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ class rm_stm final : public persisted_stm {
ss::future<> handle_eviction() override;

private:
// Implementation of remove_persistent_state(); the public wrapper invokes it
// while holding _state_lock's write lock.
ss::future<> do_remove_persistent_state();
// Implementation of aborted_transactions(); the public wrapper invokes it
// while holding _state_lock's read lock.
ss::future<std::vector<rm_stm::tx_range>>
do_aborted_transactions(model::offset, model::offset);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
ss::future<tx_errc> do_prepare_tx(
Expand Down Expand Up @@ -339,6 +342,9 @@ class rm_stm final : public persisted_stm {
apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, model::offset);

// Runs make_snapshot() when the in-memory aborted list exceeds
// _abort_index_segment_size; completes immediately if a reduction is
// already in flight or the list is within the limit.
ss::future<> reduce_aborted_list();
// Moves full chunks of _log_state.aborted into persisted abort snapshots
// and records them in _log_state.abort_indexes. Should be invoked only
// under _state_lock's write lock.
ss::future<> offload_aborted_txns();

// The state of this state machine maybe change via two paths
//
// - by reading the already replicated commands from raft and
Expand Down Expand Up @@ -507,6 +513,7 @@ class rm_stm final : public persisted_stm {
void fill_snapshot_wo_seqs(T&);

ss::basic_rwlock<> _state_lock;
// Set by reduce_aborted_list() while its make_snapshot() call is in flight;
// checked to avoid starting a second reduction concurrently.
bool _is_abort_idx_reduction_requested{false};
absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<mutex>> _tx_locks;
absl::flat_hash_map<
model::producer_identity,
Expand Down

0 comments on commit adcc440

Please sign in to comment.