Skip to content

Commit

Permalink
Merge pull request #5027 from rystsov/availability-tx
Browse files Browse the repository at this point in the history
rm_stm: make recovery from memory reset faster
  • Loading branch information
rystsov committed Jun 22, 2022
2 parents 84fa98c + 15e7d97 commit fc9b0a0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
40 changes: 35 additions & 5 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,35 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
if (tx_seq) {
auto origin = get_abort_origin(pid, tx_seq.value());
if (origin == abort_origin::past) {
// rejecting a delayed abort command to prevent aborting
// a wrong transaction
// An abort request has an older tx_seq. It may mean that the request
// was duplicated, delayed and retried later.
//
// Or it may mean that a tx coordinator
// - lost its state
// - rolled back to previous op
// - the previous op happened to be an abort
// - the coordinator retried it
//
// In the first case the least impactful way is to reject the request.
// But in the second case rejection will only cause the retry loop
// which blocks a transaction until the hanging tx expires (by
// default within one minute).
//
// Receiving a message from the past is the less likely case, so we
// improve the second case by aborting the ongoing tx before it
// expires.
//
// If it happens to be the first case then Redpanda rejects a
// client's tx.
auto expiration_it = _mem_state.expiration.find(pid);
if (expiration_it != _mem_state.expiration.end()) {
expiration_it->second.is_expiration_requested = true;
}
// spawning the abort in the background and returning an error to
// release locks on the tx coordinator to prevent a distributed
// deadlock
ssx::spawn_with_gate(
_gate, [this, pid] { return try_abort_old_tx(pid); });
co_return tx_errc::request_rejected;
}
if (origin == abort_origin::future) {
Expand Down Expand Up @@ -1017,6 +1044,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
co_return errc::generic_tx_error;
}
expiration_it->second.last_update = clock_type::now();
expiration_it->second.is_expiration_requested = false;

auto replicated = r.value();

Expand Down Expand Up @@ -1206,7 +1234,9 @@ void rm_stm::track_tx(
return;
}
_mem_state.expiration[pid] = expiration_info{
.timeout = transaction_timeout_ms, .last_update = clock_type::now()};
.timeout = transaction_timeout_ms,
.last_update = clock_type::now(),
.is_expiration_requested = false};
if (!_is_autoabort_enabled) {
return;
}
Expand Down Expand Up @@ -1245,7 +1275,7 @@ ss::future<> rm_stm::do_abort_old_txes() {
for (auto pid : pids) {
auto expiration_it = _mem_state.expiration.find(pid);
if (expiration_it != _mem_state.expiration.end()) {
if (expiration_it->second.deadline() > clock_type::now()) {
if (!expiration_it->second.is_expired(clock_type::now())) {
continue;
}
}
Expand Down Expand Up @@ -1301,7 +1331,7 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) {

auto expiration_it = _mem_state.expiration.find(pid);
if (expiration_it != _mem_state.expiration.end()) {
if (expiration_it->second.deadline() > clock_type::now()) {
if (!expiration_it->second.is_expired(clock_type::now())) {
co_return;
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,13 @@ class rm_stm final : public persisted_stm {
/// Expiration bookkeeping for a single tracked transaction.
///
/// Every member carries a value-initializing default so that a
/// default-constructed record never exposes indeterminate values to
/// deadline() or is_expired(). The struct remains an aggregate, so
/// designated-initializer construction keeps working unchanged.
struct expiration_info {
    /// Span added to last_update to compute the deadline.
    duration_type timeout{};
    /// Reference time point from which the timeout is counted.
    time_point_type last_update{};
    /// When true, is_expired() reports expiry regardless of the deadline.
    bool is_expiration_requested{false};

    /// Returns the point in time at which the timeout elapses.
    time_point_type deadline() const { return last_update + timeout; }

    /// Returns true when expiration was explicitly requested or when
    /// `now` has reached or passed the deadline.
    bool is_expired(time_point_type now) const {
        return is_expiration_requested || deadline() <= now;
    }
};

struct transaction_info {
Expand Down

0 comments on commit fc9b0a0

Please sign in to comment.