Skip to content

Commit

Permalink
rm_stm: synchronize abort_indexes access
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Aug 13, 2022
1 parent b163c8b commit b3fd299
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 28 deletions.
92 changes: 64 additions & 28 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,15 @@ static void filter_intersecting(

// Returns the aborted transaction ranges for [from, to]. The lookup itself
// is done by do_aborted_transactions; this wrapper only makes sure it runs
// while _state_lock is held in read mode.
ss::future<std::vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
    auto under_read_lock = [this, from, to](ss::basic_rwlock<>::holder guard) {
        // The holder is parked inside an otherwise empty continuation so
        // the read lock is released only once the lookup has resolved,
        // successfully or not.
        auto release = [held = std::move(guard)] {};
        return do_aborted_transactions(from, to).finally(std::move(release));
    };
    return _state_lock.hold_read_lock().then(std::move(under_read_lock));
}

ss::future<std::vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
std::vector<rm_stm::tx_range> result;
if (!_is_tx_enabled) {
co_return result;
Expand Down Expand Up @@ -2008,6 +2017,41 @@ void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {
}
}

// Splits _log_state.aborted into chunks of _abort_index_segment_size
// entries. Every full chunk is handed to save_abort_snapshot and recorded
// in _log_state.abort_indexes by its [first, last] offset span; whatever is
// left over (fewer entries than a full chunk) stays in _log_state.aborted.
ss::future<> rm_stm::offload_aborted_txns() {
    // This method iterates through the _log_state.aborted collection
    // and the loop's body contains sync points (co_await), so without
    // further assumptions it's impossible to guarantee that the
    // collection isn't updated by another coroutine while the
    // current coroutine waits.

    // We should prevent it because an update invalidates the iterator
    // and may lead to undefined behavior. In order to avoid this
    // situation, offload_aborted_txns should be invoked only under
    // _state_lock's write lock because all the other updaters use the
    // read lock.
    std::sort(
      std::begin(_log_state.aborted),
      std::end(_log_state.aborted),
      // compare by reference: no need to copy both ranges per comparison
      [](const tx_range& a, const tx_range& b) { return a.first < b.first; });

    // first/last start at the opposite extremes so that the first entry
    // folded in by min/max below always replaces them.
    abort_snapshot snapshot{
      .first = model::offset::max(), .last = model::offset::min()};
    for (auto const& entry : _log_state.aborted) {
        snapshot.first = std::min(snapshot.first, entry.first);
        snapshot.last = std::max(snapshot.last, entry.last);
        snapshot.aborted.push_back(entry);
        if (snapshot.aborted.size() == _abort_index_segment_size) {
            auto idx = abort_index{
              .first = snapshot.first, .last = snapshot.last};
            _log_state.abort_indexes.push_back(idx);
            co_await save_abort_snapshot(snapshot);
            // start accumulating the next chunk from scratch
            snapshot = abort_snapshot{
              .first = model::offset::max(), .last = model::offset::min()};
        }
    }
    // Iteration is over and `snapshot` is not used afterwards, so the
    // remaining tail can be moved instead of copied.
    _log_state.aborted = std::move(snapshot.aborted);
}

ss::future<stm_snapshot> rm_stm::take_snapshot() {
auto start_offset = _raft->start_offset();

Expand All @@ -2018,7 +2062,7 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < start_offset) {
// caching expired indexes instead of removing them as we go
// to avoid giving a control to another coroutine and managing
// to avoid giving control to another coroutine and managing
// concurrent access to _log_state.abort_indexes
expired_abort_indexes.push_back(idx);
} else {
Expand All @@ -2042,30 +2086,15 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
_log_state.aborted = std::move(aborted);

if (_log_state.aborted.size() > _abort_index_segment_size) {
std::sort(
std::begin(_log_state.aborted),
std::end(_log_state.aborted),
[](tx_range a, tx_range b) { return a.first < b.first; });

abort_snapshot snapshot{
.first = model::offset::max(), .last = model::offset::min()};
// DANGER: _log_state.aborted should be processed all at once without
// pauses (co_await) otherwise there is a risk that other fiber may
// modify it
for (auto const& entry : _log_state.aborted) {
snapshot.first = std::min(snapshot.first, entry.first);
snapshot.last = std::max(snapshot.last, entry.last);
snapshot.aborted.push_back(entry);
if (snapshot.aborted.size() == _abort_index_segment_size) {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
}
_log_state.aborted = snapshot.aborted;
co_await _state_lock.hold_write_lock().then(
[this](ss::basic_rwlock<>::holder unit) {
// software engineer be careful and do not cause a deadlock.
// take_snapshot is invoked under the persisted_stm::_op_lock
// and here we take the write lock (_state_lock). most rm_stm
// operations require its read lock. however they don't depend
// on _op_lock so things are safe now
return offload_aborted_txns().finally([u = std::move(unit)] {});
});
}

iobuf tx_ss_buf;
Expand Down Expand Up @@ -2170,9 +2199,16 @@ rm_stm::load_abort_snapshot(abort_index index) {
}

// Runs do_remove_persistent_state while holding _state_lock in write mode.
ss::future<> rm_stm::remove_persistent_state() {
    // Taking the write lock drains every operation that is still in
    // flight and keeps _log_state.abort_indexes from being modified
    // while do_remove_persistent_state walks through it.
    auto under_write_lock = [this](ss::basic_rwlock<>::holder guard) {
        // keep the holder alive until the removal has fully resolved
        auto release = [held = std::move(guard)] {};
        return do_remove_persistent_state().finally(std::move(release));
    };
    return _state_lock.hold_write_lock().then(std::move(under_write_lock));
}

ss::future<> rm_stm::do_remove_persistent_state() {
for (const auto& idx : _log_state.abort_indexes) {
auto filename = abort_idx_name(idx.first, idx.last);
co_await _abort_snapshot_mgr.remove_snapshot(filename);
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ class rm_stm final : public persisted_stm {
ss::future<> handle_eviction() override;

private:
ss::future<> do_remove_persistent_state();
ss::future<std::vector<rm_stm::tx_range>>
do_aborted_transactions(model::offset, model::offset);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
ss::future<tx_errc> do_prepare_tx(
Expand Down Expand Up @@ -340,6 +343,7 @@ class rm_stm final : public persisted_stm {
void apply_data(model::batch_identity, model::offset);

ss::future<> reduce_aborted_list();
ss::future<> offload_aborted_txns();

// The state of this state machine maybe change via two paths
//
Expand Down

0 comments on commit b3fd299

Please sign in to comment.