Skip to content

Commit

Permalink
rm_stm: add feature_table as a dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jul 6, 2022
1 parent db5e18c commit 57d9a94
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 24 deletions.
6 changes: 4 additions & 2 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ partition::partition(
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _tx_gateway_frontend(tx_gateway_frontend)
, _feature_table(feature_table)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
config::shard_local_cfg().enable_idempotence.value())
Expand Down Expand Up @@ -71,7 +73,7 @@ partition::partition(

if (has_rm_stm) {
_rm_stm = ss::make_shared<cluster::rm_stm>(
clusterlog, _raft.get(), _tx_gateway_frontend);
clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table);
stm_manager->add_stm(_rm_stm);
}

Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cloud_storage/remote_partition.h"
#include "cluster/archival_metadata_stm.h"
#include "cluster/feature_table.h"
#include "cluster/id_allocator_stm.h"
#include "cluster/partition_probe.h"
#include "cluster/rm_stm.h"
Expand Down Expand Up @@ -44,7 +45,8 @@ class partition {
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

raft::group_id group() const { return _raft->group(); }
ss::future<> start();
Expand Down Expand Up @@ -290,6 +292,7 @@ class partition {
ss::abort_source _as;
partition_probe _probe;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<feature_table>& _feature_table;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::lw_shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
Expand Down
12 changes: 9 additions & 3 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ partition_manager::partition_manager(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::partition_recovery_manager>& recovery_mgr,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _storage(storage.local())
, _raft_manager(raft)
, _tx_gateway_frontend(tx_gateway_frontend)
, _partition_recovery_mgr(recovery_mgr)
, _cloud_storage_api(cloud_storage_api)
, _cloud_storage_cache(cloud_storage_cache) {}
, _cloud_storage_cache(cloud_storage_cache)
, _feature_table(feature_table) {}

partition_manager::ntp_table_container
partition_manager::get_topic_partition_table(
Expand Down Expand Up @@ -120,7 +122,11 @@ ss::future<consensus_ptr> partition_manager::manage(
group, std::move(initial_nodes), log);

auto p = ss::make_lw_shared<partition>(
c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache);
c,
_tx_gateway_frontend,
_cloud_storage_api,
_cloud_storage_cache,
_feature_table);

_ntp_table.emplace(log.config().ntp(), p);
_raft_table.emplace(group, p);
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_recovery_manager.h"
#include "cloud_storage/remote.h"
#include "cluster/feature_table.h"
#include "cluster/ntp_callbacks.h"
#include "cluster/partition.h"
#include "model/metadata.h"
Expand All @@ -37,7 +38,8 @@ class partition_manager {
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::partition_recovery_manager>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

using manage_cb_t
= ss::noncopyable_function<void(ss::lw_shared_ptr<partition>)>;
Expand Down Expand Up @@ -190,6 +192,7 @@ class partition_manager {
_partition_recovery_mgr;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
ss::sharded<feature_table>& _feature_table;
ss::gate _gate;
bool _block_new_leadership{false};

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ struct tx_snapshot_v1 {
rm_stm::rm_stm(
ss::logger& logger,
raft::consensus* c,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend)
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<feature_table>& feature_table)
: persisted_stm("tx.snapshot", logger, c)
, _oldest_session(model::timestamp::now())
, _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
Expand All @@ -235,7 +236,8 @@ rm_stm::rm_stm(
"abort.idx",
std::filesystem::path(c->log_config().work_directory()),
ss::default_priority_class())
, _translator(c->get_offset_translator_state()) {
, _translator(c->get_offset_translator_state())
, _feature_table(feature_table) {
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
}
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "cluster/feature_table.h"
#include "cluster/persisted_stm.h"
#include "cluster/tx_utils.h"
#include "cluster/types.h"
Expand Down Expand Up @@ -151,7 +152,8 @@ class rm_stm final : public persisted_stm {
explicit rm_stm(
ss::logger&,
raft::consensus*,
ss::sharded<cluster::tx_gateway_frontend>&);
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<feature_table>&);

ss::future<checked<model::term_id, tx_errc>> begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
Expand Down Expand Up @@ -521,6 +523,7 @@ class rm_stm final : public persisted_stm {
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
storage::snapshot_manager _abort_snapshot_mgr;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;
ss::sharded<feature_table>& _feature_table;
};

} // namespace cluster
36 changes: 29 additions & 7 deletions src/v/cluster/tests/idempotency_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "cluster/errc.h"
#include "cluster/feature_table.h"
#include "cluster/rm_stm.h"
#include "finjector/hbadger.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -36,7 +37,10 @@ FIXTURE_TEST(
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -88,7 +92,10 @@ FIXTURE_TEST(
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -141,7 +148,10 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -206,7 +216,10 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -266,7 +279,10 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -319,7 +335,10 @@ FIXTURE_TEST(
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -355,7 +374,10 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down
31 changes: 25 additions & 6 deletions src/v/cluster/tests/rm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "cluster/errc.h"
#include "cluster/feature_table.h"
#include "cluster/rm_stm.h"
#include "finjector/hbadger.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down Expand Up @@ -138,7 +142,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down Expand Up @@ -213,7 +220,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down Expand Up @@ -292,7 +302,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down Expand Up @@ -329,7 +342,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down Expand Up @@ -386,7 +402,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();
stm.testing_only_enable_transactions();

Expand Down
3 changes: 2 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ void application::wire_up_redpanda_services() {
std::ref(tx_gateway_frontend),
std::ref(partition_recovery_manager),
std::ref(cloud_storage_api),
std::ref(shadow_index_cache))
std::ref(shadow_index_cache),
std::ref(_feature_table))
.get();
vlog(_log.info, "Partition manager started");

Expand Down

0 comments on commit 57d9a94

Please sign in to comment.