diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 0547135d06177..090616884b616 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -32,10 +32,12 @@ partition::partition( consensus_ptr r, ss::sharded& tx_gateway_frontend, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _raft(r) , _probe(std::make_unique(*this)) , _tx_gateway_frontend(tx_gateway_frontend) + , _feature_table(feature_table) , _is_tx_enabled(config::shard_local_cfg().enable_transactions.value()) , _is_idempotence_enabled( config::shard_local_cfg().enable_idempotence.value()) { @@ -70,7 +72,7 @@ partition::partition( if (has_rm_stm) { _rm_stm = ss::make_shared( - clusterlog, _raft.get(), _tx_gateway_frontend); + clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table); stm_manager->add_stm(_rm_stm); } diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 93f6b19b4387e..060153c9cc0e0 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -13,6 +13,7 @@ #include "cloud_storage/remote_partition.h" #include "cluster/archival_metadata_stm.h" +#include "cluster/feature_table.h" #include "cluster/id_allocator_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" @@ -44,7 +45,8 @@ class partition { consensus_ptr r, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); raft::group_id group() const { return _raft->group(); } ss::future<> start(); @@ -290,6 +292,7 @@ class partition { ss::abort_source _as; partition_probe _probe; ss::sharded& _tx_gateway_frontend; + ss::sharded& _feature_table; bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 406baf98795c8..263b3f1857ef5 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -48,13 +48,15 @@ partition_manager::partition_manager( ss::sharded& tx_gateway_frontend, ss::sharded& recovery_mgr, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _storage(storage.local()) , _raft_manager(raft) , _tx_gateway_frontend(tx_gateway_frontend) , _partition_recovery_mgr(recovery_mgr) , _cloud_storage_api(cloud_storage_api) - , _cloud_storage_cache(cloud_storage_cache) {} + , _cloud_storage_cache(cloud_storage_cache) + , _feature_table(feature_table) {} partition_manager::ntp_table_container partition_manager::get_topic_partition_table( @@ -120,7 +122,11 @@ ss::future partition_manager::manage( group, std::move(initial_nodes), log); auto p = ss::make_lw_shared( - c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache); + c, + _tx_gateway_frontend, + _cloud_storage_api, + _cloud_storage_cache, + _feature_table); _ntp_table.emplace(log.config().ntp(), p); _raft_table.emplace(group, p); diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h index fa66f5ea31a09..8f45302e4abc8 100644 --- a/src/v/cluster/partition_manager.h +++ b/src/v/cluster/partition_manager.h @@ -14,6 +14,7 @@ #include "cloud_storage/cache_service.h" #include "cloud_storage/partition_recovery_manager.h" #include "cloud_storage/remote.h" +#include "cluster/feature_table.h" #include "cluster/ntp_callbacks.h" #include "cluster/partition.h" #include "model/metadata.h" @@ -37,7 +38,8 @@ class partition_manager { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); using manage_cb_t = ss::noncopyable_function)>; @@ -190,6 +192,7 @@ class partition_manager { _partition_recovery_mgr; ss::sharded& _cloud_storage_api; ss::sharded& _cloud_storage_cache; + ss::sharded& _feature_table; ss::gate _gate; bool _block_new_leadership{false}; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 81559c2f42761..d3367c2942241 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -215,7 +215,8 @@ struct tx_snapshot_v1 { rm_stm::rm_stm( ss::logger& logger, raft::consensus* c, - ss::sharded& tx_gateway_frontend) + ss::sharded& tx_gateway_frontend, + ss::sharded& feature_table) : persisted_stm("tx.snapshot", logger, c) , _oldest_session(model::timestamp::now()) , _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value()) @@ -234,7 +235,8 @@ rm_stm::rm_stm( , _abort_snapshot_mgr( "abort.idx", std::filesystem::path(c->log_config().work_directory()), - ss::default_priority_class()) { + ss::default_priority_class()) + , _feature_table(feature_table) { if (!_is_tx_enabled) { _is_autoabort_enabled = false; } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index aee55034860a6..6e6c09c542e25 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/feature_table.h" #include "cluster/persisted_stm.h" #include "cluster/tx_utils.h" #include "cluster/types.h" @@ -151,7 +152,8 @@ class rm_stm final : public persisted_stm { explicit rm_stm( ss::logger&, raft::consensus*, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future> begin_tx( model::producer_identity, model::tx_seq, std::chrono::milliseconds); @@ -523,6 +525,7 @@ class rm_stm final : public persisted_stm { ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; ss::lw_shared_ptr _translator; + ss::sharded& _feature_table; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index a9c1d6a3b252b..f79f59eaccd06 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -36,7 +37,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -81,6 +85,7 @@ FIXTURE_TEST( raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE((bool)r2); + feature_table.stop().get0(); } FIXTURE_TEST( @@ -88,7 +93,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -135,13 +143,17 @@ FIXTURE_TEST( BOOST_REQUIRE((bool)r2); BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset); + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -200,13 +212,17 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { BOOST_REQUIRE((bool)r1); BOOST_REQUIRE(r1.value().last_offset == offsets[i]); } + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -260,13 +276,17 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) { r1 == failure_type(cluster::errc::sequence_out_of_order)); } + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -312,6 +332,7 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) { .get0(); BOOST_REQUIRE( r2 == failure_type(cluster::errc::sequence_out_of_order)); + feature_table.stop().get0(); } FIXTURE_TEST( @@ -319,7 +340,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -349,13 +373,17 @@ FIXTURE_TEST( .get0(); BOOST_REQUIRE( r == failure_type(cluster::errc::sequence_out_of_order)); + feature_table.stop().get0(); } FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.start().get0(); @@ -404,4 +432,5 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) { BOOST_REQUIRE((bool)r1); BOOST_REQUIRE((bool)r2); BOOST_REQUIRE(r1.value().last_offset == r2.value().last_offset); + feature_table.stop().get0(); } diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index c53da9af13c5c..f40bb497b9d94 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -129,6 +133,7 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) { BOOST_REQUIRE_EQUAL(aborted_txs.size(), 0); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // tests: @@ -138,7 +143,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -204,6 +212,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) { })); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // tests: @@ -213,7 +222,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -285,6 +297,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) { })); BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset()); + feature_table.stop().get0(); } // transactional writes of an unknown tx are rejected @@ -292,7 +305,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -322,6 +338,7 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); + feature_table.stop().get0(); } // begin fences off old transactions @@ -329,7 +346,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -379,6 +399,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(!(bool)offset_r); + feature_table.stop().get0(); } // transactional writes of an aborted tx are rejected @@ -386,7 +407,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) { start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded feature_table; + feature_table.start().get0(); + cluster::rm_stm stm( + logger, _raft.get(), tx_gateway_frontend, feature_table); stm.testing_only_disable_auto_abort(); stm.testing_only_enable_transactions(); @@ -438,4 +462,5 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) { raft::replicate_options(raft::consistency_level::quorum_ack)) .get0(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); + feature_table.stop().get0(); } diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 6dba2999a00d8..1e05566105087 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -723,7 +723,8 @@ void application::wire_up_redpanda_services() { std::ref(tx_gateway_frontend), std::ref(partition_recovery_manager), std::ref(cloud_storage_api), - std::ref(shadow_index_cache)) + std::ref(shadow_index_cache), + std::ref(_feature_table)) .get(); vlog(_log.info, "Partition manager started");