diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 4004498c2d47..0547135d0617 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -175,14 +175,12 @@ kafka_stages partition::replicate_in_stages( } } - ss::lw_shared_ptr res; if (_rm_stm) { - res = _rm_stm->replicate_in_stages(bid, std::move(r), opts); - } else { - res = _raft->replicate_in_stages(std::move(r), opts); + return _rm_stm->replicate_in_stages(bid, std::move(r), opts); } - auto replicate_finished = res->replicate_finished.then( + auto res = _raft->replicate_in_stages(std::move(r), opts); + auto replicate_finished = res.replicate_finished.then( [this](result r) { if (!r) { return ret_t(r.error()); @@ -193,7 +191,7 @@ kafka_stages partition::replicate_in_stages( return ret_t(kafka_result{new_offset}); }); return kafka_stages( - std::move(res->request_enqueued), std::move(replicate_finished)); + std::move(res.request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index f1435e79a497..328ebfca2b67 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -785,7 +785,7 @@ ss::future rm_stm::do_abort_tx( co_return tx_errc::none; } -raft::replicate_stages rm_stm::replicate_in_stages( +kafka_stages rm_stm::replicate_in_stages( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -804,10 +804,10 @@ raft::replicate_stages rm_stm::replicate_in_stages( enqueued->set_value(); } }); - return raft::replicate_stages(std::move(f), std::move(replicate_finished)); + return kafka_stages(std::move(f), std::move(replicate_finished)); } -ss::future> rm_stm::replicate( +ss::future> rm_stm::replicate( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -824,7 +824,7 @@ rm_stm::transfer_leadership(std::optional target) { }); } -ss::future> rm_stm::do_replicate( +ss::future> rm_stm::do_replicate( model::batch_identity bid, model::record_batch_reader b, raft::replicate_options opts, @@ -854,6 +854,11 @@ ss::future<> rm_stm::stop() { return raft::state_machine::stop(); } +ss::future<> rm_stm::start() { + _translator = _c->get_offset_translator_state(); + return persisted_stm::start(); +} + rm_stm::transaction_info::status_t rm_stm::get_tx_status(model::producer_identity pid) const { if (_mem_state.preparing.contains(pid)) { @@ -947,7 +952,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return false; } - seq.update(bid.last_seq, model::offset{-1}); + seq.update(bid.last_seq, kafka::offset{-1}); seq.pid = bid.pid; seq.last_write_timestamp = last_write_timestamp; @@ -957,7 +962,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return true; } -std::optional +std::optional rm_stm::known_seq(model::batch_identity bid) const { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq == _log_state.seq_table.end()) { @@ -982,7 +987,7 @@ std::optional rm_stm::tail_seq(model::producer_identity pid) const { return pid_seq->second.seq; } -void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { +void rm_stm::set_seq(model::batch_identity bid, kafka::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { if (pid_seq->second.seq == bid.last_seq) { @@ -995,14 +1000,14 @@ void rm_stm::reset_seq(model::batch_identity bid) { _log_state.seq_table.erase(bid.pid); auto& seq = _log_state.seq_table[bid.pid]; seq.seq = bid.last_seq; - seq.last_offset = model::offset{-1}; + seq.last_offset = kafka::offset{-1}; seq.pid = bid.pid; seq.last_write_timestamp = model::timestamp::now().value(); _oldest_session = std::min( _oldest_session, model::timestamp(seq.last_write_timestamp)); } -ss::future> +ss::future> rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { if (!check_tx_permitted()) { co_return errc::generic_tx_error; @@ -1056,7 +1061,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // this isn't the first attempt in the tx we should try dedupe auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + if (cached_offset.value() < kafka::offset{0}) { vlog( clusterlog.warn, "Status of the original attempt is unknown (still is " @@ -1070,8 +1075,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // to propagate it to the app layer co_return errc::generic_tx_error; } - co_return raft::replicate_result{ - .last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } if (!check_seq(bid)) { @@ -1107,26 +1111,28 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { expiration_it->second.last_update = clock_type::now(); expiration_it->second.is_expiration_requested = false; - auto replicated = r.value(); + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); - set_seq(bid, replicated.last_offset); + set_seq(bid, new_offset); - auto last_offset = model::offset(replicated.last_offset()); if (!_mem_state.tx_start.contains(bid.pid)) { - auto base_offset = model::offset( - last_offset() - (bid.record_count - 1)); + auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); _mem_state.tx_starts.insert(base_offset); _mem_state.estimated.erase(bid.pid); } - co_return replicated; + + co_return kafka_result{.last_offset = new_offset}; } -ss::future> rm_stm::replicate_seq( +ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1185,7 +1191,7 @@ ss::future> rm_stm::replicate_seq( // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - co_return raft::replicate_result{.last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } // checking if the request is already being processed @@ -1193,8 +1199,8 @@ ss::future> rm_stm::replicate_seq( if (inflight->last_seq == bid.last_seq && inflight->is_processing) { // found an inflight request, parking the current request // until the former is resolved - auto promise = ss::make_lw_shared< - available_promise>>(); + auto promise + = ss::make_lw_shared>>(); inflight->parked.push_back(promise); u.return_all(); co_return co_await promise->get_future(); @@ -1279,20 +1285,26 @@ ss::future> rm_stm::replicate_seq( // we don't need session->lock because we never interleave // access to is_processing and offset with sync point (await) request->is_processing = false; - request->r = r; + if (r) { + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + request->r = ret_t(kafka_result{new_offset}); + } else { + request->r = ret_t(r.error()); + } for (auto& pending : request->parked) { - pending->set_value(r); + pending->set_value(request->r); } request->parked.clear(); - if (!r) { + if (!request->r) { // if r was failed at the consensus level (not because has_failed) // it should guarantee that all follow up replication requests fail // too but just in case stepping down to minimize the risk if (_c->is_leader() && _c->term() == synced_term) { co_await _c->step_down(); } - co_return r; + co_return request->r; } // requests get into session->cache in seq order so when we iterate @@ -1324,13 +1336,15 @@ ss::future> rm_stm::replicate_seq( _inflight_requests.erase(bid.pid); } - co_return r; + co_return request->r; } -ss::future> rm_stm::replicate_msg( +ss::future> rm_stm::replicate_msg( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { co_return errc::not_leader; } @@ -1338,7 +1352,14 @@ ss::future> rm_stm::replicate_msg( auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); co_await std::move(ss.request_enqueued); enqueued->set_value(); - co_return co_await std::move(ss.replicate_finished); + auto r = co_await std::move(ss.replicate_finished); + + if (!r) { + co_return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + co_return ret_t(kafka_result{new_offset}); } model::offset rm_stm::last_stable_offset() { @@ -1785,12 +1806,13 @@ ss::future<> rm_stm::apply_control( void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) { if (bid.has_idempotent()) { auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + auto translated = from_log_offset(last_offset); if (inserted) { seq_it->second.pid = bid.pid; seq_it->second.seq = bid.last_seq; - seq_it->second.last_offset = last_offset; + seq_it->second.last_offset = translated; } else { - seq_it->second.update(bid.last_seq, last_offset); + seq_it->second.update(bid.last_seq, translated); } seq_it->second.last_write_timestamp = bid.first_timestamp.value(); _oldest_session = std::min(_oldest_session, bid.first_timestamp); @@ -1849,11 +1871,21 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { seq_entry seq; seq.pid = seq_v1.pid; seq.seq = seq_v1.seq; - seq.last_offset = seq_v1.last_offset; + try { + seq.last_offset = from_log_offset(seq_v1.last_offset); + } catch (...) { + // ignoring outside the translation range errors + continue; + } seq.seq_cache.reserve(seq_v1.seq_cache.size()); for (auto& item : seq_v1.seq_cache) { - seq.seq_cache.push_back( - seq_cache_entry{.seq = item.seq, .offset = item.offset}); + try { + seq.seq_cache.push_back(seq_cache_entry{ + .seq = item.seq, .offset = from_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } seq.last_write_timestamp = seq_v1.last_write_timestamp; data.seqs.push_back(std::move(seq)); @@ -1870,7 +1902,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { auto seq = seq_entry{ .pid = seq_v0.pid, .seq = seq_v0.seq, - .last_offset = model::offset{-1}, + .last_offset = kafka::offset{-1}, .last_write_timestamp = seq_v0.last_write_timestamp}; data.seqs.push_back(std::move(seq)); } @@ -1989,12 +2021,22 @@ ss::future rm_stm::take_snapshot() { seq_entry_v1 seqs; seqs.pid = entry.pid; seqs.seq = entry.seq; - seqs.last_offset = entry.last_offset; + try { + seqs.last_offset = to_log_offset(entry.last_offset); + } catch (...) { + // ignoring outside the translation range errors + continue; + } seqs.last_write_timestamp = entry.last_write_timestamp; seqs.seq_cache.reserve(seqs.seq_cache.size()); for (auto& item : entry.seq_cache) { - seqs.seq_cache.push_back( - seq_cache_entry_v1{.seq = item.seq, .offset = item.offset}); + try { + seqs.seq_cache.push_back(seq_cache_entry_v1{ + .seq = item.seq, .offset = to_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } tx_ss.seqs.push_back(std::move(seqs)); } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index fcb0ea4b4656..0be0ae891598 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -21,6 +21,7 @@ #include "raft/logger.h" #include "raft/state_machine.h" #include "raft/types.h" +#include "storage/offset_translator_state.h" #include "storage/snapshot.h" #include "utils/available_promise.h" #include "utils/expiring_promise.h" @@ -74,14 +75,14 @@ class rm_stm final : public persisted_stm { struct seq_cache_entry { int32_t seq{-1}; - model::offset offset; + kafka::offset offset; }; struct seq_entry { static const int seq_cache_size = 5; model::producer_identity pid; int32_t seq{-1}; - model::offset last_offset{-1}; + kafka::offset last_offset{-1}; ss::circular_buffer seq_cache; model::timestamp::type last_write_timestamp; @@ -99,7 +100,7 @@ class rm_stm final : public persisted_stm { return ret; } - void update(int32_t new_seq, model::offset new_offset) { + void update(int32_t new_seq, kafka::offset new_offset) { if (new_seq < seq) { return; } @@ -109,7 +110,7 @@ class rm_stm final : public persisted_stm { return; } - if (seq >= 0 && last_offset >= model::offset{0}) { + if (seq >= 0 && last_offset >= kafka::offset{0}) { auto entry = seq_cache_entry{.seq = seq, .offset = last_offset}; seq_cache.push_back(entry); while (seq_cache.size() >= seq_entry::seq_cache_size) { @@ -169,12 +170,12 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + ss::future> replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options); @@ -184,6 +185,8 @@ class rm_stm final : public persisted_stm { ss::future<> stop() override; + ss::future<> start() override; + void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } void testing_only_enable_transactions() { _is_tx_enabled = true; } @@ -273,27 +276,27 @@ class rm_stm final : public persisted_stm { ss::future<> save_abort_snapshot(abort_snapshot); bool check_seq(model::batch_identity); - std::optional known_seq(model::batch_identity) const; - void set_seq(model::batch_identity, model::offset); + std::optional known_seq(model::batch_identity) const; + void set_seq(model::batch_identity, kafka::offset); void reset_seq(model::batch_identity); std::optional tail_seq(model::producer_identity) const; - ss::future> do_replicate( + ss::future> do_replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); - ss::future> replicate_seq( + ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> replicate_msg( + ss::future> replicate_msg( model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); @@ -423,10 +426,9 @@ class rm_stm final : public persisted_stm { // original request is replicated. struct inflight_request { int32_t last_seq{-1}; - result r = errc::success; + result r = errc::success; bool is_processing; - std::vector< - ss::lw_shared_ptr>>> + std::vector>>> parked; }; @@ -466,8 +468,7 @@ class rm_stm final : public persisted_stm { tail_seq = -1; } - std::optional> - known_seq(int32_t last_seq) const { + std::optional> known_seq(int32_t last_seq) const { for (auto& seq : cache) { if (seq->last_seq == last_seq && !seq->is_processing) { return seq->r; @@ -487,6 +488,20 @@ class rm_stm final : public persisted_stm { return lock_it->second; } + kafka::offset from_log_offset(model::offset old_offset) { + if (old_offset > model::offset{-1}) { + return kafka::offset(_translator->from_log_offset(old_offset)()); + } + return kafka::offset(old_offset()); + } + + model::offset to_log_offset(kafka::offset new_offset) { + if (new_offset > model::offset{-1}) { + return _translator->to_log_offset(model::offset(new_offset())); + } + return model::offset(new_offset()); + } + transaction_info::status_t get_tx_status(model::producer_identity pid) const; std::optional @@ -519,6 +534,7 @@ class rm_stm final : public persisted_stm { bool _is_tx_enabled{false}; ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; + ss::lw_shared_ptr _translator; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index 1d3187181691..a9c1d6a3b252 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -150,7 +150,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { wait_for_confirmed_leader(); wait_for_meta_initialized(); - std::vector offsets; + std::vector offsets; auto count = 5;