From 76259d50015917082d2107471f2a227a47ddd09a Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 5 Jul 2022 21:54:40 -0700 Subject: [PATCH] rm_stm: add feature_table as a dependency --- src/v/cluster/partition.cc | 16 +++++++---- src/v/cluster/partition.h | 5 +++- src/v/cluster/partition_manager.cc | 12 ++++++-- src/v/cluster/partition_manager.h | 5 +++- src/v/cluster/rm_stm.cc | 11 ++++++-- src/v/cluster/rm_stm.h | 7 ++++- src/v/cluster/tests/idempotency_tests.cc | 36 +++++++++++++++++++----- src/v/cluster/tests/rm_stm_tests.cc | 31 ++++++++++++++++---- src/v/redpanda/application.cc | 3 +- 9 files changed, 99 insertions(+), 27 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 16cc0f64eeecf..967c5ad94fa77 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -32,14 +32,15 @@ partition::partition( consensus_ptr r, ss::sharded& tx_gateway_frontend, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _raft(r) , _probe(std::make_unique(*this)) , _tx_gateway_frontend(tx_gateway_frontend) + , _feature_table(feature_table) , _is_tx_enabled(config::shard_local_cfg().enable_transactions.value()) , _is_idempotence_enabled( - config::shard_local_cfg().enable_idempotence.value()) - , _translator(r->get_offset_translator_state()) { + config::shard_local_cfg().enable_idempotence.value()) { auto stm_manager = _raft->log().stm_manager(); if (is_id_allocator_topic(_raft->ntp())) { @@ -71,7 +72,7 @@ partition::partition( if (has_rm_stm) { _rm_stm = ss::make_shared( - clusterlog, _raft.get(), _tx_gateway_frontend); + clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table); stm_manager->add_stm(_rm_stm); } @@ -106,6 +107,8 @@ partition::partition( ss::future> partition::replicate( model::record_batch_reader&& r, raft::replicate_options opts) { using ret_t = result; + vassert( + _translator, "ntp {}: offset translator must be initialized", ntp()); auto res = co_await _raft->replicate(std::move(r), opts); if (!res) { co_return ret_t(res.error()); @@ -138,6 +141,8 @@ kafka_stages partition::replicate_in_stages( model::record_batch_reader&& r, raft::replicate_options opts) { using ret_t = result; + vassert( + _translator, "ntp {}: offset translator must be initialized", ntp()); if (bid.is_transactional) { if (!_is_tx_enabled) { vlog( @@ -201,7 +206,8 @@ ss::future<> partition::start() { _probe.setup_metrics(ntp); - auto f = _raft->start(); + auto f = _raft->start().then( + [this] { _translator = _raft->get_offset_translator_state(); }); if (is_id_allocator_topic(ntp)) { return f.then([this] { return _id_allocator_stm->start(); }); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 408e9e9ac9461..70f1f34ea9a97 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -13,6 +13,7 @@ #include "cloud_storage/remote_partition.h" #include "cluster/archival_metadata_stm.h" +#include "cluster/feature_table.h" #include "cluster/id_allocator_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" @@ -44,7 +45,8 @@ class partition { consensus_ptr r, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); raft::group_id group() const { return _raft->group(); } ss::future<> start(); @@ -290,6 +292,7 @@ class partition { ss::abort_source _as; partition_probe _probe; ss::sharded& _tx_gateway_frontend; + ss::sharded& _feature_table; bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 406baf98795c8..263b3f1857ef5 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -48,13 +48,15 @@ partition_manager::partition_manager( ss::sharded& tx_gateway_frontend, ss::sharded& recovery_mgr, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _storage(storage.local()) , _raft_manager(raft) , _tx_gateway_frontend(tx_gateway_frontend) , _partition_recovery_mgr(recovery_mgr) , _cloud_storage_api(cloud_storage_api) - , _cloud_storage_cache(cloud_storage_cache) {} + , _cloud_storage_cache(cloud_storage_cache) + , _feature_table(feature_table) {} partition_manager::ntp_table_container partition_manager::get_topic_partition_table( @@ -120,7 +122,11 @@ ss::future partition_manager::manage( group, std::move(initial_nodes), log); auto p = ss::make_lw_shared( - c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache); + c, + _tx_gateway_frontend, + _cloud_storage_api, + _cloud_storage_cache, + _feature_table); _ntp_table.emplace(log.config().ntp(), p); _raft_table.emplace(group, p); diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h index fa66f5ea31a09..8f45302e4abc8 100644 --- a/src/v/cluster/partition_manager.h +++ b/src/v/cluster/partition_manager.h @@ -14,6 +14,7 @@ #include "cloud_storage/cache_service.h" #include "cloud_storage/partition_recovery_manager.h" #include "cloud_storage/remote.h" +#include "cluster/feature_table.h" #include "cluster/ntp_callbacks.h" #include "cluster/partition.h" #include "model/metadata.h" @@ -37,7 +38,8 @@ class partition_manager { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); using manage_cb_t = ss::noncopyable_function)>; @@ -190,6 +192,7 @@ class partition_manager { _partition_recovery_mgr; ss::sharded& _cloud_storage_api; ss::sharded& _cloud_storage_cache; + ss::sharded& _feature_table; ss::gate _gate; bool _block_new_leadership{false}; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 4491370d49a7b..ff71a368f09de 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -215,7 +215,8 @@ struct tx_snapshot_v1 { rm_stm::rm_stm( ss::logger& logger, raft::consensus* c, - ss::sharded& tx_gateway_frontend) + ss::sharded& tx_gateway_frontend, + ss::sharded& feature_table) : persisted_stm("tx.snapshot", logger, c) , _oldest_session(model::timestamp::now()) , _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value()) @@ -235,7 +236,7 @@ rm_stm::rm_stm( "abort.idx", std::filesystem::path(c->log_config().work_directory()), ss::default_priority_class()) - , _translator(c->get_offset_translator_state()) { + , _feature_table(feature_table) { if (!_is_tx_enabled) { _is_autoabort_enabled = false; } @@ -855,6 +856,11 @@ ss::future<> rm_stm::stop() { return raft::state_machine::stop(); } +ss::future<> rm_stm::start() { + _translator = _c->get_offset_translator_state(); + return persisted_stm::start(); +} + rm_stm::transaction_info::status_t rm_stm::get_tx_status(model::producer_identity pid) const { if (_mem_state.preparing.contains(pid)) { @@ -1353,6 +1359,7 @@ ss::future> rm_stm::replicate_msg( } auto old_offset = r.value().last_offset; auto new_offset = kafka_offset(_translator->from_log_offset(old_offset)()); + co_return ret_t(kafka_result{new_offset}); } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 67d6fa91a226a..4a121c3133bd2 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/feature_table.h" #include "cluster/persisted_stm.h" #include "cluster/tx_utils.h" #include "cluster/types.h" @@ -151,7 +152,8 @@ class rm_stm final : public persisted_stm { explicit rm_stm( ss::logger&, raft::consensus*, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future> begin_tx( model::producer_identity, model::tx_seq, std::chrono::milliseconds); @@ -185,6 +187,8 @@ class rm_stm final : public persisted_stm { ss::future<> stop() override; + ss::future<> start() override; + void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } void testing_only_enable_transactions() { _is_tx_enabled = true; } @@ -521,6 +525,7 @@ class rm_stm final : public persisted_stm { ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; ss::lw_shared_ptr _translator; + ss::sharded& _feature_table; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index fd5a34711b8bf..808282a881583 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -36,7 +37,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -88,7 +92,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -141,7 +148,10 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -206,7 +216,10 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -266,7 +279,10 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -319,7 +335,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -355,7 +374,10 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index c53da9af13c5c..6e4be15a5e237 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -138,7 +142,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -213,7 +220,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -292,7 +302,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -329,7 +342,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -386,7 +402,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 75c874495d5d6..c34724516d0cc 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -723,7 +723,8 @@ void application::wire_up_redpanda_services() { std::ref(tx_gateway_frontend), std::ref(partition_recovery_manager), std::ref(cloud_storage_api), - std::ref(shadow_index_cache)) + std::ref(shadow_index_cache), + std::ref(_feature_table)) .get(); vlog(_log.info, "Partition manager started");