From 15e7d9768038f903eaa4eb2067ede0a7c886b804 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 3 Jun 2022 17:01:53 -0700 Subject: [PATCH] rm_stm: make recovery from memory reset faster --- src/v/cluster/rm_stm.cc | 40 +++++++++++++++++++++++++++++++++++----- src/v/cluster/rm_stm.h | 5 +++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 0a27bde17d1b..986a853a876b 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -683,8 +683,35 @@ ss::future rm_stm::do_abort_tx( if (tx_seq) { auto origin = get_abort_origin(pid, tx_seq.value()); if (origin == abort_origin::past) { - // rejecting a delayed abort command to prevent aborting - // a wrong transaction + // An abort request has an older tx_seq. It may mean that the request + // was duplicated, delayed and retried later. + // + // Or it may mean that a tx coordinator + // - lost its state + // - rolled back to previous op + // - the previous op happened to be an abort + // - the coordinator retried it + // + // In the first case the least impactful way is to reject the request. + // But in the second case rejection will only cause the retry loop + // which blocks a transaction until the hanging tx expires (by + // default within one minute). + // + // It's less likely to receive a message from the past so we + // improve the second case by aborting the ongoing tx before it's + // expired. + // + // If it happens to be the first case then Redpanda rejects a + // client's tx. 
+ auto expiration_it = _mem_state.expiration.find(pid); + if (expiration_it != _mem_state.expiration.end()) { + expiration_it->second.is_expiration_requested = true; + } + // spawning abort in the background and returning an error to + // release locks on the tx coordinator to prevent distributed + // deadlock + ssx::spawn_with_gate( + _gate, [this, pid] { return try_abort_old_tx(pid); }); co_return tx_errc::request_rejected; } if (origin == abort_origin::future) { @@ -1017,6 +1044,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { co_return errc::generic_tx_error; } expiration_it->second.last_update = clock_type::now(); + expiration_it->second.is_expiration_requested = false; auto replicated = r.value(); @@ -1206,7 +1234,9 @@ void rm_stm::track_tx( return; } _mem_state.expiration[pid] = expiration_info{ - .timeout = transaction_timeout_ms, .last_update = clock_type::now()}; + .timeout = transaction_timeout_ms, + .last_update = clock_type::now(), + .is_expiration_requested = false}; if (!_is_autoabort_enabled) { return; } @@ -1245,7 +1275,7 @@ ss::future<> rm_stm::do_abort_old_txes() { for (auto pid : pids) { auto expiration_it = _mem_state.expiration.find(pid); if (expiration_it != _mem_state.expiration.end()) { - if (expiration_it->second.deadline() > clock_type::now()) { + if (!expiration_it->second.is_expired(clock_type::now())) { continue; } } @@ -1301,7 +1331,7 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) { auto expiration_it = _mem_state.expiration.find(pid); if (expiration_it != _mem_state.expiration.end()) { - if (expiration_it->second.deadline() > clock_type::now()) { + if (!expiration_it->second.is_expired(clock_type::now())) { co_return; } } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 65ab2bb9bafe..5f2d7e4461d2 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -182,8 +182,13 @@ class rm_stm final : public persisted_stm { struct expiration_info { 
duration_type timeout; time_point_type last_update; + bool is_expiration_requested; time_point_type deadline() const { return last_update + timeout; } + + bool is_expired(time_point_type now) const { + return is_expiration_requested || deadline() <= now; + } }; struct transaction_info {