Skip to content

Commit

Permalink
rm_stm: add feature_table as a dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jul 9, 2022
1 parent 96db033 commit 85031e6
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 24 deletions.
6 changes: 4 additions & 2 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ partition::partition(
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _tx_gateway_frontend(tx_gateway_frontend)
, _feature_table(feature_table)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
config::shard_local_cfg().enable_idempotence.value()) {
Expand Down Expand Up @@ -70,7 +72,7 @@ partition::partition(

if (has_rm_stm) {
_rm_stm = ss::make_shared<cluster::rm_stm>(
clusterlog, _raft.get(), _tx_gateway_frontend);
clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table);
stm_manager->add_stm(_rm_stm);
}

Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cloud_storage/remote_partition.h"
#include "cluster/archival_metadata_stm.h"
#include "cluster/feature_table.h"
#include "cluster/id_allocator_stm.h"
#include "cluster/partition_probe.h"
#include "cluster/rm_stm.h"
Expand Down Expand Up @@ -44,7 +45,8 @@ class partition {
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

raft::group_id group() const { return _raft->group(); }
ss::future<> start();
Expand Down Expand Up @@ -290,6 +292,7 @@ class partition {
ss::abort_source _as;
partition_probe _probe;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<feature_table>& _feature_table;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::lw_shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
Expand Down
12 changes: 9 additions & 3 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ partition_manager::partition_manager(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::partition_recovery_manager>& recovery_mgr,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _storage(storage.local())
, _raft_manager(raft)
, _tx_gateway_frontend(tx_gateway_frontend)
, _partition_recovery_mgr(recovery_mgr)
, _cloud_storage_api(cloud_storage_api)
, _cloud_storage_cache(cloud_storage_cache) {}
, _cloud_storage_cache(cloud_storage_cache)
, _feature_table(feature_table) {}

partition_manager::ntp_table_container
partition_manager::get_topic_partition_table(
Expand Down Expand Up @@ -120,7 +122,11 @@ ss::future<consensus_ptr> partition_manager::manage(
group, std::move(initial_nodes), log);

auto p = ss::make_lw_shared<partition>(
c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache);
c,
_tx_gateway_frontend,
_cloud_storage_api,
_cloud_storage_cache,
_feature_table);

_ntp_table.emplace(log.config().ntp(), p);
_raft_table.emplace(group, p);
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_recovery_manager.h"
#include "cloud_storage/remote.h"
#include "cluster/feature_table.h"
#include "cluster/ntp_callbacks.h"
#include "cluster/partition.h"
#include "model/metadata.h"
Expand All @@ -37,7 +38,8 @@ class partition_manager {
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::partition_recovery_manager>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

using manage_cb_t
= ss::noncopyable_function<void(ss::lw_shared_ptr<partition>)>;
Expand Down Expand Up @@ -190,6 +192,7 @@ class partition_manager {
_partition_recovery_mgr;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
ss::sharded<feature_table>& _feature_table;
ss::gate _gate;
bool _block_new_leadership{false};

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ struct tx_snapshot_v1 {
rm_stm::rm_stm(
ss::logger& logger,
raft::consensus* c,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend)
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<feature_table>& feature_table)
: persisted_stm("tx.snapshot", logger, c)
, _oldest_session(model::timestamp::now())
, _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
Expand All @@ -234,7 +235,8 @@ rm_stm::rm_stm(
, _abort_snapshot_mgr(
"abort.idx",
std::filesystem::path(c->log_config().work_directory()),
ss::default_priority_class()) {
ss::default_priority_class())
, _feature_table(feature_table) {
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
}
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "cluster/feature_table.h"
#include "cluster/persisted_stm.h"
#include "cluster/tx_utils.h"
#include "cluster/types.h"
Expand Down Expand Up @@ -151,7 +152,8 @@ class rm_stm final : public persisted_stm {
explicit rm_stm(
ss::logger&,
raft::consensus*,
ss::sharded<cluster::tx_gateway_frontend>&);
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<feature_table>&);

ss::future<checked<model::term_id, tx_errc>> begin_tx(
model::producer_identity, model::tx_seq, std::chrono::milliseconds);
Expand Down Expand Up @@ -523,6 +525,7 @@ class rm_stm final : public persisted_stm {
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
storage::snapshot_manager _abort_snapshot_mgr;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;
ss::sharded<feature_table>& _feature_table;
};

} // namespace cluster
43 changes: 36 additions & 7 deletions src/v/cluster/tests/idempotency_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "cluster/errc.h"
#include "cluster/feature_table.h"
#include "cluster/rm_stm.h"
#include "finjector/hbadger.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -36,7 +37,10 @@ FIXTURE_TEST(
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -81,14 +85,18 @@ FIXTURE_TEST(
raft::replicate_options(raft::consistency_level::quorum_ack))
.get0();
BOOST_REQUIRE((bool)r2);
feature_table.stop().get0();
}

FIXTURE_TEST(
test_rm_stm_passes_monotonic_in_session_messages, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -135,13 +143,17 @@ FIXTURE_TEST(
BOOST_REQUIRE((bool)r2);

BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset);
feature_table.stop().get0();
}

FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -200,13 +212,17 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
BOOST_REQUIRE((bool)r1);
BOOST_REQUIRE(r1.value().last_offset == offsets[i]);
}
feature_table.stop().get0();
}

FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -260,13 +276,17 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
r1
== failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
}
feature_table.stop().get0();
}

FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -312,14 +332,18 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
.get0();
BOOST_REQUIRE(
r2 == failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
feature_table.stop().get0();
}

FIXTURE_TEST(
test_rm_stm_prevents_odd_session_start_off, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -349,13 +373,17 @@ FIXTURE_TEST(
.get0();
BOOST_REQUIRE(
r == failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
feature_table.stop().get0();
}

FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
start_raft();

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
ss::sharded<cluster::feature_table> feature_table;
feature_table.start().get0();
cluster::rm_stm stm(
logger, _raft.get(), tx_gateway_frontend, feature_table);
stm.testing_only_disable_auto_abort();

stm.start().get0();
Expand Down Expand Up @@ -404,4 +432,5 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
BOOST_REQUIRE((bool)r1);
BOOST_REQUIRE((bool)r2);
BOOST_REQUIRE(r1.value().last_offset == r2.value().last_offset);
feature_table.stop().get0();
}
Loading

0 comments on commit 85031e6

Please sign in to comment.