diff --git a/src/v/cluster/feature_table.cc b/src/v/cluster/feature_table.cc index 134f9a226374..24bf521389c8 100644 --- a/src/v/cluster/feature_table.cc +++ b/src/v/cluster/feature_table.cc @@ -30,6 +30,8 @@ std::string_view to_string_view(feature f) { return "serde_raft_0"; case feature::license: return "license"; + case feature::rm_stm_kafka_cache: + return "rm_stm_kafka_cache"; case feature::test_alpha: return "__test_alpha"; } @@ -58,7 +60,7 @@ std::string_view to_string_view(feature_state::state s) { // The version that this redpanda node will report: increment this // on protocol changes to raft0 structures, like adding new services. -static constexpr cluster_version latest_version = cluster_version{4}; +static constexpr cluster_version latest_version = cluster_version{5}; feature_table::feature_table() { // Intentionally undocumented environment variable, only for use diff --git a/src/v/cluster/feature_table.h b/src/v/cluster/feature_table.h index d40e2a7235b5..88eddf042092 100644 --- a/src/v/cluster/feature_table.h +++ b/src/v/cluster/feature_table.h @@ -27,6 +27,7 @@ enum class feature : std::uint64_t { mtls_authentication = 0x8, serde_raft_0 = 0x10, license = 0x20, + rm_stm_kafka_cache = 0x40, // Dummy features for testing only test_alpha = uint64_t(1) << 63, @@ -115,6 +116,12 @@ constexpr static std::array feature_schema{ feature::license, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, + feature_spec{ + cluster_version{5}, + "rm_stm_kafka_cache", + feature::rm_stm_kafka_cache, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, feature_spec{ cluster_version{2001}, "__test_alpha", diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 8350845f7d95..090616884b61 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -32,10 +32,12 @@ partition::partition( consensus_ptr r, ss::sharded& tx_gateway_frontend, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _raft(r) , _probe(std::make_unique(*this)) , _tx_gateway_frontend(tx_gateway_frontend) + , _feature_table(feature_table) , _is_tx_enabled(config::shard_local_cfg().enable_transactions.value()) , _is_idempotence_enabled( config::shard_local_cfg().enable_idempotence.value()) { @@ -70,7 +72,7 @@ partition::partition( if (has_rm_stm) { _rm_stm = ss::make_shared( - clusterlog, _raft.get(), _tx_gateway_frontend); + clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table); stm_manager->add_stm(_rm_stm); } @@ -102,21 +104,15 @@ partition::partition( } } -ss::future> partition::replicate( +ss::future> partition::replicate( model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate(std::move(r), opts); -} - -raft::replicate_stages partition::replicate_in_stages( - model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate_in_stages(std::move(r), opts); -} - -ss::future> partition::replicate( - model::term_id term, - model::record_batch_reader&& r, - raft::replicate_options opts) { - return _raft->replicate(term, std::move(r), opts); + using ret_t = result; + auto res = co_await _raft->replicate(std::move(r), opts); + if (!res) { + co_return ret_t(res.error()); + } + co_return ret_t(kafka_result{ + kafka::offset(_translator->from_log_offset(res.value().last_offset)())}); } ss::shared_ptr partition::rm_stm() { @@ -138,10 +134,11 @@ ss::shared_ptr partition::rm_stm() { return _rm_stm; } 
-raft::replicate_stages partition::replicate_in_stages( +kafka_stages partition::replicate_in_stages( model::batch_identity bid, model::record_batch_reader&& r, raft::replicate_options opts) { + using ret_t = result; if (bid.is_transactional) { if (!_is_tx_enabled) { vlog( @@ -149,7 +146,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process a transactional request to {}. Transactional " "processing isn't enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -157,7 +154,7 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support transactional processing.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } @@ -168,7 +165,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process an idempotent request to {}. Idempotency isn't " "enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -176,68 +173,27 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support idempotency.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } if (_rm_stm) { return _rm_stm->replicate_in_stages(bid, std::move(r), opts); - } else { - return _raft->replicate_in_stages(std::move(r), opts); - } -} - -ss::future> partition::replicate( - model::batch_identity bid, - model::record_batch_reader&& r, - raft::replicate_options opts) { - if (bid.is_transactional) { - if (!_is_tx_enabled) { - vlog( - clusterlog.error, - "Can't process a transactional request to {}. Transactional " - "processing isn't enabled.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - - if (!_rm_stm) { - vlog( - clusterlog.error, - "Topic {} doesn't support transactional processing.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } } - if (bid.has_idempotent()) { - if (!_is_idempotence_enabled) { - vlog( - clusterlog.error, - "Can't process an idempotent request to {}. 
Idempotency isn't " - "enabled.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - - if (!_rm_stm) { - vlog( - clusterlog.error, - "Topic {} doesn't support idempotency.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - } - - if (_rm_stm) { - return _rm_stm->replicate(bid, std::move(r), opts); - } else { - return _raft->replicate(std::move(r), opts); - } + auto res = _raft->replicate_in_stages(std::move(r), opts); + auto replicate_finished = res.replicate_finished.then( + [this](result r) { + if (!r) { + return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = kafka::offset( + _translator->from_log_offset(old_offset)()); + return ret_t(kafka_result{new_offset}); + }); + return kafka_stages( + std::move(res.request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { @@ -245,7 +201,8 @@ ss::future<> partition::start() { _probe.setup_metrics(ntp); - auto f = _raft->start(); + auto f = _raft->start().then( + [this] { _translator = _raft->get_offset_translator_state(); }); if (is_id_allocator_topic(ntp)) { return f.then([this] { return _id_allocator_stm->start(); }); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 81107c6d0beb..060153c9cc0e 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -13,6 +13,7 @@ #include "cloud_storage/remote_partition.h" #include "cluster/archival_metadata_stm.h" +#include "cluster/feature_table.h" #include "cluster/id_allocator_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" @@ -44,27 +45,17 @@ class partition { consensus_ptr r, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); raft::group_id group() const { return _raft->group(); } ss::future<> start(); ss::future<> stop(); - ss::future> + ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages - replicate_in_stages(model::record_batch_reader&&, raft::replicate_options); - - ss::future> replicate( - model::term_id, model::record_batch_reader&&, raft::replicate_options); - - ss::future> replicate( - model::batch_identity, - model::record_batch_reader&&, - raft::replicate_options); - - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); @@ -289,11 +280,7 @@ class partition { return _raft->abort_configuration_change(rev); } -private: - friend partition_manager; - friend replicated_partition_probe; - - consensus_ptr raft() { return _raft; } + consensus_ptr raft() const { return _raft; } private: consensus_ptr _raft; @@ -305,9 +292,11 @@ class partition { ss::abort_source _as; partition_probe _probe; ss::sharded& _tx_gateway_frontend; + ss::sharded& _feature_table; bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; + ss::lw_shared_ptr _translator; friend std::ostream& operator<<(std::ostream& o, const partition& x); }; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 406baf98795c..263b3f1857ef 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -48,13 +48,15 @@ partition_manager::partition_manager( ss::sharded& tx_gateway_frontend, ss::sharded& recovery_mgr, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : 
_storage(storage.local()) , _raft_manager(raft) , _tx_gateway_frontend(tx_gateway_frontend) , _partition_recovery_mgr(recovery_mgr) , _cloud_storage_api(cloud_storage_api) - , _cloud_storage_cache(cloud_storage_cache) {} + , _cloud_storage_cache(cloud_storage_cache) + , _feature_table(feature_table) {} partition_manager::ntp_table_container partition_manager::get_topic_partition_table( @@ -120,7 +122,11 @@ ss::future partition_manager::manage( group, std::move(initial_nodes), log); auto p = ss::make_lw_shared( - c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache); + c, + _tx_gateway_frontend, + _cloud_storage_api, + _cloud_storage_cache, + _feature_table); _ntp_table.emplace(log.config().ntp(), p); _raft_table.emplace(group, p); diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h index fa66f5ea31a0..8f45302e4abc 100644 --- a/src/v/cluster/partition_manager.h +++ b/src/v/cluster/partition_manager.h @@ -14,6 +14,7 @@ #include "cloud_storage/cache_service.h" #include "cloud_storage/partition_recovery_manager.h" #include "cloud_storage/remote.h" +#include "cluster/feature_table.h" #include "cluster/ntp_callbacks.h" #include "cluster/partition.h" #include "model/metadata.h" @@ -37,7 +38,8 @@ class partition_manager { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); using manage_cb_t = ss::noncopyable_function)>; @@ -190,6 +192,7 @@ class partition_manager { _partition_recovery_mgr; ss::sharded& _cloud_storage_api; ss::sharded& _cloud_storage_cache; + ss::sharded& _feature_table; ss::gate _gate; bool _block_new_leadership{false}; diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc index 949409d0fb20..01b21336a273 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -89,7 +89,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { sm::make_gauge( "leader_id", [this] { - return _partition._raft->get_leader_id().value_or( + return _partition.raft()->get_leader_id().value_or( model::node_id(-1)); }, sm::description("Id of current partition leader"), @@ -98,7 +98,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { sm::make_gauge( "under_replicated_replicas", [this] { - auto metrics = _partition._raft->get_follower_metrics(); + auto metrics = _partition.raft()->get_follower_metrics(); return std::count_if( metrics.cbegin(), metrics.cend(), @@ -181,7 +181,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) { sm::make_gauge( "under_replicated_replicas", [this] { - auto metrics = _partition._raft->get_follower_metrics(); + auto metrics = _partition.raft()->get_follower_metrics(); return std::count_if( metrics.cbegin(), metrics.cend(), @@ -214,7 +214,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) { .aggregate({sm::shard_label, partition_label}), sm::make_gauge( "replicas", - [this] { return _partition._raft->get_follower_count(); }, + [this] { return _partition.raft()->get_follower_count(); }, sm::description("Number of replicas per topic"), labels) .aggregate({sm::shard_label, partition_label}), diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 4766ad986f21..baa3dc492167 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -187,10 +187,36 @@ struct tx_snapshot_v0 { std::vector seqs; }; +struct seq_cache_entry_v1 { + int32_t seq{-1}; + model::offset offset; +}; + +struct seq_entry_v1 { + 
model::producer_identity pid; + int32_t seq{-1}; + model::offset last_offset{-1}; + ss::circular_buffer seq_cache; + model::timestamp::type last_write_timestamp; +}; + +struct tx_snapshot_v1 { + static constexpr uint8_t version = 1; + + std::vector fenced; + std::vector ongoing; + std::vector prepared; + std::vector aborted; + std::vector abort_indexes; + model::offset offset; + std::vector seqs; +}; + rm_stm::rm_stm( ss::logger& logger, raft::consensus* c, - ss::sharded& tx_gateway_frontend) + ss::sharded& tx_gateway_frontend, + ss::sharded& feature_table) : persisted_stm("tx.snapshot", logger, c) , _oldest_session(model::timestamp::now()) , _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value()) @@ -209,7 +235,8 @@ rm_stm::rm_stm( , _abort_snapshot_mgr( "abort.idx", std::filesystem::path(c->log_config().work_directory()), - ss::default_priority_class()) { + ss::default_priority_class()) + , _feature_table(feature_table) { if (!_is_tx_enabled) { _is_autoabort_enabled = false; } @@ -760,7 +787,7 @@ ss::future rm_stm::do_abort_tx( co_return tx_errc::none; } -raft::replicate_stages rm_stm::replicate_in_stages( +kafka_stages rm_stm::replicate_in_stages( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -779,10 +806,10 @@ raft::replicate_stages rm_stm::replicate_in_stages( enqueued->set_value(); } }); - return raft::replicate_stages(std::move(f), std::move(replicate_finished)); + return kafka_stages(std::move(f), std::move(replicate_finished)); } -ss::future> rm_stm::replicate( +ss::future> rm_stm::replicate( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -799,7 +826,7 @@ rm_stm::transfer_leadership(std::optional target) { }); } -ss::future> rm_stm::do_replicate( +ss::future> rm_stm::do_replicate( model::batch_identity bid, model::record_batch_reader b, raft::replicate_options opts, @@ -829,6 +856,11 @@ ss::future<> rm_stm::stop() { return raft::state_machine::stop(); } +ss::future<> rm_stm::start() { + _translator = _c->get_offset_translator_state(); + return persisted_stm::start(); +} + rm_stm::transaction_info::status_t rm_stm::get_tx_status(model::producer_identity pid) const { if (_mem_state.preparing.contains(pid)) { @@ -922,7 +954,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return false; } - seq.update(bid.last_seq, model::offset{-1}); + seq.update(bid.last_seq, kafka::offset{-1}); seq.pid = bid.pid; seq.last_write_timestamp = last_write_timestamp; @@ -932,7 +964,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return true; } -std::optional +std::optional rm_stm::known_seq(model::batch_identity bid) const { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq == _log_state.seq_table.end()) { @@ -957,7 +989,7 @@ std::optional rm_stm::tail_seq(model::producer_identity pid) const { return pid_seq->second.seq; } -void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { +void rm_stm::set_seq(model::batch_identity bid, kafka::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { if (pid_seq->second.seq == bid.last_seq) { @@ -970,14 +1002,14 @@ void rm_stm::reset_seq(model::batch_identity bid) { _log_state.seq_table.erase(bid.pid); auto& seq = _log_state.seq_table[bid.pid]; seq.seq = bid.last_seq; - seq.last_offset = model::offset{-1}; + seq.last_offset = kafka::offset{-1}; seq.pid = bid.pid; seq.last_write_timestamp = model::timestamp::now().value(); _oldest_session = std::min( 
_oldest_session, model::timestamp(seq.last_write_timestamp)); } -ss::future> +ss::future> rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { if (!check_tx_permitted()) { co_return errc::generic_tx_error; @@ -1031,7 +1063,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // this isn't the first attempt in the tx we should try dedupe auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + if (cached_offset.value() < kafka::offset{0}) { vlog( clusterlog.warn, "Status of the original attempt is unknown (still is " @@ -1045,8 +1077,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // to propagate it to the app layer co_return errc::generic_tx_error; } - co_return raft::replicate_result{ - .last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } if (!check_seq(bid)) { @@ -1082,26 +1113,28 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { expiration_it->second.last_update = clock_type::now(); expiration_it->second.is_expiration_requested = false; - auto replicated = r.value(); + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); - set_seq(bid, replicated.last_offset); + set_seq(bid, new_offset); - auto last_offset = model::offset(replicated.last_offset()); if (!_mem_state.tx_start.contains(bid.pid)) { - auto base_offset = model::offset( - last_offset() - (bid.record_count - 1)); + auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); _mem_state.tx_starts.insert(base_offset); _mem_state.estimated.erase(bid.pid); } - co_return replicated; + + co_return kafka_result{.last_offset = new_offset}; } -ss::future> rm_stm::replicate_seq( +ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1160,7 +1193,7 @@ ss::future> rm_stm::replicate_seq( // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - co_return raft::replicate_result{.last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } // checking if the request is already being processed @@ -1168,8 +1201,8 @@ ss::future> rm_stm::replicate_seq( if (inflight->last_seq == bid.last_seq && inflight->is_processing) { // found an inflight request, parking the current request // until the former is resolved - auto promise = ss::make_lw_shared< - available_promise>>(); + auto promise + = ss::make_lw_shared>>(); inflight->parked.push_back(promise); u.return_all(); co_return co_await promise->get_future(); @@ -1254,20 +1287,26 @@ ss::future> rm_stm::replicate_seq( // we don't need session->lock because we never interleave // access to is_processing and offset with sync point (await) request->is_processing = false; - request->r = r; + if (r) { + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + request->r = ret_t(kafka_result{new_offset}); + } else { + request->r = ret_t(r.error()); + } for (auto& pending : request->parked) { - pending->set_value(r); + pending->set_value(request->r); } request->parked.clear(); - 
if (!r) { + if (!request->r) { // if r was failed at the consensus level (not because has_failed) // it should guarantee that all follow up replication requests fail // too but just in case stepping down to minimize the risk if (_c->is_leader() && _c->term() == synced_term) { co_await _c->step_down(); } - co_return r; + co_return request->r; } // requests get into session->cache in seq order so when we iterate @@ -1299,13 +1338,15 @@ ss::future> rm_stm::replicate_seq( _inflight_requests.erase(bid.pid); } - co_return r; + co_return request->r; } -ss::future> rm_stm::replicate_msg( +ss::future> rm_stm::replicate_msg( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { co_return errc::not_leader; } @@ -1313,7 +1354,14 @@ ss::future> rm_stm::replicate_msg( auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); co_await std::move(ss.request_enqueued); enqueued->set_value(); - co_return co_await std::move(ss.replicate_finished); + auto r = co_await std::move(ss.replicate_finished); + + if (!r) { + co_return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + co_return ret_t(kafka_result{new_offset}); } model::offset rm_stm::last_stable_offset() { @@ -1760,12 +1808,13 @@ ss::future<> rm_stm::apply_control( void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) { if (bid.has_idempotent()) { auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + auto translated = from_log_offset(last_offset); if (inserted) { seq_it->second.pid = bid.pid; seq_it->second.seq = bid.last_seq; - seq_it->second.last_offset = last_offset; + seq_it->second.last_offset = translated; } else { - seq_it->second.update(bid.last_seq, last_offset); + seq_it->second.update(bid.last_seq, translated); } seq_it->second.last_write_timestamp = bid.first_timestamp.value(); _oldest_session = std::min(_oldest_session, bid.first_timestamp); @@ -1812,19 +1861,50 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { iobuf_parser data_parser(std::move(tx_ss_buf)); if (hdr.version == tx_snapshot::version) { data = reflection::adl{}.from(data_parser); + } else if (hdr.version == tx_snapshot_v1::version) { + auto data_v1 = reflection::adl{}.from(data_parser); + data.fenced = std::move(data_v1.fenced); + data.ongoing = std::move(data_v1.ongoing); + data.prepared = std::move(data_v1.prepared); + data.aborted = std::move(data_v1.aborted); + data.abort_indexes = std::move(data_v1.abort_indexes); + data.offset = std::move(data_v1.offset); + for (auto& seq_v1 : data_v1.seqs) { + seq_entry seq; + seq.pid = seq_v1.pid; + seq.seq = seq_v1.seq; + try { + seq.last_offset = from_log_offset(seq_v1.last_offset); + } catch (...) { + // ignoring outside the translation range errors + continue; + } + seq.seq_cache.reserve(seq_v1.seq_cache.size()); + for (auto& item : seq_v1.seq_cache) { + try { + seq.seq_cache.push_back(seq_cache_entry{ + .seq = item.seq, .offset = from_log_offset(item.offset)}); + } catch (...) 
{ + // ignoring outside the translation range errors + continue; + } + } + seq.last_write_timestamp = seq_v1.last_write_timestamp; + data.seqs.push_back(std::move(seq)); + } } else if (hdr.version == tx_snapshot_v0::version) { auto data_v0 = reflection::adl{}.from(data_parser); - data.fenced = data_v0.fenced; - data.ongoing = data_v0.ongoing; - data.prepared = data_v0.prepared; - data.aborted = data_v0.aborted; - data.abort_indexes = data_v0.abort_indexes; - data.offset = data_v0.offset; + data.fenced = std::move(data_v0.fenced); + data.ongoing = std::move(data_v0.ongoing); + data.prepared = std::move(data_v0.prepared); + data.aborted = std::move(data_v0.aborted); + data.abort_indexes = std::move(data_v0.abort_indexes); + data.offset = std::move(data_v0.offset); for (auto seq_v0 : data_v0.seqs) { auto seq = seq_entry{ .pid = seq_v0.pid, .seq = seq_v0.seq, - .last_offset = model::offset{-1}, + .last_offset = kafka::offset{-1}, .last_write_timestamp = seq_v0.last_write_timestamp}; data.seqs.push_back(std::move(seq)); } @@ -1879,6 +1959,32 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { _insync_offset = data.offset; } +uint8_t rm_stm::active_snapshot_version() { + if (_feature_table.local().is_active(feature::rm_stm_kafka_cache)) { + return tx_snapshot::version; + } + return tx_snapshot_v1::version; +} + +template +void rm_stm::fill_snapshot_wo_seqs(T& snapshot) { + for (auto const& [k, v] : _log_state.fence_pid_epoch) { + snapshot.fenced.push_back(model::producer_identity{k(), v()}); + } + for (auto& entry : _log_state.ongoing_map) { + snapshot.ongoing.push_back(entry.second); + } + for (auto& entry : _log_state.prepared) { + snapshot.prepared.push_back(entry.second); + } + for (auto& entry : _log_state.aborted) { + snapshot.aborted.push_back(entry); + } + for (auto& entry : _log_state.abort_indexes) { + snapshot.abort_indexes.push_back(entry); + } +} + ss::future rm_stm::take_snapshot() { if (_log_state.aborted.size() > _abort_index_segment_size) { std::sort( @@ -1904,33 +2010,51 @@ ss::future rm_stm::take_snapshot() { _log_state.aborted = snapshot.aborted; } - tx_snapshot tx_ss; - - for (auto const& [k, v] : _log_state.fence_pid_epoch) { - tx_ss.fenced.push_back(model::producer_identity{k(), v()}); - } - for (auto& entry : _log_state.ongoing_map) { - tx_ss.ongoing.push_back(entry.second); - } - for (auto& entry : _log_state.prepared) { - tx_ss.prepared.push_back(entry.second); - } - for (auto& entry : _log_state.aborted) { - tx_ss.aborted.push_back(entry); - } - for (auto& entry : _log_state.abort_indexes) { - tx_ss.abort_indexes.push_back(entry); - } - for (const auto& entry : _log_state.seq_table) { - tx_ss.seqs.push_back(entry.second.copy()); - } - tx_ss.offset = _insync_offset; - iobuf tx_ss_buf; - reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + auto version = active_snapshot_version(); + if (version == tx_snapshot::version) { + tx_snapshot tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& entry : _log_state.seq_table) { + tx_ss.seqs.push_back(entry.second.copy()); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else if (version == tx_snapshot_v1::version) { + tx_snapshot_v1 tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& it : _log_state.seq_table) { + auto& entry = it.second; + seq_entry_v1 seqs; + seqs.pid = entry.pid; + seqs.seq = entry.seq; + try { + seqs.last_offset = to_log_offset(entry.last_offset); + } catch (...) 
{ + // ignoring outside the translation range errors + continue; + } + seqs.last_write_timestamp = entry.last_write_timestamp; + seqs.seq_cache.reserve(entry.seq_cache.size()); + for (auto& item : entry.seq_cache) { + try { + seqs.seq_cache.push_back(seq_cache_entry_v1{ + .seq = item.seq, .offset = to_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } + } + tx_ss.seqs.push_back(std::move(seqs)); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else { + vassert(false, "unsupported tx_snapshot version {}", version); + } co_return stm_snapshot::create( - tx_snapshot::version, _insync_offset, std::move(tx_ss_buf)); + version, _insync_offset, std::move(tx_ss_buf)); } ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) { diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 48f3fa0da964..6e6c09c542e2 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/feature_table.h" #include "cluster/persisted_stm.h" #include "cluster/tx_utils.h" #include "cluster/types.h" @@ -21,6 +22,7 @@ #include "raft/logger.h" #include "raft/state_machine.h" #include "raft/types.h" +#include "storage/offset_translator_state.h" #include "storage/snapshot.h" #include "utils/available_promise.h" #include "utils/expiring_promise.h" @@ -74,14 +76,14 @@ class rm_stm final : public persisted_stm { struct seq_cache_entry { int32_t seq{-1}; - model::offset offset; + kafka::offset offset; }; struct seq_entry { static const int seq_cache_size = 5; model::producer_identity pid; int32_t seq{-1}; - model::offset last_offset{-1}; + kafka::offset last_offset{-1}; ss::circular_buffer seq_cache; model::timestamp::type last_write_timestamp; @@ -99,7 +101,7 @@ class rm_stm final : public persisted_stm { return ret; } - void update(int32_t new_seq, model::offset new_offset) { + void update(int32_t new_seq, kafka::offset new_offset) { if (new_seq < seq) { return; } @@ -109,7 +111,7 @@ class rm_stm final : public persisted_stm { return; } - if (seq >= 0 && last_offset >= model::offset{0}) { + if (seq >= 0 && last_offset >= kafka::offset{0}) { auto entry = seq_cache_entry{.seq = seq, .offset = last_offset}; seq_cache.push_back(entry); while (seq_cache.size() >= seq_entry::seq_cache_size) { @@ -123,7 +125,7 @@ class rm_stm final : public persisted_stm { }; struct tx_snapshot { - static constexpr uint8_t version = 1; + static constexpr uint8_t version = 2; std::vector fenced; std::vector ongoing; @@ -150,7 +152,8 @@ class rm_stm final : public persisted_stm { explicit rm_stm( ss::logger&, raft::consensus*, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future> begin_tx( model::producer_identity, model::tx_seq, std::chrono::milliseconds); @@ -169,12 +172,12 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + ss::future> replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options); @@ -184,6 +187,8 @@ class rm_stm final : public persisted_stm { ss::future<> stop() override; + ss::future<> start() override; + void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } void testing_only_enable_transactions() { _is_tx_enabled = true; } @@ -273,27 +278,27 @@ class rm_stm
final : public persisted_stm { ss::future<> save_abort_snapshot(abort_snapshot); bool check_seq(model::batch_identity); - std::optional known_seq(model::batch_identity) const; - void set_seq(model::batch_identity, model::offset); + std::optional known_seq(model::batch_identity) const; + void set_seq(model::batch_identity, kafka::offset); void reset_seq(model::batch_identity); std::optional tail_seq(model::producer_identity) const; - ss::future> do_replicate( + ss::future> do_replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); - ss::future> replicate_seq( + ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> replicate_msg( + ss::future> replicate_msg( model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); @@ -406,27 +411,14 @@ class rm_stm final : public persisted_stm { } }; - struct request_id { - model::producer_identity pid; - int32_t seq; - - auto operator<=>(const request_id&) const = default; - - template - friend H AbslHashValue(H h, const request_id& bid) { - return H::combine(std::move(h), bid.pid, bid.seq); - } - }; - // When a request is retried while the first attempt is still // being replicated, the retried request is parked until the // original request is replicated. struct inflight_request { int32_t last_seq{-1}; - result r = errc::success; + result r = errc::success; bool is_processing; - std::vector< - ss::lw_shared_ptr>>> + std::vector>>> parked; }; @@ -466,8 +458,7 @@ class rm_stm final : public persisted_stm { tail_seq = -1; } - std::optional> - known_seq(int32_t last_seq) const { + std::optional> known_seq(int32_t last_seq) const { for (auto& seq : cache) { if (seq->last_seq == last_seq && !seq->is_processing) { return seq->r; } @@ -487,11 +478,30 @@ class rm_stm final : public persisted_stm { return lock_it->second; } + kafka::offset from_log_offset(model::offset old_offset) { + if (old_offset > model::offset{-1}) { + return kafka::offset(_translator->from_log_offset(old_offset)()); + } + return kafka::offset(old_offset()); + } + + model::offset to_log_offset(kafka::offset new_offset) { + if (new_offset > model::offset{-1}) { + return _translator->to_log_offset(model::offset(new_offset())); + } + return model::offset(new_offset()); + } + transaction_info::status_t get_tx_status(model::producer_identity pid) const; std::optional get_expiration_info(model::producer_identity pid) const; + uint8_t active_snapshot_version(); + + template + void fill_snapshot_wo_seqs(T&); + ss::basic_rwlock<> _state_lock; absl::flat_hash_map> _tx_locks; absl::flat_hash_map< model::producer_identity, @@ -514,6 +524,8 @@ class rm_stm final : public persisted_stm { bool _is_tx_enabled{false}; ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; + ss::lw_shared_ptr _translator; + ss::sharded& _feature_table; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index 1d3187181691..f79f59eaccd0 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -36,7 +37,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; -
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -81,6 +85,7 @@ FIXTURE_TEST( raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE((bool)r2); + feature_table.stop().get0(); } FIXTURE_TEST( @@ -88,7 +93,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -135,13 +143,17 @@ FIXTURE_TEST( BOOST_REQUIRE((bool)r2); BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset); + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -150,7 +162,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { wait_for_confirmed_leader(); wait_for_meta_initialized(); - std::vector offsets; + std::vector offsets; auto count = 5; @@ -200,13 +212,17 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { BOOST_REQUIRE((bool)r1); BOOST_REQUIRE(r1.value().last_offset == offsets[i]); } + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -260,13 +276,17 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) { r1 == failure_type(cluster::errc::sequence_out_of_order)); } + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -312,6 +332,7 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) { .get0(); BOOST_REQUIRE( r2 == failure_type(cluster::errc::sequence_out_of_order)); + feature_table.stop().get0(); } FIXTURE_TEST( @@ -319,7 +340,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -349,13 +373,17 @@ FIXTURE_TEST( .get0(); BOOST_REQUIRE( r == failure_type(cluster::errc::sequence_out_of_order)); + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) { start_raft(); 
ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -404,4 +432,5 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) { BOOST_REQUIRE((bool)r1); BOOST_REQUIRE((bool)r2); BOOST_REQUIRE(r1.value().last_offset == r2.value().last_offset); + feature_table.stop().get0(); } diff --git a/src/v/cluster/tests/partition_moving_test.cc b/src/v/cluster/tests/partition_moving_test.cc index a8b68af8fe2b..b5d8fd9766c4 100644 --- a/src/v/cluster/tests/partition_moving_test.cc +++ b/src/v/cluster/tests/partition_moving_test.cc @@ -318,7 +318,7 @@ class partition_assignment_test_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index 830d269e5b6e..635564406899 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -159,7 +159,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index c53da9af13c5..f40bb497b9d9 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -129,6 +133,7 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) { BOOST_REQUIRE_EQUAL(aborted_txs.size(), 0); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // tests: @@ -138,7 +143,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -204,6 +212,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) { })); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // tests: @@ -213,7 +222,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), 
tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -285,6 +297,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) { })); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // transactional writes of an unknown tx are rejected @@ -292,7 +305,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -322,6 +338,7 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); + feature_table.stop().get0(); } // begin fences off old transactions @@ -329,7 +346,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -379,6 +399,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(!(bool)offset_r); + feature_table.stop().get0(); } // transactional writes of an aborted tx are rejected @@ -386,7 +407,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -438,4 +462,5 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); + feature_table.stop().get0(); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 9769e8e07a19..474bfb6a2d9e 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -32,6 +32,16 @@ namespace cluster { +kafka_stages::kafka_stages( + ss::future<> enq, ss::future> offset_future) + : request_enqueued(std::move(enq)) + , replicate_finished(std::move(offset_future)) {} + +kafka_stages::kafka_stages(raft::errc ec) + : request_enqueued(ss::now()) + , replicate_finished( + ss::make_ready_future>(make_error_code(ec))){}; + bool topic_properties::is_compacted() const { if (!cleanup_policy_bitflags) { return false; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index a5abd73d555f..b101b19a88bf 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -175,6 +175,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept { return std::error_code(static_cast(e), tx_error_category()); } +struct kafka_result { + kafka::offset last_offset; +}; +struct kafka_stages { + 
kafka_stages(ss::future<>, ss::future>); + explicit kafka_stages(raft::errc); + // after this future is ready, the request is enqueued in raft and it will + // not be reordered + ss::future<> request_enqueued; + // after this future is ready, the request was successfully replicated with + // the requested consistency level + ss::future> replicate_finished; +}; + struct try_abort_request : serde::envelope> { model::partition_id tm; diff --git a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc index 528c78f41417..c2bc47e6651b 100644 --- a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc +++ b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc @@ -178,7 +178,7 @@ ss::future> fiber_mock_fixture::make_source( auto batch = make_random_batch(params.records_per_input); co_await tests::cooperative_spin_wait_with_timeout( 2s, [partition]() { return partition->is_elected_leader(); }); - auto r = co_await partition->replicate( + auto r = co_await partition->raft()->replicate( std::move(batch), raft::replicate_options(raft::consistency_level::leader_ack)); vassert(!r.has_error(), "Write error: {}", r.error()); diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 1441b02bfb60..b7d7c2262a81 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1689,7 +1689,7 @@ group::commit_tx(cluster::commit_group_tx_request r) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1772,7 +1772,7 @@ group::begin_tx(cluster::begin_group_tx_request r) { r.pid, std::move(fence)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1887,7 +1887,7 @@ group::prepare_tx(cluster::prepare_group_tx_request r) { std::move(tx_entry)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1983,7 +1983,7 @@ group::abort_tx(cluster::abort_group_tx_request r) { std::move(tx)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2103,7 +2103,8 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto batch = std::move(builder).build(); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto replicate_stages = _partition->replicate_in_stages( + auto replicate_stages = _partition->raft()->replicate_in_stages( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2492,7 +2493,8 @@ ss::future group::remove() { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2572,7 +2574,8 @@ group::remove_topic_partitions(const
std::vector& tps) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2599,7 +2602,8 @@ group::remove_topic_partitions(const std::vector& tps) { ss::future> group::store_group(model::record_batch batch) { - return _partition->replicate( + return _partition->raft()->replicate( + _term, model::make_memory_record_batch_reader(std::move(batch)), raft::replicate_options(raft::consistency_level::quorum_ack)); } diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index c7a27abe203b..a98aba8bbb5f 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -332,6 +332,7 @@ ss::future replicate( [ntp = std::move(ntp), f_reader = std::move(f_reader)](cluster::partition_manager& pm) mutable { return pm.get(ntp) + ->raft() ->replicate( std::move(f_reader), raft::replicate_options(raft::consistency_level::quorum_ack)) diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 687b127c6c0b..8e9a2b051ecd 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -139,7 +139,7 @@ static error_code map_produce_error_code(std::error_code ec) { case raft::errc::shutting_down: return error_code::request_timed_out; default: - return error_code::unknown_server_error; + return error_code::request_timed_out; } } @@ -157,11 +157,11 @@ static error_code map_produce_error_code(std::error_code ec) { case cluster::errc::invalid_request: return error_code::invalid_request; default: - return error_code::unknown_server_error; + return error_code::request_timed_out; } } - return error_code::unknown_server_error; + return error_code::request_timed_out; } /* @@ -198,7 +198,7 @@ static partition_produce_stages partition_append( p.error_code = map_produce_error_code(r.error()); } } catch (...) 
{ - p.error_code = error_code::unknown_server_error; + p.error_code = error_code::request_timed_out; } return p; }), diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index faf03c03d452..d24fe5fed21b 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -165,11 +165,11 @@ ss::future> replicated_partition::replicate( model::record_batch_reader rdr, raft::replicate_options opts) { using ret_t = result; return _partition->replicate(std::move(rdr), opts) - .then([this](result r) { + .then([](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->from_log_offset(r.value().last_offset)); + return ret_t(model::offset(r.value().last_offset())); }); } @@ -179,15 +179,18 @@ raft::replicate_stages replicated_partition::replicate( raft::replicate_options opts) { using ret_t = result; auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); - res.replicate_finished = res.replicate_finished.then( - [this](result r) { + + raft::replicate_stages out(raft::errc::success); + out.request_enqueued = std::move(res.request_enqueued); + out.replicate_finished = res.replicate_finished.then( + [](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(raft::replicate_result{ - _translator->from_log_offset(r.value().last_offset)}); + return ret_t( + raft::replicate_result{model::offset(r.value().last_offset())}); }); - return res; + return out; } std::optional replicated_partition::get_leader_epoch_last_offset( diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index d69e40796943..af3bbbf539c9 100644 --- a/src/v/kafka/server/tests/fetch_test.cc +++ b/src/v/kafka/server/tests/fetch_test.cc @@ -418,7 +418,7 @@ FIXTURE_TEST(fetch_multi_partitions_debounce, redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -483,7 +483,7 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -563,7 +563,7 @@ FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -615,7 +615,7 @@ FIXTURE_TEST(fetch_request_max_bytes, redpanda_thread_fixture) { model::offset(0), 20); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); }) diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index fd081431ec40..8628cf183dc4 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -266,7 +266,7 @@ FIXTURE_TEST(test_recreated_topic_does_not_lose_data, recreate_test_fixture) { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); auto p = pm.get(ntp); 
- return p + return p->raft() ->replicate( std::move(rdr), raft::replicate_options( diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 3ecfdf1cb688..86e92f865e14 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -29,6 +29,12 @@ #include #include +namespace kafka { + +using offset = named_type; + +} // namespace kafka + namespace model { // Named after Kafka cleanup.policy topic property diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 6dba2999a00d..1e0556610508 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -723,7 +723,8 @@ void application::wire_up_redpanda_services() { std::ref(tx_gateway_frontend), std::ref(partition_recovery_manager), std::ref(cloud_storage_api), - std::ref(shadow_index_cache)) + std::ref(shadow_index_cache), + std::ref(_feature_table)) .get(); vlog(_log.info, "Partition manager started"); diff --git a/tests/rptest/services/redpanda_installer.py b/tests/rptest/services/redpanda_installer.py index fe4cfd1bdc46..e9ab2c2f2cbb 100644 --- a/tests/rptest/services/redpanda_installer.py +++ b/tests/rptest/services/redpanda_installer.py @@ -9,6 +9,7 @@ import re import requests +from ducktape.utils.util import wait_until # Match any version that may result from a redpanda binary, which may not be a # released version. @@ -16,6 +17,24 @@ VERSION_RE = re.compile(".*v(\\d+)\\.(\\d+)\\.(\\d+).*") +def wait_for_num_versions(redpanda, num_versions): + def get_unique_versions(): + node = redpanda.nodes[0] + brokers_list = \ + str(node.account.ssh_output(f"{redpanda.find_binary('rpk')} redpanda admin brokers list")) + redpanda.logger.debug(brokers_list) + version_re = re.compile("v\\d+\\.\\d+\\.\\d+") + return set(version_re.findall(brokers_list)) + + # NOTE: allow retries, as the version may not be available immediately + # following a restart. + wait_until(lambda: len(get_unique_versions()) == num_versions, + timeout_sec=30) + unique_versions = get_unique_versions() + assert len(unique_versions) == num_versions, unique_versions + return unique_versions + + class RedpandaInstaller: """ Provides mechanisms to install multiple Redpanda binaries on a cluster. diff --git a/tests/rptest/tests/cluster_features_test.py b/tests/rptest/tests/cluster_features_test.py index 471b576cba87..ab7c91c43df8 100644 --- a/tests/rptest/tests/cluster_features_test.py +++ b/tests/rptest/tests/cluster_features_test.py @@ -42,7 +42,7 @@ def _assert_default_features(self): # This assertion will break each time we increment the value # of `latest_version` in the redpanda source. Update it when # that happens. - assert features_response['cluster_version'] == 4 + assert features_response['cluster_version'] == 5 assert self._get_features_map( features_response)['central_config']['state'] == 'active' diff --git a/tests/rptest/tests/fix_5355_upgrade_test.py b/tests/rptest/tests/fix_5355_upgrade_test.py new file mode 100644 index 000000000000..455154950163 --- /dev/null +++ b/tests/rptest/tests/fix_5355_upgrade_test.py @@ -0,0 +1,118 @@ +# Copyright 2022 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import re + +from rptest.clients.types import TopicSpec +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.cluster import cluster +from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST +from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions +from rptest.services.redpanda import RedpandaService + +from confluent_kafka import (Producer, KafkaException) +from random import choice +from string import ascii_uppercase + + +def on_delivery(err, msg): + if err is not None: + raise KafkaException(err) + + +class Fix5355UpgradeTest(RedpandaTest): + """ + Basic test that upgrading software works as expected. + """ + topics = [TopicSpec(name="topic1")] + def __init__(self, test_context): + extra_rp_conf = { + "default_topic_replications": 3, + "default_topic_partitions": 1, + "log_segment_size": 1048576 + } + super(Fix5355UpgradeTest, self).__init__(test_context=test_context, + num_brokers=3, + enable_installer=True, + extra_rp_conf=extra_rp_conf) + self.installer = self.redpanda._installer + + def setUp(self): + # NOTE: `rpk redpanda admin brokers list` requires versions v22.1.x and + # above. + self.installer.install(self.redpanda.nodes, (22, 1, 3)) + super(Fix5355UpgradeTest, self).setUp() + + def fill_segment(self): + payload_1kb = ''.join(choice(ascii_uppercase) for i in range(1024)) + p = Producer({ + "bootstrap.servers": self.redpanda.brokers(), + "enable.idempotence": True, + "retries": 5 + }) + for i in range(0, 2 * 1024): + p.produce("topic1", + key="key1".encode('utf-8'), + value=payload_1kb.encode('utf-8'), + callback=on_delivery) + p.flush() + + def check_snapshot_exist(self): + for node in self.redpanda.nodes: + cmd = f"find {RedpandaService.DATA_DIR}" + out_iter = node.account.ssh_capture(cmd) + has_snapshot = False + for line in out_iter: + has_snapshot = has_snapshot or re.match( + f"{RedpandaService.DATA_DIR}/kafka/topic1/\\d+_\\d+/tx.snapshot", + line) + assert has_snapshot + + @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST) + def test_rollback(self): + """ + the test checks that a mid-upgrade rollback isn't broken + """ + first_node = self.redpanda.nodes[0] + + unique_versions = wait_for_num_versions(self.redpanda, 1) + assert "v22.1.3" in unique_versions, unique_versions + + # Upgrade one node to the head version. + self.installer.install([first_node], RedpandaInstaller.HEAD) + self.redpanda.restart_nodes([first_node]) + unique_versions = wait_for_num_versions(self.redpanda, 2) + assert "v22.1.3" in unique_versions, unique_versions + + self.fill_segment() + self.check_snapshot_exist() + + # Rollback the partial upgrade and ensure we go back to the original + # state.
+ self.installer.install([first_node], (22, 1, 3)) + self.redpanda.restart_nodes([first_node]) + unique_versions = wait_for_num_versions(self.redpanda, 1) + assert "v22.1.3" in unique_versions, unique_versions + + @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST) + def test_upgrade(self): + """ + the test checks that an upgrade isn't broken + """ + unique_versions = wait_for_num_versions(self.redpanda, 1) + assert "v22.1.3" in unique_versions, unique_versions + + self.fill_segment() + self.check_snapshot_exist() + + # Upgrade all nodes to the head version. + self.installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD) + self.redpanda.restart_nodes(self.redpanda.nodes) + unique_versions = wait_for_num_versions(self.redpanda, 1) + assert "v22.1.3" not in unique_versions, unique_versions diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py index e5da152eeb6f..7745e1ebccaf 100644 --- a/tests/rptest/tests/upgrade_test.py +++ b/tests/rptest/tests/upgrade_test.py @@ -14,25 +14,7 @@ from rptest.tests.redpanda_test import RedpandaTest from rptest.services.cluster import cluster from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST -from rptest.services.redpanda_installer import RedpandaInstaller - - -def wait_for_num_versions(redpanda, num_versions): - def get_unique_versions(): - node = redpanda.nodes[0] - brokers_list = \ - str(node.account.ssh_output(f"{redpanda.find_binary('rpk')} redpanda admin brokers list")) - redpanda.logger.debug(brokers_list) - version_re = re.compile("v\\d+\\.\\d+\\.\\d+") - return set(version_re.findall(brokers_list)) - - # NOTE: allow retries, as the version may not be available immediately - # following a restart. - wait_until(lambda: len(get_unique_versions()) == num_versions, - timeout_sec=30) - unique_versions = get_unique_versions() - assert len(unique_versions) == num_versions, unique_versions - return unique_versions +from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions class UpgradeFromSpecificVersion(RedpandaTest):
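Reviewer note: the core mechanism in this patch is that `rm_stm` now stores Kafka offsets, rather than raft log offsets, in its idempotency seq cache, translating at the raft boundary via `from_log_offset`/`to_log_offset`. The sketch below illustrates the translation idea only; `offset_translator` is a hypothetical stand-in for Redpanda's `storage::offset_translator_state`, which additionally handles truncation and snapshots. The principle: raft interleaves non-data batches (e.g. configuration changes) into the log, so a record's Kafka-visible offset is its log offset minus the number of non-data offsets at or before it.

```cpp
// Minimal, self-contained sketch of log<->kafka offset translation
// (illustrative only -- not Redpanda's actual implementation).
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct offset_translator {
    // sorted log offsets occupied by non-data (e.g. raft config) batches
    std::vector<int64_t> gaps;

    int64_t from_log_offset(int64_t log) const {
        // kafka offset = log offset minus the non-data offsets before it
        auto n = std::upper_bound(gaps.begin(), gaps.end(), log)
                 - gaps.begin();
        return log - n;
    }

    int64_t to_log_offset(int64_t kafka) const {
        // invert the mapping by stepping over each gap we pass
        int64_t log = kafka;
        for (auto g : gaps) {
            if (g <= log) {
                ++log;
            }
        }
        return log;
    }
};

int main() {
    offset_translator t{{0, 3}}; // log offsets 0 and 3 hold config batches
    assert(t.from_log_offset(4) == 2);
    assert(t.to_log_offset(2) == 4);
    assert(t.to_log_offset(t.from_log_offset(7)) == 7);
}
```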
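The second mechanism is the upgrade gate: `take_snapshot` keeps writing the old v1 `tx.snapshot` format until `rm_stm_kafka_cache` activates at cluster version 5, i.e. once every node runs the new release, which is what makes the mid-upgrade rollback in `Fix5355UpgradeTest.test_rollback` safe. A rough model of that decision follows, using hypothetical stand-in types rather than the real `cluster::feature_table` API:

```cpp
// Illustrative model of the active_snapshot_version() gating (not real API).
#include <cstdint>
#include <iostream>

enum class feature { rm_stm_kafka_cache };

struct feature_table {
    int64_t active_version; // lowest logical version across live nodes
    bool is_active(feature) const { return active_version >= 5; }
};

uint8_t active_snapshot_version(const feature_table& ft) {
    // write the new snapshot format only once the whole cluster can read it
    return ft.is_active(feature::rm_stm_kafka_cache) ? 2 : 1;
}

int main() {
    feature_table mid_upgrade{4}; // one node still on the old release
    feature_table upgraded{5};    // all nodes upgraded
    std::cout << int(active_snapshot_version(mid_upgrade)) << "\n"; // 1
    std::cout << int(active_snapshot_version(upgraded)) << "\n";    // 2
}
```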