From 911fe4906f319c47730e0851aec08016c99c4e7b Mon Sep 17 00:00:00 2001 From: Vadim Plakhtinskiy Date: Fri, 24 Jun 2022 12:17:31 +0200 Subject: [PATCH 01/12] cluster: move feature_table outside controller (cherry picked from commit 7862d4bebc0279bcd636dc0b19096fa547b8e1fc) Conflicts: src/v/cluster/controller.cc src/v/cluster/controller.h src/v/redpanda/application.cc --- src/v/cluster/controller.cc | 8 ++++---- src/v/cluster/controller.h | 5 +++-- src/v/redpanda/application.cc | 7 ++++++- src/v/redpanda/application.h | 1 + 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 39ee5afa80d5..7b9c305c3e08 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -56,7 +56,8 @@ controller::controller( ss::sharded& storage, ss::sharded& storage_node, ss::sharded& raft_manager, - ss::sharded& data_policy_table) + ss::sharded& data_policy_table, + ss::sharded& feature_table) : _config_preload(std::move(config_preload)) , _connections(ccache) , _partition_manager(pm) @@ -66,12 +67,12 @@ controller::controller( , _tp_updates_dispatcher(_partition_allocator, _tp_state, _partition_leaders) , _security_manager(_credentials, _authorizer) , _data_policy_manager(data_policy_table) - , _raft_manager(raft_manager) {} + , _raft_manager(raft_manager) + , _feature_table(feature_table) {} ss::future<> controller::wire_up() { return _as.start() .then([this] { return _members_table.start(); }) - .then([this] { return _feature_table.start(); }) .then([this] { return _partition_allocator.start_single( std::ref(_members_table), @@ -362,7 +363,6 @@ ss::future<> controller::stop() { .then([this] { return _drain_manager.stop(); }) .then([this] { return _partition_allocator.stop(); }) .then([this] { return _partition_leaders.stop(); }) - .then([this] { return _feature_table.stop(); }) .then([this] { return _members_table.stop(); }) .then([this] { return _as.stop(); }); }); diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index ccd43282c482..b76389464472 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -44,7 +44,8 @@ class controller { ss::sharded& storage, ss::sharded& storage_node, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); model::node_id self() { return _raft0->self().id(); } ss::sharded& get_topics_frontend() { return _tp_frontend; } @@ -158,7 +159,7 @@ class controller { ss::sharded _metrics_reporter; ss::sharded _feature_manager; // single instance ss::sharded _feature_backend; // instance per core - ss::sharded _feature_table; // instance per core + ss::sharded& _feature_table; // instance per core std::unique_ptr _leader_balancer; consensus_ptr _raft0; }; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 68206833e3fb..129b0d5ea46b 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -661,6 +661,9 @@ void application::wire_up_redpanda_services() { cloud_configs.stop().get(); } + syschecks::systemd_message("Creating feature table").get(); + construct_service(_feature_table).get(); + syschecks::systemd_message("Adding partition manager").get(); construct_service( partition_manager, @@ -690,7 +693,8 @@ void application::wire_up_redpanda_services() { storage, storage_node, std::ref(raft_group_manager), - data_policies); + data_policies, + std::ref(_feature_table)); controller->wire_up().get0(); syschecks::systemd_message("Creating kafka metadata cache").get(); @@ -803,6 +807,7 @@ void 
application::wire_up_redpanda_services() { make_upload_controller_config(_scheduling_groups.archival_upload())) .get(); } + // group membership syschecks::systemd_message("Creating partition manager").get(); construct_service( diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 00608e1e0dca..454e03344916 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -158,6 +158,7 @@ class application { ss::logger _log; ss::sharded _connection_cache; + ss::sharded _feature_table; ss::sharded _group_manager; ss::sharded _co_group_manager; ss::sharded _rpc; From b78ba3f0c753e72213a7992c638d66beab309b1a Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 1 Jul 2022 13:55:55 -0700 Subject: [PATCH 02/12] k/produce: do not use unknown_server_error The Kafka client doesn't process unknown_server_error correctly, and it may lead to duplicates that violate idempotency. See the following issue for more info: https://issues.apache.org/jira/browse/KAFKA-14034 request_timed_out, just like unknown_server_error, means that the true outcome of the operation is unknown; unlike unknown_server_error, it doesn't trigger the client bug, so we switch to it to avoid the problem. (cherry picked from commit 1a72446ab268ef66db7ac5b3baca754f5e7b7cb2) --- src/v/kafka/server/handlers/produce.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 069a17dd5be2..63cd6d5590a3 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -134,7 +134,7 @@ static error_code map_produce_error_code(std::error_code ec) { case raft::errc::shutting_down: return error_code::request_timed_out; default: - return error_code::unknown_server_error; + return error_code::request_timed_out; } } @@ -152,11 +152,11 @@ static error_code map_produce_error_code(std::error_code ec) { case cluster::errc::invalid_request: return error_code::invalid_request; default: - return error_code::unknown_server_error; + return error_code::request_timed_out; } } - return error_code::unknown_server_error; + return error_code::request_timed_out; } /* @@ -193,7 +193,7 @@ static partition_produce_stages partition_append( p.error_code = map_produce_error_code(r.error()); } } catch (...) { - p.error_code = error_code::unknown_server_error; + p.error_code = error_code::request_timed_out; } return p; }), From c803886c80055486fd622fc4f7a786ec80f9737f Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 8 Jul 2022 21:05:35 -0700 Subject: [PATCH 03/12] cluster: remove dead code (cherry picked from commit 1dfd8d963ea88c08c339727c2001dc3ba6f44624) --- src/v/cluster/partition.cc | 53 -------------------------------------- src/v/cluster/partition.h | 5 ---- 2 files changed, 58 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 8350845f7d95..806c69a19a7e 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -187,59 +187,6 @@ raft::replicate_stages partition::replicate_in_stages( } } -ss::future> partition::replicate( - model::batch_identity bid, - model::record_batch_reader&& r, - raft::replicate_options opts) { - if (bid.is_transactional) { - if (!_is_tx_enabled) { - vlog( - clusterlog.error, - "Can't process a transactional request to {}. 
Transactional " - "processing isn't enabled.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - - if (!_rm_stm) { - vlog( - clusterlog.error, - "Topic {} doesn't support transactional processing.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - } - - if (bid.has_idempotent()) { - if (!_is_idempotence_enabled) { - vlog( - clusterlog.error, - "Can't process an idempotent request to {}. Idempotency isn't " - "enabled.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - - if (!_rm_stm) { - vlog( - clusterlog.error, - "Topic {} doesn't support idempotency.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - } - - if (_rm_stm) { - return _rm_stm->replicate(bid, std::move(r), opts); - } else { - return _raft->replicate(std::move(r), opts); - } -} - ss::future<> partition::start() { auto ntp = _raft->ntp(); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 31ef78b983b8..855cf0c58faa 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -59,11 +59,6 @@ class partition { ss::future> replicate( model::term_id, model::record_batch_reader&&, raft::replicate_options); - ss::future> replicate( - model::batch_identity, - model::record_batch_reader&&, - raft::replicate_options); - raft::replicate_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, From fc25ff58792f942a2bc1fac7aa9f3fda3931e777 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 1 Jul 2022 15:16:30 -0700 Subject: [PATCH 04/12] cluster: prepare partition for translating offset Update all partition::replicate dependees which don't perform offset translation to bypass it via a direct raft reference (cherry picked from commit 67a3112d7359fc2d2d62befed416f11dfdd48853) Conflicts: src/v/cluster/partition.h src/v/cluster/partition_probe.cc src/v/kafka/server/group.cc --- src/v/cluster/partition.cc | 12 ------------ src/v/cluster/partition.h | 12 +----------- src/v/cluster/partition_probe.cc | 5 +++-- src/v/cluster/tests/partition_moving_test.cc | 2 +- src/v/cluster/tests/rebalancing_tests_fixture.h | 2 +- src/v/coproc/tests/fixtures/fiber_mock_fixture.cc | 2 +- src/v/kafka/server/group.cc | 14 +++++++------- src/v/kafka/server/group_metadata_migration.cc | 1 + src/v/kafka/server/tests/fetch_test.cc | 8 ++++---- src/v/kafka/server/tests/topic_recreate_test.cc | 2 +- 10 files changed, 20 insertions(+), 40 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 806c69a19a7e..4a5d70a878f7 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -107,18 +107,6 @@ ss::future> partition::replicate( return _raft->replicate(std::move(r), opts); } -raft::replicate_stages partition::replicate_in_stages( - model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate_in_stages(std::move(r), opts); -} - -ss::future> partition::replicate( - model::term_id term, - model::record_batch_reader&& r, - raft::replicate_options opts) { - return _raft->replicate(term, std::move(r), opts); -} - ss::shared_ptr partition::rm_stm() { if (!_rm_stm) { if (!_is_tx_enabled && !_is_idempotence_enabled) { diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 855cf0c58faa..36f4af197ee2 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -53,12 +53,6 @@ class partition { ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - 
raft::replicate_stages - replicate_in_stages(model::record_batch_reader&&, raft::replicate_options); - - ss::future> replicate( - model::term_id, model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, @@ -274,11 +268,7 @@ class partition { std::optional get_cloud_term_last_offset(model::term_id term) const; -private: - friend partition_manager; - friend replicated_partition_probe; - - consensus_ptr raft() { return _raft; } + consensus_ptr raft() const { return _raft; } private: consensus_ptr _raft; diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc index 225d65021f86..64f64b7c1038 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -74,7 +74,7 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { sm::make_gauge( "leader_id", [this] { - return _partition._raft->get_leader_id().value_or( + return _partition.raft()->get_leader_id().value_or( model::node_id(-1)); }, sm::description("Id of current partition leader"), @@ -82,7 +82,7 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { sm::make_gauge( "under_replicated_replicas", [this] { - auto metrics = _partition._raft->get_follower_metrics(); + auto metrics = _partition.raft()->get_follower_metrics(); return std::count_if( metrics.cbegin(), metrics.cend(), @@ -114,6 +114,7 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { labels), }); } + partition_probe make_materialized_partition_probe() { // TODO: implement partition probe for materialized partitions class impl : public partition_probe::impl { diff --git a/src/v/cluster/tests/partition_moving_test.cc b/src/v/cluster/tests/partition_moving_test.cc index d2b746f69c6d..be7f170f3668 100644 --- a/src/v/cluster/tests/partition_moving_test.cc +++ b/src/v/cluster/tests/partition_moving_test.cc @@ -319,7 +319,7 @@ class partition_assignment_test_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index a707fe2cb5de..709e3225a9d7 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -159,7 +159,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc index 528c78f41417..c2bc47e6651b 100644 --- a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc +++ b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc @@ -178,7 +178,7 @@ ss::future> fiber_mock_fixture::make_source( auto batch = make_random_batch(params.records_per_input); co_await tests::cooperative_spin_wait_with_timeout( 2s, [partition]() { return partition->is_elected_leader(); }); - auto r = co_await partition->replicate( + auto r = co_await partition->raft()->replicate( std::move(batch), raft::replicate_options(raft::consistency_level::leader_ack)); 
vassert(!r.has_error(), "Write error: {}", r.error()); diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index c6262d037c85..3760ae7867af 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1438,7 +1438,7 @@ group::commit_tx(cluster::commit_group_tx_request r) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1521,7 +1521,7 @@ group::begin_tx(cluster::begin_group_tx_request r) { r.pid, std::move(fence)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1636,7 +1636,7 @@ group::prepare_tx(cluster::prepare_group_tx_request r) { std::move(tx_entry)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1732,7 +1732,7 @@ group::abort_tx(cluster::abort_group_tx_request r) { std::move(tx)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1853,7 +1853,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto batch = std::move(builder).build(); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto replicate_stages = _partition->replicate_in_stages( + auto replicate_stages = _partition->raft()->replicate_in_stages( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2202,7 +2202,7 @@ ss::future group::remove() { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2282,7 +2282,7 @@ group::remove_topic_partitions(const std::vector& tps) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index 1e6e9454cbb3..73e663d66e2e 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -332,6 +332,7 @@ ss::future replicate( [ntp = std::move(ntp), f_reader = std::move(f_reader)](cluster::partition_manager& pm) mutable { return pm.get(ntp) + ->raft() ->replicate( std::move(f_reader), raft::replicate_options(raft::consistency_level::quorum_ack)) diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index a0846db85428..9cceb213cf4d 100644 --- a/src/v/kafka/server/tests/fetch_test.cc +++ b/src/v/kafka/server/tests/fetch_test.cc @@ -418,7 +418,7 @@ FIXTURE_TEST(fetch_multi_partitions_debounce, 
redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -483,7 +483,7 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -563,7 +563,7 @@ FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) { model::offset(0), 5); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options( raft::consistency_level::quorum_ack)); @@ -615,7 +615,7 @@ FIXTURE_TEST(fetch_request_max_bytes, redpanda_thread_fixture) { model::offset(0), 20); auto rdr = model::make_memory_record_batch_reader( std::move(batches)); - return partition->replicate( + return partition->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); }) diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index f1644b525f76..bb74de56c717 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -266,7 +266,7 @@ FIXTURE_TEST(test_recreated_topic_does_not_lose_data, recreate_test_fixture) { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); auto p = pm.get(ntp); - return p + return p->raft() ->replicate( std::move(rdr), raft::replicate_options( From 394c144adc7f21d0ab4bf3ec187e93e0515af856 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:36:29 -0700 Subject: [PATCH 05/12] k/group: avoid ABA problem Updating consumer groups to use conditional replication to prevent a situation where leadership jumps away after a check, invalidates the check, and jumps back just in time for the post-check replication: 1. a node checks the condition; 2. leadership goes to a new node; 3. that node replicates something which invalidates the condition; 4. the leadership jumps back; 5. the original node successfully replicates, assuming that the condition still holds. Switched to a conditional replicate to fix the problem. When a group manager detects a leadership change it replays the group's records to reconstruct the group's state. We cache the current term in that state and use it as a condition on replicate. This way we know that if the leadership bounced, the replication won't pass.
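To make the race concrete, here is the shape of the fix as a hedged sketch (do_write is a hypothetical wrapper; the term-taking consensus::replicate overload is the real API used in the diff below):

    // Condition the write on the term observed when the group state was
    // reconstructed. If leadership bounced away and back, the term has
    // advanced, so raft rejects the replication instead of applying it on
    // top of state that may have changed under us -- effectively a
    // compare-and-swap on the raft term.
    ss::future<result<raft::replicate_result>> do_write(
      consensus_ptr raft,
      model::term_id cached_term,
      model::record_batch_reader reader) {
        return raft->replicate(
          cached_term,
          std::move(reader),
          raft::replicate_options(raft::consistency_level::quorum_ack));
    }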
(cherry picked from commit e693bead59ec286596f23f29b50a9f49cb6ceb9a) Conflicts: src/v/kafka/server/group.cc --- src/v/kafka/server/group.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 3760ae7867af..355832a699c6 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -863,6 +863,7 @@ void group::complete_join() { std::move(batch)); (void)_partition ->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)) .then([]([[maybe_unused]] result r) {}); @@ -1209,6 +1210,7 @@ ss::future group::sync_group_completing_rebalance( return _partition ->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)) .then([this, @@ -1854,6 +1856,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto replicate_stages = _partition->raft()->replicate_in_stages( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2203,6 +2206,7 @@ ss::future group::remove() { try { auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2283,6 +2287,7 @@ group::remove_topic_partitions(const std::vector& tps) { try { auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { From 8138d07ffba2189e7ad98e890e129d04bf3d8572 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:50:26 -0700 Subject: [PATCH 06/12] c/types: introduce kafka offset types We're going to mix raft and kafka offsets in the same class. Since both offsets use the same underlying type, it's easy to make an error and treat one as if it were the other. Introducing a kafka offset type lets us rely on the type system to prevent such errors.
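As an illustration of the intent (a hedged sketch: named_type is the existing strong-typedef utility from utils/named_type.h; the tag struct name here is an assumption):

    namespace kafka {
    // A distinct integer wrapper: kafka::offset and model::offset no
    // longer convert into each other implicitly, so accidentally passing
    // a raft offset where a kafka offset is expected fails to compile.
    using offset = named_type<int64_t, struct kafka_offset_type>;
    } // namespace kafka

    // kafka::offset k{5};
    // model::offset m = k;    // error: distinct named_type instantiations
    // model::offset m{k()};   // explicit conversion via operator()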
(cherry picked from commit 93350756123f6883192a0fb6bb2b123f394e9a4b) Conflicts: src/v/cluster/types.h --- src/v/cluster/types.cc | 10 ++++++++++ src/v/cluster/types.h | 14 ++++++++++++++ src/v/model/fundamental.h | 6 ++++++ 3 files changed, 30 insertions(+) diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 52609f83f0b9..1c3b92aaae11 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -29,6 +29,16 @@ namespace cluster { +kafka_stages::kafka_stages( + ss::future<> enq, ss::future> offset_future) + : request_enqueued(std::move(enq)) + , replicate_finished(std::move(offset_future)) {} + +kafka_stages::kafka_stages(raft::errc ec) + : request_enqueued(ss::now()) + , replicate_finished( + ss::make_ready_future>(make_error_code(ec))){}; + bool topic_properties::is_compacted() const { if (!cleanup_policy_bitflags) { return false; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 4df0e8e2b643..eb5f393c56a1 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -132,6 +132,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept { return std::error_code(static_cast(e), tx_error_category()); } +struct kafka_result { + kafka::offset last_offset; +}; +struct kafka_stages { + kafka_stages(ss::future<>, ss::future>); + explicit kafka_stages(raft::errc); + // after this future is ready, request in enqueued in raft and it will not + // be reorderd + ss::future<> request_enqueued; + // after this future is ready, request was successfully replicated with + // requested consistency level + ss::future> replicate_finished; +}; + struct try_abort_request { model::partition_id tm; model::producer_identity pid; diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 018a0c535dbf..9fbeadc57db1 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -27,6 +27,12 @@ #include #include +namespace kafka { + +using offset = named_type; + +} // namespace kafka + namespace model { // Named after Kafka cleanup.policy topic property From 3594b59d5b7d910fe9df7c1d032574b3d6dd7bd6 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:50:39 -0700 Subject: [PATCH 07/12] cluster: shift offset translation to partition Shifting offset translation down the abstraction well to eventually reach rm_stm (cherry picked from commit e3d24d951206fe51a68e0cf0c1965e0525ad07f7) --- src/v/cluster/partition.cc | 42 ++++++++++++++++------ src/v/cluster/partition.h | 5 +-- src/v/kafka/server/group.cc | 4 +-- src/v/kafka/server/replicated_partition.cc | 17 +++++---- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 4a5d70a878f7..4004498c2d47 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -102,9 +102,15 @@ partition::partition( } } -ss::future> partition::replicate( +ss::future> partition::replicate( model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate(std::move(r), opts); + using ret_t = result; + auto res = co_await _raft->replicate(std::move(r), opts); + if (!res) { + co_return ret_t(res.error()); + } + co_return ret_t(kafka_result{ + kafka::offset(_translator->from_log_offset(res.value().last_offset)())}); } ss::shared_ptr partition::rm_stm() { @@ -126,10 +132,11 @@ ss::shared_ptr partition::rm_stm() { return _rm_stm; } -raft::replicate_stages partition::replicate_in_stages( +kafka_stages partition::replicate_in_stages( model::batch_identity bid, model::record_batch_reader&& r, 
raft::replicate_options opts) { + using ret_t = result; if (bid.is_transactional) { if (!_is_tx_enabled) { vlog( @@ -137,7 +144,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process a transactional request to {}. Transactional " "processing isn't enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -145,7 +152,7 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support transactional processing.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } @@ -156,7 +163,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process an idempotent request to {}. Idempotency isn't " "enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -164,15 +171,29 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support idempotency.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } + ss::lw_shared_ptr res; if (_rm_stm) { - return _rm_stm->replicate_in_stages(bid, std::move(r), opts); + res = _rm_stm->replicate_in_stages(bid, std::move(r), opts); } else { - return _raft->replicate_in_stages(std::move(r), opts); + res = _raft->replicate_in_stages(std::move(r), opts); } + + auto replicate_finished = res->replicate_finished.then( + [this](result r) { + if (!r) { + return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = kafka::offset( + _translator->from_log_offset(old_offset)()); + return ret_t(kafka_result{new_offset}); + }); + return kafka_stages( + std::move(res->request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { @@ -180,7 +201,8 @@ ss::future<> partition::start() { _probe.setup_metrics(ntp); - auto f = _raft->start(); + auto f = _raft->start().then( + [this] { _translator = _raft->get_offset_translator_state(); }); if (is_id_allocator_topic(ntp)) { return f.then([this] { return _id_allocator_stm->start(); }); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 36f4af197ee2..540a6cc804ab 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -50,10 +50,10 @@ class partition { ss::future<> start(); ss::future<> stop(); - ss::future> + ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); @@ -283,6 +283,7 @@ class partition { bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; + ss::lw_shared_ptr _translator; friend std::ostream& operator<<(std::ostream& o, const partition& x); }; diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 355832a699c6..0f0cffee1a5d 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -861,7 +861,7 @@ void group::complete_join() { auto batch = checkpoint(assignments_type{}); auto reader = model::make_memory_record_batch_reader( std::move(batch)); - (void)_partition + (void)_partition->raft() ->replicate( _term, std::move(reader), @@ -1208,7 +1208,7 @@ ss::future group::sync_group_completing_rebalance( auto batch = checkpoint(assignments); auto reader = 
model::make_memory_record_batch_reader(std::move(batch)); - return _partition + return _partition->raft() ->replicate( _term, std::move(reader), diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 1082ff0bb365..8ede57ffe970 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -163,11 +163,11 @@ ss::future> replicated_partition::replicate( model::record_batch_reader rdr, raft::replicate_options opts) { using ret_t = result; return _partition->replicate(std::move(rdr), opts) - .then([this](result r) { + .then([](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->from_log_offset(r.value().last_offset)); + return ret_t(model::offset(r.value().last_offset())); }); } @@ -177,15 +177,18 @@ raft::replicate_stages replicated_partition::replicate( raft::replicate_options opts) { using ret_t = result; auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); - res.replicate_finished = res.replicate_finished.then( - [this](result r) { + + raft::replicate_stages out(raft::errc::success); + out.request_enqueued = std::move(res.request_enqueued); + out.replicate_finished = res.replicate_finished.then( + [](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(raft::replicate_result{ - _translator->from_log_offset(r.value().last_offset)}); + return ret_t( + raft::replicate_result{model::offset(r.value().last_offset())}); }); - return res; + return out; } std::optional replicated_partition::get_leader_epoch_last_offset( From d0ef36ee285e71700d741cd2853ff95db2f0e7e6 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:48:55 -0700 Subject: [PATCH 08/12] rm_stm: prepare to use kafka::offset based cache Preparing rm_stm to use a kafka::offset based seq-offset cache. Right now it uses raft offsets, but there is a problem with that: once the cache items become older than the head of the log (eviction), Redpanda becomes unable to use offset translation, so we need to store already-translated offsets. Since the cache is persisted as part of the snapshot, we need to change the disk format and provide backward compatibility. The change is split into two commits. The current commit introduces types representing the old format (seq_cache_entry_v1 and tx_snapshot_v1) and adds compatibility machinery to convert an old snapshot (tx_snapshot_v1) to a new snapshot (tx_snapshot). The follow-up commit updates the default types to use the new format and updates the mapping between the old and default types.
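The resulting decode path, reduced to a self-contained sketch (struct names mirror the patch; the payloads and the delta-based translation are simplified stand-ins for reflection::adl and the offset translator):

    #include <cstdint>
    #include <vector>

    struct tx_snapshot_v1 { // old on-disk layout: raw raft offsets
        static constexpr uint8_t version = 1;
        std::vector<int64_t> offsets;
    };

    struct tx_snapshot { // new layout: already-translated kafka offsets
        static constexpr uint8_t version = 2;
        std::vector<int64_t> offsets;
    };

    // On snapshot load, dispatch on the header version: current snapshots
    // are used as-is, old ones are upconverted field by field, translating
    // each raft offset into the kafka offset space.
    tx_snapshot upconvert(const tx_snapshot_v1& old, int64_t delta) {
        tx_snapshot cur;
        cur.offsets.reserve(old.offsets.size());
        for (auto o : old.offsets) {
            cur.offsets.push_back(o - delta); // raft -> kafka offset
        }
        return cur;
    }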
(cherry picked from commit 63c5883435da22b30f2fbc772932766a9b2e7465) Conflicts: src/v/cluster/rm_stm.cc --- src/v/cluster/rm_stm.cc | 136 +++++++++++++++++++++++++++++++--------- src/v/cluster/rm_stm.h | 7 ++- 2 files changed, 111 insertions(+), 32 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 820948743b07..0ef3bdc3cdec 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -187,6 +187,31 @@ struct tx_snapshot_v0 { std::vector seqs; }; +struct seq_cache_entry_v1 { + int32_t seq{-1}; + model::offset offset; +}; + +struct seq_entry_v1 { + model::producer_identity pid; + int32_t seq{-1}; + model::offset last_offset{-1}; + ss::circular_buffer seq_cache; + model::timestamp::type last_write_timestamp; +}; + +struct tx_snapshot_v1 { + static constexpr uint8_t version = 1; + + std::vector fenced; + std::vector ongoing; + std::vector prepared; + std::vector aborted; + std::vector abort_indexes; + model::offset offset; + std::vector seqs; +}; + rm_stm::rm_stm( ss::logger& logger, raft::consensus* c, @@ -1812,14 +1837,35 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { iobuf_parser data_parser(std::move(tx_ss_buf)); if (hdr.version == tx_snapshot::version) { data = reflection::adl{}.from(data_parser); + } else if (hdr.version == tx_snapshot_v1::version) { + auto data_v1 = reflection::adl{}.from(data_parser); + data.fenced = std::move(data_v1.fenced); + data.ongoing = std::move(data_v1.ongoing); + data.prepared = std::move(data_v1.prepared); + data.aborted = std::move(data_v1.aborted); + data.abort_indexes = std::move(data_v1.abort_indexes); + data.offset = std::move(data_v1.offset); + for (auto& seq_v1 : data_v1.seqs) { + seq_entry seq; + seq.pid = seq_v1.pid; + seq.seq = seq_v1.seq; + seq.last_offset = seq_v1.last_offset; + seq.seq_cache.reserve(seq_v1.seq_cache.size()); + for (auto& item : seq_v1.seq_cache) { + seq.seq_cache.push_back( + seq_cache_entry{.seq = item.seq, .offset = item.offset}); + } + seq.last_write_timestamp = seq_v1.last_write_timestamp; + data.seqs.push_back(std::move(seq)); + } } else if (hdr.version == tx_snapshot_v0::version) { auto data_v0 = reflection::adl{}.from(data_parser); - data.fenced = data_v0.fenced; - data.ongoing = data_v0.ongoing; - data.prepared = data_v0.prepared; - data.aborted = data_v0.aborted; - data.abort_indexes = data_v0.abort_indexes; - data.offset = data_v0.offset; + data.fenced = std::move(data_v0.fenced); + data.ongoing = std::move(data_v0.ongoing); + data.prepared = std::move(data_v0.prepared); + data.aborted = std::move(data_v0.aborted); + data.abort_indexes = std::move(data_v0.abort_indexes); + data.offset = std::move(data_v0.offset); for (auto seq_v0 : data_v0.seqs) { auto seq = seq_entry{ .pid = seq_v0.pid, @@ -1879,6 +1925,27 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { _insync_offset = data.offset; } +uint8_t rm_stm::active_snapshot_version() { return tx_snapshot_v1::version; } + +template +void rm_stm::fill_snapshot_wo_seqs(T& snapshot) { + for (auto const& [k, v] : _log_state.fence_pid_epoch) { + snapshot.fenced.push_back(model::producer_identity{k(), v()}); + } + for (auto& entry : _log_state.ongoing_map) { + snapshot.ongoing.push_back(entry.second); + } + for (auto& entry : _log_state.prepared) { + snapshot.prepared.push_back(entry.second); + } + for (auto& entry : _log_state.aborted) { + snapshot.aborted.push_back(entry); + } + for (auto& entry : _log_state.abort_indexes) { + snapshot.abort_indexes.push_back(entry); + } +} + ss::future 
rm_stm::take_snapshot() { if (_log_state.aborted.size() > _abort_index_segment_size) { std::sort( @@ -1904,34 +1971,41 @@ ss::future rm_stm::take_snapshot() { _log_state.aborted = snapshot.aborted; } - tx_snapshot tx_ss; - - for (auto const& [k, v] : _log_state.fence_pid_epoch) { - tx_ss.fenced.push_back( - model::producer_identity{.id = k(), .epoch = v()}); - } - for (auto& entry : _log_state.ongoing_map) { - tx_ss.ongoing.push_back(entry.second); - } - for (auto& entry : _log_state.prepared) { - tx_ss.prepared.push_back(entry.second); - } - for (auto& entry : _log_state.aborted) { - tx_ss.aborted.push_back(entry); - } - for (auto& entry : _log_state.abort_indexes) { - tx_ss.abort_indexes.push_back(entry); - } - for (const auto& entry : _log_state.seq_table) { - tx_ss.seqs.push_back(entry.second.copy()); - } - tx_ss.offset = _insync_offset; - iobuf tx_ss_buf; - reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + auto version = active_snapshot_version(); + if (version == tx_snapshot::version) { + tx_snapshot tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& entry : _log_state.seq_table) { + tx_ss.seqs.push_back(entry.second.copy()); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else if (version == tx_snapshot_v1::version) { + tx_snapshot_v1 tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& it : _log_state.seq_table) { + auto& entry = it.second; + seq_entry_v1 seqs; + seqs.pid = entry.pid; + seqs.seq = entry.seq; + seqs.last_offset = entry.last_offset; + seqs.last_write_timestamp = entry.last_write_timestamp; + seqs.seq_cache.reserve(seqs.seq_cache.size()); + for (auto& item : entry.seq_cache) { + seqs.seq_cache.push_back( + seq_cache_entry_v1{.seq = item.seq, .offset = item.offset}); + } + tx_ss.seqs.push_back(std::move(seqs)); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else { + vassert(false, "unsupported tx_snapshot version {}", version); + } co_return stm_snapshot::create( - tx_snapshot::version, _insync_offset, std::move(tx_ss_buf)); + version, _insync_offset, std::move(tx_ss_buf)); } ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) { diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 4801edd03238..e35c39893b0a 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -124,7 +124,7 @@ class rm_stm final : public persisted_stm { }; struct tx_snapshot { - static constexpr uint8_t version = 1; + static constexpr uint8_t version = 2; std::vector fenced; std::vector ongoing; @@ -493,6 +493,11 @@ class rm_stm final : public persisted_stm { std::optional get_expiration_info(model::producer_identity pid) const; + uint8_t active_snapshot_version(); + + template + void fill_snapshot_wo_seqs(T&); + ss::basic_rwlock<> _state_lock; absl::flat_hash_map> _tx_locks; absl::flat_hash_map< From b7d1f6e34bfde391f830b4039d9ad108b520e950 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 1 Jul 2022 22:54:49 -0700 Subject: [PATCH 09/12] rm_stm: shift offset translation to rm_stm switching to caching seq-kafka offsets cache to avoid out of range errors on translating offsets beyond the eviction point (cherry picked from commit 4b42c7eca8a8bfa0e17cd72a694ce982584626d4) --- src/v/cluster/partition.cc | 10 +- src/v/cluster/rm_stm.cc | 118 +++++++++++++++-------- src/v/cluster/rm_stm.h | 50 ++++++---- src/v/cluster/tests/idempotency_tests.cc | 2 +- 4 files changed, 118 insertions(+), 62 deletions(-) diff --git a/src/v/cluster/partition.cc 
b/src/v/cluster/partition.cc index 4004498c2d47..0547135d0617 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -175,14 +175,12 @@ kafka_stages partition::replicate_in_stages( } } - ss::lw_shared_ptr res; if (_rm_stm) { - res = _rm_stm->replicate_in_stages(bid, std::move(r), opts); - } else { - res = _raft->replicate_in_stages(std::move(r), opts); + return _rm_stm->replicate_in_stages(bid, std::move(r), opts); } - auto replicate_finished = res->replicate_finished.then( + auto res = _raft->replicate_in_stages(std::move(r), opts); + auto replicate_finished = res.replicate_finished.then( [this](result r) { if (!r) { return ret_t(r.error()); @@ -193,7 +191,7 @@ kafka_stages partition::replicate_in_stages( return ret_t(kafka_result{new_offset}); }); return kafka_stages( - std::move(res->request_enqueued), std::move(replicate_finished)); + std::move(res.request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 0ef3bdc3cdec..bedeab97cb6e 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -785,7 +785,7 @@ ss::future rm_stm::do_abort_tx( co_return tx_errc::none; } -raft::replicate_stages rm_stm::replicate_in_stages( +kafka_stages rm_stm::replicate_in_stages( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -804,10 +804,10 @@ raft::replicate_stages rm_stm::replicate_in_stages( enqueued->set_value(); } }); - return raft::replicate_stages(std::move(f), std::move(replicate_finished)); + return kafka_stages(std::move(f), std::move(replicate_finished)); } -ss::future> rm_stm::replicate( +ss::future> rm_stm::replicate( model::batch_identity bid, model::record_batch_reader r, raft::replicate_options opts) { @@ -824,7 +824,7 @@ rm_stm::transfer_leadership(std::optional target) { }); } -ss::future> rm_stm::do_replicate( +ss::future> rm_stm::do_replicate( model::batch_identity bid, model::record_batch_reader b, raft::replicate_options opts, @@ -854,6 +854,11 @@ ss::future<> rm_stm::stop() { return raft::state_machine::stop(); } +ss::future<> rm_stm::start() { + _translator = _c->get_offset_translator_state(); + return persisted_stm::start(); +} + rm_stm::transaction_info::status_t rm_stm::get_tx_status(model::producer_identity pid) const { if (_mem_state.preparing.contains(pid)) { @@ -947,7 +952,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return false; } - seq.update(bid.last_seq, model::offset{-1}); + seq.update(bid.last_seq, kafka::offset{-1}); seq.pid = bid.pid; seq.last_write_timestamp = last_write_timestamp; @@ -957,7 +962,7 @@ bool rm_stm::check_seq(model::batch_identity bid) { return true; } -std::optional +std::optional rm_stm::known_seq(model::batch_identity bid) const { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq == _log_state.seq_table.end()) { @@ -982,7 +987,7 @@ std::optional rm_stm::tail_seq(model::producer_identity pid) const { return pid_seq->second.seq; } -void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { +void rm_stm::set_seq(model::batch_identity bid, kafka::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { if (pid_seq->second.seq == bid.last_seq) { @@ -995,14 +1000,14 @@ void rm_stm::reset_seq(model::batch_identity bid) { _log_state.seq_table.erase(bid.pid); auto& seq = _log_state.seq_table[bid.pid]; seq.seq = bid.last_seq; - seq.last_offset = model::offset{-1}; + seq.last_offset = 
kafka::offset{-1}; seq.pid = bid.pid; seq.last_write_timestamp = model::timestamp::now().value(); _oldest_session = std::min( _oldest_session, model::timestamp(seq.last_write_timestamp)); } -ss::future> +ss::future> rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { if (!check_tx_permitted()) { co_return errc::generic_tx_error; @@ -1056,7 +1061,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // this isn't the first attempt in the tx we should try dedupe auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + if (cached_offset.value() < kafka::offset{0}) { vlog( clusterlog.warn, "Status of the original attempt is unknown (still is " @@ -1070,8 +1075,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // to propagate it to the app layer co_return errc::generic_tx_error; } - co_return raft::replicate_result{ - .last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } if (!check_seq(bid)) { @@ -1107,26 +1111,28 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { expiration_it->second.last_update = clock_type::now(); expiration_it->second.is_expiration_requested = false; - auto replicated = r.value(); + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); - set_seq(bid, replicated.last_offset); + set_seq(bid, new_offset); - auto last_offset = model::offset(replicated.last_offset()); if (!_mem_state.tx_start.contains(bid.pid)) { - auto base_offset = model::offset( - last_offset() - (bid.record_count - 1)); + auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); _mem_state.tx_starts.insert(base_offset); _mem_state.estimated.erase(bid.pid); } - co_return replicated; + + co_return kafka_result{.last_offset = new_offset}; } -ss::future> rm_stm::replicate_seq( +ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1185,7 +1191,7 @@ ss::future> rm_stm::replicate_seq( // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - co_return raft::replicate_result{.last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } // checking if the request is already being processed @@ -1193,8 +1199,8 @@ ss::future> rm_stm::replicate_seq( if (inflight->last_seq == bid.last_seq && inflight->is_processing) { // found an inflight request, parking the current request // until the former is resolved - auto promise = ss::make_lw_shared< - available_promise>>(); + auto promise + = ss::make_lw_shared>>(); inflight->parked.push_back(promise); u.return_all(); co_return co_await promise->get_future(); @@ -1279,20 +1285,26 @@ ss::future> rm_stm::replicate_seq( // we don't need session->lock because we never interleave // access to is_processing and offset with sync point (await) request->is_processing = false; - request->r = r; + if (r) { + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + request->r = ret_t(kafka_result{new_offset}); + } else { + request->r = ret_t(r.error()); + } 
for (auto& pending : request->parked) { - pending->set_value(r); + pending->set_value(request->r); } request->parked.clear(); - if (!r) { + if (!request->r) { // if r was failed at the consensus level (not because has_failed) // it should guarantee that all follow up replication requests fail // too but just in case stepping down to minimize the risk if (_c->is_leader() && _c->term() == synced_term) { co_await _c->step_down(); } - co_return r; + co_return request->r; } // requests get into session->cache in seq order so when we iterate @@ -1324,13 +1336,15 @@ ss::future> rm_stm::replicate_seq( _inflight_requests.erase(bid.pid); } - co_return r; + co_return request->r; } -ss::future> rm_stm::replicate_msg( +ss::future> rm_stm::replicate_msg( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { co_return errc::not_leader; } @@ -1338,7 +1352,14 @@ ss::future> rm_stm::replicate_msg( auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); co_await std::move(ss.request_enqueued); enqueued->set_value(); - co_return co_await std::move(ss.replicate_finished); + auto r = co_await std::move(ss.replicate_finished); + + if (!r) { + co_return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + co_return ret_t(kafka_result{new_offset}); } model::offset rm_stm::last_stable_offset() { @@ -1785,12 +1806,13 @@ ss::future<> rm_stm::apply_control( void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) { if (bid.has_idempotent()) { auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + auto translated = from_log_offset(last_offset); if (inserted) { seq_it->second.pid = bid.pid; seq_it->second.seq = bid.last_seq; - seq_it->second.last_offset = last_offset; + seq_it->second.last_offset = translated; } else { - seq_it->second.update(bid.last_seq, last_offset); + seq_it->second.update(bid.last_seq, translated); } seq_it->second.last_write_timestamp = bid.first_timestamp.value(); _oldest_session = std::min(_oldest_session, bid.first_timestamp); @@ -1849,11 +1871,21 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { seq_entry seq; seq.pid = seq_v1.pid; seq.seq = seq_v1.seq; - seq.last_offset = seq_v1.last_offset; + try { + seq.last_offset = from_log_offset(seq_v1.last_offset); + } catch (...) { + // ignoring outside the translation range errors + continue; + } seq.seq_cache.reserve(seq_v1.seq_cache.size()); for (auto& item : seq_v1.seq_cache) { - seq.seq_cache.push_back( - seq_cache_entry{.seq = item.seq, .offset = item.offset}); + try { + seq.seq_cache.push_back(seq_cache_entry{ + .seq = item.seq, .offset = from_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } seq.last_write_timestamp = seq_v1.last_write_timestamp; data.seqs.push_back(std::move(seq)); @@ -1870,7 +1902,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { auto seq = seq_entry{ .pid = seq_v0.pid, .seq = seq_v0.seq, - .last_offset = model::offset{-1}, + .last_offset = kafka::offset{-1}, .last_write_timestamp = seq_v0.last_write_timestamp}; data.seqs.push_back(std::move(seq)); } @@ -1989,12 +2021,22 @@ ss::future rm_stm::take_snapshot() { seq_entry_v1 seqs; seqs.pid = entry.pid; seqs.seq = entry.seq; - seqs.last_offset = entry.last_offset; + try { + seqs.last_offset = to_log_offset(entry.last_offset); + } catch (...) 
{ + // ignoring outside the translation range errors + continue; + } seqs.last_write_timestamp = entry.last_write_timestamp; seqs.seq_cache.reserve(seqs.seq_cache.size()); for (auto& item : entry.seq_cache) { - seqs.seq_cache.push_back( - seq_cache_entry_v1{.seq = item.seq, .offset = item.offset}); + try { + seqs.seq_cache.push_back(seq_cache_entry_v1{ + .seq = item.seq, .offset = to_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } tx_ss.seqs.push_back(std::move(seqs)); } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index e35c39893b0a..b3ba99d48138 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -22,6 +22,7 @@ #include "raft/logger.h" #include "raft/state_machine.h" #include "raft/types.h" +#include "storage/offset_translator_state.h" #include "storage/snapshot.h" #include "utils/available_promise.h" #include "utils/expiring_promise.h" @@ -75,14 +76,14 @@ class rm_stm final : public persisted_stm { struct seq_cache_entry { int32_t seq{-1}; - model::offset offset; + kafka::offset offset; }; struct seq_entry { static const int seq_cache_size = 5; model::producer_identity pid; int32_t seq{-1}; - model::offset last_offset{-1}; + kafka::offset last_offset{-1}; ss::circular_buffer seq_cache; model::timestamp::type last_write_timestamp; @@ -100,7 +101,7 @@ class rm_stm final : public persisted_stm { return ret; } - void update(int32_t new_seq, model::offset new_offset) { + void update(int32_t new_seq, kafka::offset new_offset) { if (new_seq < seq) { return; } @@ -110,7 +111,7 @@ class rm_stm final : public persisted_stm { return; } - if (seq >= 0 && last_offset >= model::offset{0}) { + if (seq >= 0 && last_offset >= kafka::offset{0}) { auto entry = seq_cache_entry{.seq = seq, .offset = last_offset}; seq_cache.push_back(entry); while (seq_cache.size() >= seq_entry::seq_cache_size) { @@ -170,12 +171,12 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + ss::future> replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options); @@ -185,6 +186,8 @@ class rm_stm final : public persisted_stm { ss::future<> stop() override; + ss::future<> start() override; + void testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } void testing_only_enable_transactions() { _is_tx_enabled = true; } @@ -274,27 +277,27 @@ class rm_stm final : public persisted_stm { ss::future<> save_abort_snapshot(abort_snapshot); bool check_seq(model::batch_identity); - std::optional known_seq(model::batch_identity) const; - void set_seq(model::batch_identity, model::offset); + std::optional known_seq(model::batch_identity) const; + void set_seq(model::batch_identity, kafka::offset); void reset_seq(model::batch_identity); std::optional tail_seq(model::producer_identity) const; - ss::future> do_replicate( + ss::future> do_replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); - ss::future> replicate_seq( + ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> replicate_msg( + ss::future> replicate_msg( model::record_batch_reader, 
raft::replicate_options, ss::lw_shared_ptr>); @@ -424,10 +427,9 @@ class rm_stm final : public persisted_stm { // original request is replicated. struct inflight_request { int32_t last_seq{-1}; - result r = errc::success; + result r = errc::success; bool is_processing; - std::vector< - ss::lw_shared_ptr>>> + std::vector>>> parked; }; @@ -467,8 +469,7 @@ class rm_stm final : public persisted_stm { tail_seq = -1; } - std::optional> - known_seq(int32_t last_seq) const { + std::optional> known_seq(int32_t last_seq) const { for (auto& seq : cache) { if (seq->last_seq == last_seq && !seq->is_processing) { return seq->r; @@ -488,6 +489,20 @@ class rm_stm final : public persisted_stm { return lock_it->second; } + kafka::offset from_log_offset(model::offset old_offset) { + if (old_offset > model::offset{-1}) { + return kafka::offset(_translator->from_log_offset(old_offset)()); + } + return kafka::offset(old_offset()); + } + + model::offset to_log_offset(kafka::offset new_offset) { + if (new_offset > model::offset{-1}) { + return _translator->to_log_offset(model::offset(new_offset())); + } + return model::offset(new_offset()); + } + transaction_info::status_t get_tx_status(model::producer_identity pid) const; std::optional @@ -520,6 +535,7 @@ class rm_stm final : public persisted_stm { bool _is_tx_enabled{false}; ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; + ss::lw_shared_ptr _translator; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index e6e59be1e8df..f543f923bb42 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -150,7 +150,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { wait_for_confirmed_leader(); wait_for_meta_initialized(); - std::vector offsets; + std::vector offsets; auto count = 5; From 2161b46dc903be357c60c8300bc5432bb7bb38a7 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 19:16:14 -0700 Subject: [PATCH 10/12] rm_stm: remove dead code (cherry picked from commit 065fb54aeb28f38aecd67ccc2d26a6dbc5c9ef9f) --- src/v/cluster/rm_stm.h | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index b3ba99d48138..dc29fde773b4 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -410,18 +410,6 @@ class rm_stm final : public persisted_stm { } }; - struct request_id { - model::producer_identity pid; - int32_t seq; - - auto operator<=>(const request_id&) const = default; - - template - friend H AbslHashValue(H h, const request_id& bid) { - return H::combine(std::move(h), bid.pid, bid.seq); - } - }; - // When a request is retried while the first appempt is still // being replicated the retried request is parked until the // original request is replicated. 
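The from_log_offset/to_log_offset helpers introduced above only consult the translator inside the valid offset range; a self-contained sketch of that guard (the delta-based translator is a simplified stand-in for storage::offset_translator_state):

    #include <cassert>
    #include <cstdint>

    struct translator_state {
        // number of non-data (configuration) batches preceding an offset,
        // i.e. entries that exist in the raft log but not in kafka space
        int64_t delta;
        int64_t from_log(int64_t o) const { return o - delta; }
        int64_t to_log(int64_t o) const { return o + delta; }
    };

    // Sentinels such as -1 ("no offset yet") must pass through untouched:
    // translating them would turn "unknown" into a valid-looking offset.
    int64_t from_log_offset(const translator_state& t, int64_t o) {
        return o > -1 ? t.from_log(o) : o;
    }

    int main() {
        translator_state t{.delta = 2};
        assert(from_log_offset(t, 10) == 8);  // real offset: translated
        assert(from_log_offset(t, -1) == -1); // sentinel: preserved
        return 0;
    }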
From 5a95cb209d3fe5e7f250e45b12ee716563bc8d60 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 5 Jul 2022 21:54:40 -0700 Subject: [PATCH 11/12] rm_stm: add feature_table as a dependency (cherry picked from commit 90007628f63115bc322fc1731c582ec4b2fac237) --- src/v/cluster/partition.cc | 6 ++-- src/v/cluster/partition.h | 5 ++- src/v/cluster/partition_manager.cc | 12 +++++-- src/v/cluster/partition_manager.h | 5 ++- src/v/cluster/rm_stm.cc | 6 ++-- src/v/cluster/rm_stm.h | 5 ++- src/v/cluster/tests/idempotency_tests.cc | 43 ++++++++++++++++++++---- src/v/cluster/tests/rm_stm_tests.cc | 37 ++++++++++++++++---- src/v/redpanda/application.cc | 3 +- 9 files changed, 98 insertions(+), 24 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 0547135d0617..090616884b61 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -32,10 +32,12 @@ partition::partition( consensus_ptr r, ss::sharded& tx_gateway_frontend, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _raft(r) , _probe(std::make_unique(*this)) , _tx_gateway_frontend(tx_gateway_frontend) + , _feature_table(feature_table) , _is_tx_enabled(config::shard_local_cfg().enable_transactions.value()) , _is_idempotence_enabled( config::shard_local_cfg().enable_idempotence.value()) { @@ -70,7 +72,7 @@ partition::partition( if (has_rm_stm) { _rm_stm = ss::make_shared( - clusterlog, _raft.get(), _tx_gateway_frontend); + clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table); stm_manager->add_stm(_rm_stm); } diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 540a6cc804ab..8a16c1719fc6 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -13,6 +13,7 @@ #include "cloud_storage/remote_partition.h" #include "cluster/archival_metadata_stm.h" +#include "cluster/feature_table.h" #include "cluster/id_allocator_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" @@ -44,7 +45,8 @@ class partition { consensus_ptr r, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); raft::group_id group() const { return _raft->group(); } ss::future<> start(); @@ -280,6 +282,7 @@ class partition { ss::abort_source _as; partition_probe _probe; ss::sharded& _tx_gateway_frontend; + ss::sharded& _feature_table; bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index d68ca729f0ab..dd81f188b163 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -47,13 +47,15 @@ partition_manager::partition_manager( ss::sharded& tx_gateway_frontend, ss::sharded& recovery_mgr, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _storage(storage.local()) , _raft_manager(raft) , _tx_gateway_frontend(tx_gateway_frontend) , _partition_recovery_mgr(recovery_mgr) , _cloud_storage_api(cloud_storage_api) - , _cloud_storage_cache(cloud_storage_cache) {} + , _cloud_storage_cache(cloud_storage_cache) + , _feature_table(feature_table) {} partition_manager::ntp_table_container partition_manager::get_topic_partition_table( @@ -119,7 +121,11 @@ ss::future partition_manager::manage( group, std::move(initial_nodes), log); auto p = ss::make_lw_shared( - c, _tx_gateway_frontend, _cloud_storage_api, 
+      c,
+      _tx_gateway_frontend,
+      _cloud_storage_api,
+      _cloud_storage_cache,
+      _feature_table);

     _ntp_table.emplace(log.config().ntp(), p);
     _raft_table.emplace(group, p);

diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h
index fa66f5ea31a0..8f45302e4abc 100644
--- a/src/v/cluster/partition_manager.h
+++ b/src/v/cluster/partition_manager.h
@@ -14,6 +14,7 @@
 #include "cloud_storage/cache_service.h"
 #include "cloud_storage/partition_recovery_manager.h"
 #include "cloud_storage/remote.h"
+#include "cluster/feature_table.h"
 #include "cluster/ntp_callbacks.h"
 #include "cluster/partition.h"
 #include "model/metadata.h"
@@ -37,7 +38,8 @@ class partition_manager {
       ss::sharded<cluster::tx_gateway_frontend>&,
       ss::sharded<cloud_storage::partition_recovery_manager>&,
       ss::sharded<cloud_storage::remote>&,
-      ss::sharded<cloud_storage::cache>&);
+      ss::sharded<cloud_storage::cache>&,
+      ss::sharded<feature_table>&);

     using manage_cb_t
       = ss::noncopyable_function<void(ss::lw_shared_ptr<partition>)>;
@@ -190,6 +192,7 @@ class partition_manager {
       _partition_recovery_mgr;
     ss::sharded<cloud_storage::remote>& _cloud_storage_api;
     ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
+    ss::sharded<feature_table>& _feature_table;
     ss::gate _gate;
     bool _block_new_leadership{false};

diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index bedeab97cb6e..73e7fcee6f3e 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -215,7 +215,8 @@ struct tx_snapshot_v1 {
 rm_stm::rm_stm(
   ss::logger& logger,
   raft::consensus* c,
-  ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend)
+  ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
+  ss::sharded<feature_table>& feature_table)
   : persisted_stm("tx.snapshot", logger, c)
   , _oldest_session(model::timestamp::now())
   , _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
@@ -234,7 +235,8 @@ rm_stm::rm_stm(
   , _abort_snapshot_mgr(
       "abort.idx",
       std::filesystem::path(c->log_config().work_directory()),
-      ss::default_priority_class()) {
+      ss::default_priority_class())
+  , _feature_table(feature_table) {
     if (!_is_tx_enabled) {
         _is_autoabort_enabled = false;
     }

diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h
index dc29fde773b4..e8988fbe313d 100644
--- a/src/v/cluster/rm_stm.h
+++ b/src/v/cluster/rm_stm.h
@@ -11,6 +11,7 @@

 #pragma once

+#include "cluster/feature_table.h"
 #include "cluster/persisted_stm.h"
 #include "cluster/tx_utils.h"
 #include "cluster/types.h"
@@ -152,7 +153,8 @@ class rm_stm final : public persisted_stm {
     explicit rm_stm(
       ss::logger&,
       raft::consensus*,
-      ss::sharded<cluster::tx_gateway_frontend>&);
+      ss::sharded<cluster::tx_gateway_frontend>&,
+      ss::sharded<feature_table>&);

     ss::future<checked<model::term_id, tx_errc>> begin_tx(
       model::producer_identity, model::tx_seq, std::chrono::milliseconds);
@@ -524,6 +526,7 @@ class rm_stm final : public persisted_stm {
     ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
     storage::snapshot_manager _abort_snapshot_mgr;
     ss::lw_shared_ptr<const storage::offset_translator_state> _translator;
+    ss::sharded<feature_table>& _feature_table;
 };

 } // namespace cluster

diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc
index f543f923bb42..e19ff1bcda55 100644
--- a/src/v/cluster/tests/idempotency_tests.cc
+++ b/src/v/cluster/tests/idempotency_tests.cc
@@ -8,6 +8,7 @@
 // by the Apache License, Version 2.0

 #include "cluster/errc.h"
+#include "cluster/feature_table.h"
 #include "cluster/rm_stm.h"
 #include "finjector/hbadger.h"
 #include "model/fundamental.h"
@@ -36,7 +37,10 @@ FIXTURE_TEST(
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -81,6 +85,7 @@ FIXTURE_TEST(
       raft::replicate_options(raft::consistency_level::quorum_ack))
       .get0();
     BOOST_REQUIRE((bool)r2);
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(
@@ -88,7 +93,10 @@ FIXTURE_TEST(
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -135,13 +143,17 @@ FIXTURE_TEST(
     BOOST_REQUIRE((bool)r2);
     BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset);
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -200,13 +212,17 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
         BOOST_REQUIRE((bool)r1);
         BOOST_REQUIRE(r1.value().last_offset == offsets[i]);
     }
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -260,13 +276,17 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
       r1 == failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
     }
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -312,6 +332,7 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
       .get0();
     BOOST_REQUIRE(
       r2 == failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(
@@ -319,7 +340,10 @@ FIXTURE_TEST(
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -349,13 +373,17 @@ FIXTURE_TEST(
       .get0();
     BOOST_REQUIRE(
       r == failure_type<cluster::errc>(cluster::errc::sequence_out_of_order));
+    feature_table.stop().get0();
 }

 FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();

     stm.start().get0();
@@ -404,4 +432,5 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
     BOOST_REQUIRE((bool)r1);
     BOOST_REQUIRE((bool)r2);
     BOOST_REQUIRE(r1.value().last_offset == r2.value().last_offset);
+    feature_table.stop().get0();
 }

diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc
index aec8388bd651..63b1147f2336 100644
--- a/src/v/cluster/tests/rm_stm_tests.cc
+++ b/src/v/cluster/tests/rm_stm_tests.cc
@@ -8,6 +8,7 @@
 // by the Apache License, Version 2.0

 #include "cluster/errc.h"
+#include "cluster/feature_table.h"
 #include "cluster/rm_stm.h"
 #include "finjector/hbadger.h"
 #include "model/fundamental.h"
@@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -129,6 +133,7 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) {
     BOOST_REQUIRE_EQUAL(aborted_txs.size(), 0);

     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }

 // tests:
@@ -138,7 +143,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -204,6 +212,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {
     }));

     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }

 // tests:
@@ -213,7 +222,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -285,6 +297,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {
     }));

     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }

 // transactional writes of an unknown tx are rejected
@@ -292,7 +305,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -322,6 +338,7 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) {
       raft::replicate_options(raft::consistency_level::quorum_ack))
       .get0();
     BOOST_REQUIRE(offset_r == invalid_producer_epoch);
+    feature_table.stop().get0();
 }

 // begin fences off old transactions
@@ -329,7 +346,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -379,6 +399,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) {
       raft::replicate_options(raft::consistency_level::quorum_ack))
       .get0();
     BOOST_REQUIRE(!(bool)offset_r);
+    feature_table.stop().get0();
 }

 // transactional writes of an aborted tx are rejected
@@ -386,7 +407,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) {
     start_raft();

     ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded<cluster::feature_table> feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
@@ -438,4 +462,5 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) {
       raft::replicate_options(raft::consistency_level::quorum_ack))
       .get0();
     BOOST_REQUIRE(offset_r == invalid_producer_epoch);
+    feature_table.stop().get0();
 }

diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 129b0d5ea46b..9b9a97d689be 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -672,7 +672,8 @@ void application::wire_up_redpanda_services() {
       std::ref(tx_gateway_frontend),
       std::ref(partition_recovery_manager),
      std::ref(cloud_storage_api),
-      std::ref(shadow_index_cache))
+      std::ref(shadow_index_cache),
+      std::ref(_feature_table))
       .get();

     vlog(_log.info, "Partition manager started");

From f7a13ffa1227b5fe578fc722706c8fb2df75eb35 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Tue, 5 Jul 2022 17:24:10 -0700
Subject: [PATCH 12/12] rm_stm: put kafka offset cache behind feature manager

(cherry picked from commit 8e7346dba33421d9b2a0e6bca5daa0cf8e8d524f)

Conflicts:
  src/v/cluster/feature_table.cc
  src/v/cluster/feature_table.h
  tests/rptest/tests/cluster_features_test.py
---
 src/v/cluster/feature_table.cc              | 4 +++-
 src/v/cluster/feature_table.h               | 7 +++++++
 src/v/cluster/rm_stm.cc                     | 7 ++++++-
 tests/rptest/tests/cluster_features_test.py | 2 +-
 4 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/v/cluster/feature_table.cc b/src/v/cluster/feature_table.cc
index b60aeb52c51d..f72021ac7cec 100644
--- a/src/v/cluster/feature_table.cc
+++ b/src/v/cluster/feature_table.cc
@@ -26,6 +26,8 @@ std::string_view to_string_view(feature f) {
         return "maintenance_mode";
     case feature::mtls_authentication:
         return "mtls_authentication";
+    case feature::rm_stm_kafka_cache:
+        return "rm_stm_kafka_cache";
     case feature::test_alpha:
         return "__test_alpha";
     }

@@ -34,7 +36,7 @@
 // The version that this redpanda node will report: increment this
 // on protocol changes to raft0 structures, like adding new services.
-static constexpr cluster_version latest_version = cluster_version{3};
+static constexpr cluster_version latest_version = cluster_version{4};

 feature_table::feature_table() {
     // Intentionally undocumented environment variable, only for use

diff --git a/src/v/cluster/feature_table.h b/src/v/cluster/feature_table.h
index 8b49c5eba19c..bb85b0309207 100644
--- a/src/v/cluster/feature_table.h
+++ b/src/v/cluster/feature_table.h
@@ -24,6 +24,7 @@ enum class feature : std::uint64_t {
     consumer_offsets = 0x2,
     maintenance_mode = 0x4,
     mtls_authentication = 0x8,
+    rm_stm_kafka_cache = 0x10,

     // Dummy features for testing only
     test_alpha = uint64_t(1) << 63,
@@ -100,6 +101,12 @@ constexpr static std::array feature_schema{
       feature::mtls_authentication,
       feature_spec::available_policy::explicit_only,
       feature_spec::prepare_policy::always},
+    feature_spec{
+      cluster_version{4},
+      "rm_stm_kafka_cache",
+      feature::rm_stm_kafka_cache,
+      feature_spec::available_policy::always,
+      feature_spec::prepare_policy::always},
     feature_spec{
       cluster_version{2001},
       "__test_alpha",

diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index 73e7fcee6f3e..e135f32c26b0 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -1959,7 +1959,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
     _insync_offset = data.offset;
 }

-uint8_t rm_stm::active_snapshot_version() { return tx_snapshot_v1::version; }
+uint8_t rm_stm::active_snapshot_version() {
+    if (_feature_table.local().is_active(feature::rm_stm_kafka_cache)) {
+        return tx_snapshot::version;
+    }
+    return tx_snapshot_v1::version;
+}

 template<class T>
 void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {

diff --git a/tests/rptest/tests/cluster_features_test.py b/tests/rptest/tests/cluster_features_test.py
index 66c9bb1f765c..0a17211c3f09 100644
--- a/tests/rptest/tests/cluster_features_test.py
+++ b/tests/rptest/tests/cluster_features_test.py
@@ -40,7 +40,7 @@ def _assert_default_features(self):
         # This assertion will break each time we increment the value
         # of `latest_version` in the redpanda source. Update it when
         # that happens.
-        assert features_response['cluster_version'] == 3
+        assert features_response['cluster_version'] == 4

         assert self._get_features_map(
             features_response)['central_config']['state'] == 'active'
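Patch 12 ties the new snapshot layout to cluster-wide feature activation: active_snapshot_version() keeps writing the old tx_snapshot_v1 layout until every node reports cluster_version 4, at which point rm_stm_kafka_cache becomes active and the new tx_snapshot layout is safe to emit, since no remaining replica could receive a snapshot it cannot decode. Below is a minimal sketch of that gating pattern; the feature mask and writer here are simplified stand-ins, not Redpanda's actual feature_table API:

    #include <cstdint>
    #include <iostream>

    enum class feature : uint64_t {
        rm_stm_kafka_cache = 0x10,
    };

    // Simplified stand-in for the cluster feature table: a bit is set only
    // once every node in the cluster reports the required logical version.
    struct feature_table_sketch {
        uint64_t active_mask{0};

        bool is_active(feature f) const {
            return (active_mask & static_cast<uint64_t>(f)) != 0;
        }

        void activate(feature f) { active_mask |= static_cast<uint64_t>(f); }
    };

    struct snapshot_writer_sketch {
        static constexpr uint8_t v1 = 1; // raft-offset layout, readable by old nodes
        static constexpr uint8_t v2 = 2; // kafka-offset layout, new nodes only

        const feature_table_sketch& table;

        uint8_t active_snapshot_version() const {
            // Keep emitting the old layout while any replica may still run
            // the previous release; switch only once the feature is active
            // cluster-wide.
            return table.is_active(feature::rm_stm_kafka_cache) ? v2 : v1;
        }
    };

    int main() {
        feature_table_sketch table;
        snapshot_writer_sketch writer{table};
        std::cout << int(writer.active_snapshot_version()) << '\n'; // 1: mixed cluster
        table.activate(feature::rm_stm_kafka_cache); // all nodes upgraded
        std::cout << int(writer.active_snapshot_version()) << '\n'; // 2: safe to switch
        return 0;
    }

Note the available_policy::always in the new feature_spec: unlike mtls_authentication (explicit_only), rm_stm_kafka_cache activates automatically once the whole cluster reaches version 4, with no operator opt-in. The matching assertion bump in cluster_features_test.py is deliberate friction, forcing the test suite to acknowledge each increment of latest_version.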