diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index f4258c7d4ea3f..48498e61708d3 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -769,13 +769,7 @@ ss::future> rm_stm::do_replicate( }) .finally([u = std::move(unit)] {}); } else if (bid.has_idempotent()) { - request_id rid{.pid = bid.pid, .seq = bid.first_seq}; - return with_request_lock( - rid, - [this, enqueued, bid, opts, b = std::move(b)]() mutable { - return replicate_seq( - bid, std::move(b), opts, enqueued); - }) + return replicate_seq(bid, std::move(b), opts, enqueued) .finally([u = std::move(unit)] {}); } @@ -914,6 +908,14 @@ rm_stm::known_seq(model::batch_identity bid) const { return std::nullopt; } +std::optional rm_stm::tail_seq(model::producer_identity pid) const { + auto pid_seq = _log_state.seq_table.find(pid); + if (pid_seq == _log_state.seq_table.end()) { + return std::nullopt; + } + return pid_seq->second.seq; +} + void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { @@ -1057,7 +1059,7 @@ ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, - [[maybe_unused]] ss::lw_shared_ptr> enqueued) { + ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout)) { co_return errc::not_leader; } @@ -1069,35 +1071,179 @@ ss::future> rm_stm::replicate_seq( "first_seq={} of the batch should be less or equal to last_seq={}", bid.first_seq, bid.last_seq); - outcome->set_value(); co_return errc::generic_tx_error; } + // getting session by client's pid + ss::lw_shared_ptr session; + auto session_it = _inflight_requests.find(bid.pid); + if (session_it == _inflight_requests.end()) { + session = ss::make_lw_shared(); + _inflight_requests.try_emplace(bid.pid, session); + } else { + session = session_it->second; + } + + // critical section, rm_stm evaluates the request its order and + // whether or not it's duplicated + // ASSUMING THAT ALL INFLIGHT REPLICATION REQUESTS WILL SUCCEED + // if any of the request fail we expect that the leader step down + auto u = co_await session->lock.get_units(); + + if (!_c->is_leader() || _c->term() != synced_term) { + co_return errc::not_leader; + } + + if (session->term < synced_term) { + // we don't care about old inflight requests because the sync + // guarantee (all replicated records of the last term are + // applied) makes sure that the passed inflight records got + // into _log_state->seq_table + session->forget(); + session->term = synced_term; + } + + // checking if the request (identified by seq) is already resolved + // checking among the pending requests + auto cached_r = session->known_seq(bid.last_seq); + if (cached_r) { + co_return cached_r.value(); + } + // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + co_return raft::replicate_result{.last_offset = cached_offset.value()}; + } + + // checking if the request is already being processed + for (auto inflight : session->cache) { + if (inflight->last_seq == bid.last_seq && inflight->is_processing) { + // found an inflight request, parking the current request + // until the former is resolved + auto promise = ss::make_lw_shared< + available_promise>>(); + inflight->parked.push_back(promise); + u.return_all(); + co_return co_await promise->get_future(); + } + } + + // apparantly we process an unseen request + // checking if it isn't out of order with the already procceses + // or inflight requests + if (session->cache.size() == 0) { + // no inflight requests > checking processed + auto tail = tail_seq(bid.pid); + if (tail) { + if (!is_sequence(tail.value(), bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest processed request + co_return errc::sequence_out_of_order; + } + } else { + if (bid.first_seq != 0) { + // first request in a session should have seq=0 + co_return errc::sequence_out_of_order; + } + } + } else { + if (!is_sequence(session->tail_seq, bid.first_seq)) { + // there is a gap between the request'ss seq number + // and the seq number of the latest inflight request + co_return errc::sequence_out_of_order; + } + } + + // updating last observed seq, since we hold the mutex + // it's guranteed to be latest + session->tail_seq = bid.last_seq; + + auto request = ss::make_lw_shared(); + request->last_seq = bid.last_seq; + request->is_processing = true; + session->cache.push_back(request); + + // request comes in the right order, it's ok to replicate + ss::lw_shared_ptr ss; + auto has_failed = false; + try { + ss = _c->replicate_in_stages(synced_term, std::move(br), opts); + co_await std::move(ss->request_enqueued); + } catch (...) { + vlog( + clusterlog.warn, + "replication failed with {}", + std::current_exception()); + has_failed = true; + } + + result r = errc::success; + + if (has_failed) { + if (_c->is_leader() && _c->term() == synced_term) { + // we may not care about the requests waiting on the lock + // as soon as we release the lock the leader or term will + // be changed so the pending fibers won't pass the initial + // checks and be rejected with not_leader + co_await _c->step_down(); + } + u.return_all(); + r = errc::generic_tx_error; + } else { + try { + u.return_all(); + enqueued->set_value(); + r = co_await std::move(ss->replicate_finished); + } catch (...) { vlog( clusterlog.warn, - "Status of the original attempt is unknown (still is in-flight " - "or failed). Failing a retried batch with the same pid:{} & " - "seq:{} to avoid duplicates", - bid.pid, - bid.last_seq); - // kafka clients don't automaticly handle fatal errors - // so when they encounter the error they will be forced - // to propagate it to the app layer - co_return errc::generic_tx_error; + "replication failed with {}", + std::current_exception()); + r = errc::generic_tx_error; } - co_return raft::replicate_result{.last_offset = cached_offset.value()}; } - if (!check_seq(bid)) { - co_return errc::sequence_out_of_order; + // we don't need session->lock because we never interleave + // access to is_processing and offset with sync point (await) + request->is_processing = false; + request->r = r; + for (auto pending : request->parked) { + pending->set_value(r); } - auto r = co_await _c->replicate(synced_term, std::move(br), opts); - if (r) { - set_seq(bid, r.value().last_offset); + request->parked.clear(); + + if (!r) { + // if r was failed at the consensus level (not because has_failed) + // it should guarantee that all follow up replication requests fail + // too but just in case stepping down to minimize the risk + if (_c->is_leader() && _c->term() == synced_term) { + co_await _c->step_down(); + } + co_return r; + } + + while (session->cache.size() > 0 + && !session->cache.front()->is_processing) { + auto front = session->cache.front(); + if (!front->r) { + break; + } + auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + if (inserted) { + seq_it->second.pid = bid.pid; + seq_it->second.seq = front->last_seq; + seq_it->second.last_offset = front->r.value().last_offset; + } else { + seq_it->second.update( + front->last_seq, front->r.value().last_offset); + } + session->cache.pop_front(); } + + if (session->cache.size() == 0 && session->lock.ready()) { + _inflight_requests.erase(bid.pid); + } + co_return r; } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 49b4ba7a01a96..c170f17b8c088 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -268,6 +268,7 @@ class rm_stm final : public persisted_stm { std::optional known_seq(model::batch_identity) const; void set_seq(model::batch_identity, model::offset); void reset_seq(model::batch_identity); + std::optional tail_seq(model::producer_identity) const; ss::future> do_replicate( model::batch_identity, @@ -409,22 +410,43 @@ class rm_stm final : public persisted_stm { } }; - template - auto with_request_lock(request_id rid, Func&& f) { - auto lock_it = _request_locks.find(rid); - if (lock_it == _request_locks.end()) { - lock_it - = _request_locks.emplace(rid, ss::make_lw_shared()).first; + struct inflight_request { + int32_t last_seq{-1}; + result r = errc::success; + bool is_processing; + std::vector< + ss::lw_shared_ptr>>> + parked; + }; + + struct inflight_requests { + mutex lock; + int32_t tail_seq{-1}; + model::term_id term; + ss::circular_buffer> cache; + + void forget() { + for (auto inflight : cache) { + if (inflight->is_processing) { + for (auto pending : inflight->parked) { + pending->set_value(errc::generic_tx_error); + } + } + } + cache.clear(); + tail_seq = -1; } - auto r_lock = lock_it->second; - return r_lock->with(std::forward(f)) - .finally([this, r_lock, rid]() { - if (r_lock->waiters() == 0) { - // this fiber is the last holder of the lock - _request_locks.erase(rid); - } - }); - } + + std::optional> + known_seq(int32_t last_seq) const { + for (auto seq : cache) { + if (seq->last_seq == last_seq && !seq->is_processing) { + return seq->r; + } + } + return std::nullopt; + } + }; ss::lw_shared_ptr get_tx_lock(model::producer_id pid) { auto lock_it = _tx_locks.find(pid); @@ -443,7 +465,10 @@ class rm_stm final : public persisted_stm { ss::basic_rwlock<> _state_lock; absl::flat_hash_map> _tx_locks; - absl::flat_hash_map> _request_locks; + absl::flat_hash_map< + model::producer_identity, + ss::lw_shared_ptr> + _inflight_requests; log_state _log_state; mem_state _mem_state; ss::timer auto_abort_timer;