From 1a72446ab268ef66db7ac5b3baca754f5e7b7cb2 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Fri, 1 Jul 2022 13:55:55 -0700
Subject: [PATCH 01/13] k/produce: do not use unknown_server_error

The Kafka client doesn't process unknown_server_error correctly, which
may lead to duplicates violating idempotency. See the following issue
for more info: https://issues.apache.org/jira/browse/KAFKA-14034

Just like unknown_server_error, request_timed_out means that the true
outcome of the operation is unknown; unlike unknown_server_error, it
doesn't trigger the client bug, so switch to it to avoid the problem.

---
 src/v/kafka/server/handlers/produce.cc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc
index 687b127c6c0b..8e9a2b051ecd 100644
--- a/src/v/kafka/server/handlers/produce.cc
+++ b/src/v/kafka/server/handlers/produce.cc
@@ -139,7 +139,7 @@ static error_code map_produce_error_code(std::error_code ec) {
 case raft::errc::shutting_down:
 return error_code::request_timed_out;
 default:
- return error_code::unknown_server_error;
+ return error_code::request_timed_out;
 }
 }

@@ -157,11 +157,11 @@ static error_code map_produce_error_code(std::error_code ec) {
 case cluster::errc::invalid_request:
 return error_code::invalid_request;
 default:
- return error_code::unknown_server_error;
+ return error_code::request_timed_out;
 }
 }

- return error_code::unknown_server_error;
+ return error_code::request_timed_out;
 }

 /*
@@ -198,7 +198,7 @@ static partition_produce_stages partition_append(
 p.error_code = map_produce_error_code(r.error());
 }
 } catch (...) {
- p.error_code = error_code::unknown_server_error;
+ p.error_code = error_code::request_timed_out;
 }
 return p;
 }),

From 1dfd8d963ea88c08c339727c2001dc3ba6f44624 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Fri, 8 Jul 2022 21:05:35 -0700
Subject: [PATCH 02/13] cluster: remove dead code

---
 src/v/cluster/partition.cc | 53 --------------------------------------
 src/v/cluster/partition.h  |  5 ----
 2 files changed, 58 deletions(-)

diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc
index 8350845f7d95..806c69a19a7e 100644
--- a/src/v/cluster/partition.cc
+++ b/src/v/cluster/partition.cc
@@ -187,59 +187,6 @@ raft::replicate_stages partition::replicate_in_stages(
 }
 }

-ss::future> partition::replicate(
- model::batch_identity bid,
- model::record_batch_reader&& r,
- raft::replicate_options opts) {
- if (bid.is_transactional) {
- if (!_is_tx_enabled) {
- vlog(
- clusterlog.error,
- "Can't process a transactional request to {}. Transactional "
- "processing isn't enabled.",
- _raft->ntp());
- return ss::make_ready_future>(
- raft::errc::timeout);
- }
-
- if (!_rm_stm) {
- vlog(
- clusterlog.error,
- "Topic {} doesn't support transactional processing.",
- _raft->ntp());
- return ss::make_ready_future>(
- raft::errc::timeout);
- }
- }
-
- if (bid.has_idempotent()) {
- if (!_is_idempotence_enabled) {
- vlog(
- clusterlog.error,
- "Can't process an idempotent request to {}. 
Idempotency isn't " - "enabled.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - - if (!_rm_stm) { - vlog( - clusterlog.error, - "Topic {} doesn't support idempotency.", - _raft->ntp()); - return ss::make_ready_future>( - raft::errc::timeout); - } - } - - if (_rm_stm) { - return _rm_stm->replicate(bid, std::move(r), opts); - } else { - return _raft->replicate(std::move(r), opts); - } -} - ss::future<> partition::start() { auto ntp = _raft->ntp(); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 81107c6d0beb..e62c856e1b51 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -59,11 +59,6 @@ class partition { ss::future> replicate( model::term_id, model::record_batch_reader&&, raft::replicate_options); - ss::future> replicate( - model::batch_identity, - model::record_batch_reader&&, - raft::replicate_options); - raft::replicate_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, From 67a3112d7359fc2d2d62befed416f11dfdd48853 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 1 Jul 2022 15:16:30 -0700 Subject: [PATCH 03/13] cluster: prepare partition for translating offset Update all partition::replicate dependees which don't perform offset translation to bypass it via a direct raft reference --- src/v/cluster/partition.cc | 12 ------------ src/v/cluster/partition.h | 12 +----------- src/v/cluster/partition_probe.cc | 8 ++++---- src/v/cluster/tests/partition_moving_test.cc | 2 +- src/v/cluster/tests/rebalancing_tests_fixture.h | 2 +- .../coproc/tests/fixtures/fiber_mock_fixture.cc | 2 +- src/v/kafka/server/group.cc | 16 ++++++++-------- src/v/kafka/server/group_metadata_migration.cc | 1 + src/v/kafka/server/tests/fetch_test.cc | 8 ++++---- src/v/kafka/server/tests/topic_recreate_test.cc | 2 +- 10 files changed, 22 insertions(+), 43 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 806c69a19a7e..4a5d70a878f7 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -107,18 +107,6 @@ ss::future> partition::replicate( return _raft->replicate(std::move(r), opts); } -raft::replicate_stages partition::replicate_in_stages( - model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate_in_stages(std::move(r), opts); -} - -ss::future> partition::replicate( - model::term_id term, - model::record_batch_reader&& r, - raft::replicate_options opts) { - return _raft->replicate(term, std::move(r), opts); -} - ss::shared_ptr partition::rm_stm() { if (!_rm_stm) { if (!_is_tx_enabled && !_is_idempotence_enabled) { diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index e62c856e1b51..3a4e6e636acf 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -53,12 +53,6 @@ class partition { ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages - replicate_in_stages(model::record_batch_reader&&, raft::replicate_options); - - ss::future> replicate( - model::term_id, model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, @@ -284,11 +278,7 @@ class partition { return _raft->abort_configuration_change(rev); } -private: - friend partition_manager; - friend replicated_partition_probe; - - consensus_ptr raft() { return _raft; } + consensus_ptr raft() const { return _raft; } private: consensus_ptr _raft; diff --git a/src/v/cluster/partition_probe.cc 
b/src/v/cluster/partition_probe.cc index 949409d0fb20..01b21336a273 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -89,7 +89,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { sm::make_gauge( "leader_id", [this] { - return _partition._raft->get_leader_id().value_or( + return _partition.raft()->get_leader_id().value_or( model::node_id(-1)); }, sm::description("Id of current partition leader"), @@ -98,7 +98,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { sm::make_gauge( "under_replicated_replicas", [this] { - auto metrics = _partition._raft->get_follower_metrics(); + auto metrics = _partition.raft()->get_follower_metrics(); return std::count_if( metrics.cbegin(), metrics.cend(), @@ -181,7 +181,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) { sm::make_gauge( "under_replicated_replicas", [this] { - auto metrics = _partition._raft->get_follower_metrics(); + auto metrics = _partition.raft()->get_follower_metrics(); return std::count_if( metrics.cbegin(), metrics.cend(), @@ -214,7 +214,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) { .aggregate({sm::shard_label, partition_label}), sm::make_gauge( "replicas", - [this] { return _partition._raft->get_follower_count(); }, + [this] { return _partition.raft()->get_follower_count(); }, sm::description("Number of replicas per topic"), labels) .aggregate({sm::shard_label, partition_label}), diff --git a/src/v/cluster/tests/partition_moving_test.cc b/src/v/cluster/tests/partition_moving_test.cc index a8b68af8fe2b..b5d8fd9766c4 100644 --- a/src/v/cluster/tests/partition_moving_test.cc +++ b/src/v/cluster/tests/partition_moving_test.cc @@ -318,7 +318,7 @@ class partition_assignment_test_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index 830d269e5b6e..635564406899 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -159,7 +159,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture { auto rdr = model::make_memory_record_batch_reader( std::move(batches)); // replicate - auto f = pm.get(ntp)->replicate( + auto f = pm.get(ntp)->raft()->replicate( std::move(rdr), raft::replicate_options(raft::consistency_level::quorum_ack)); diff --git a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc index 528c78f41417..c2bc47e6651b 100644 --- a/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc +++ b/src/v/coproc/tests/fixtures/fiber_mock_fixture.cc @@ -178,7 +178,7 @@ ss::future> fiber_mock_fixture::make_source( auto batch = make_random_batch(params.records_per_input); co_await tests::cooperative_spin_wait_with_timeout( 2s, [partition]() { return partition->is_elected_leader(); }); - auto r = co_await partition->replicate( + auto r = co_await partition->raft()->replicate( std::move(batch), raft::replicate_options(raft::consistency_level::leader_ack)); vassert(!r.has_error(), "Write error: {}", r.error()); diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 1441b02bfb60..32815af490d9 100644 --- 
a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1689,7 +1689,7 @@ group::commit_tx(cluster::commit_group_tx_request r) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1772,7 +1772,7 @@ group::begin_tx(cluster::begin_group_tx_request r) { r.pid, std::move(fence)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1887,7 +1887,7 @@ group::prepare_tx(cluster::prepare_group_tx_request r) { std::move(tx_entry)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -1983,7 +1983,7 @@ group::abort_tx(cluster::abort_group_tx_request r) { std::move(tx)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->replicate( + auto e = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2103,7 +2103,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto batch = std::move(builder).build(); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto replicate_stages = _partition->replicate_in_stages( + auto replicate_stages = _partition->raft()->replicate_in_stages( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2492,7 +2492,7 @@ ss::future group::remove() { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2572,7 +2572,7 @@ group::remove_topic_partitions(const std::vector& tps) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { - auto result = co_await _partition->replicate( + auto result = co_await _partition->raft()->replicate( std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2599,7 +2599,7 @@ group::remove_topic_partitions(const std::vector& tps) { ss::future> group::store_group(model::record_batch batch) { - return _partition->replicate( + return _partition->raft()->replicate( model::make_memory_record_batch_reader(std::move(batch)), raft::replicate_options(raft::consistency_level::quorum_ack)); } diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index c7a27abe203b..a98aba8bbb5f 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -332,6 +332,7 @@ ss::future replicate( [ntp = std::move(ntp), f_reader = std::move(f_reader)](cluster::partition_manager& pm) mutable { return pm.get(ntp) + ->raft() ->replicate( std::move(f_reader), raft::replicate_options(raft::consistency_level::quorum_ack)) diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index d69e40796943..af3bbbf539c9 100644 --- 
a/src/v/kafka/server/tests/fetch_test.cc
+++ b/src/v/kafka/server/tests/fetch_test.cc
@@ -418,7 +418,7 @@ FIXTURE_TEST(fetch_multi_partitions_debounce, redpanda_thread_fixture) {
 model::offset(0), 5);
 auto rdr = model::make_memory_record_batch_reader(
 std::move(batches));
- return partition->replicate(
+ return partition->raft()->replicate(
 std::move(rdr),
 raft::replicate_options(
 raft::consistency_level::quorum_ack));
@@ -483,7 +483,7 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) {
 model::offset(0), 5);
 auto rdr = model::make_memory_record_batch_reader(
 std::move(batches));
- return partition->replicate(
+ return partition->raft()->replicate(
 std::move(rdr),
 raft::replicate_options(
 raft::consistency_level::quorum_ack));
@@ -563,7 +563,7 @@ FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) {
 model::offset(0), 5);
 auto rdr = model::make_memory_record_batch_reader(
 std::move(batches));
- return partition->replicate(
+ return partition->raft()->replicate(
 std::move(rdr),
 raft::replicate_options(
 raft::consistency_level::quorum_ack));
@@ -615,7 +615,7 @@ FIXTURE_TEST(fetch_request_max_bytes, redpanda_thread_fixture) {
 model::offset(0), 20);
 auto rdr = model::make_memory_record_batch_reader(
 std::move(batches));
- return partition->replicate(
+ return partition->raft()->replicate(
 std::move(rdr),
 raft::replicate_options(raft::consistency_level::quorum_ack));
 })
diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc
index fd081431ec40..8628cf183dc4 100644
--- a/src/v/kafka/server/tests/topic_recreate_test.cc
+++ b/src/v/kafka/server/tests/topic_recreate_test.cc
@@ -266,7 +266,7 @@ FIXTURE_TEST(test_recreated_topic_does_not_lose_data, recreate_test_fixture) {
 auto rdr = model::make_memory_record_batch_reader(
 std::move(batches));
 auto p = pm.get(ntp);
- return p
+ return p->raft()
 ->replicate(
 std::move(rdr),
 raft::replicate_options(

From e693bead59ec286596f23f29b50a9f49cb6ceb9a Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 4 Jul 2022 14:36:29 -0700
Subject: [PATCH 04/13] k/group: avoid ABA problem

Update consumer groups to use conditional replication to prevent a
situation where, after a check, leadership jumps away, something
invalidates the check, and leadership jumps back just in time for the
post-check replication:

1. a node checks a condition
2. leadership goes to a new node
3. the new node replicates something which invalidates the condition
4. leadership jumps back
5. the original node successfully replicates, assuming that the
   condition still holds

Switched to a conditional replicate to fix the problem. When a group
manager detects a leadership change it replays the group's records to
reconstruct the group's state. We cache the current term in the state
and use it as a condition on replicate. In this case we know that if
the leadership bounces, the replication won't pass.
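As an illustration, here is a minimal sketch of the term-conditional
replicate this patch applies throughout the group code (the helper name
replicate_with_term is hypothetical; the raft()->replicate(term, ...)
overload it wraps is the one used in the diff below):

    // Sketch: term-conditional replication closes the ABA window.
    // _term is the term cached when the group replayed its records on
    // the last leadership change.
    ss::future<result<raft::replicate_result>>
    group::replicate_with_term(model::record_batch batch) {
        auto reader = model::make_memory_record_batch_reader(
          std::move(batch));
        // If leadership bounced after our checks, raft's current term is
        // greater than _term, so this replicate fails instead of writing
        // under an invalidated condition.
        return _partition->raft()->replicate(
          _term,
          std::move(reader),
          raft::replicate_options(raft::consistency_level::quorum_ack));
    }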
---
 src/v/kafka/server/group.cc | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index 32815af490d9..b7d7c2262a81 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -2104,6 +2104,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {
 auto reader = model::make_memory_record_batch_reader(std::move(batch));

 auto replicate_stages = _partition->raft()->replicate_in_stages(
+ _term,
 std::move(reader),
 raft::replicate_options(raft::consistency_level::quorum_ack));

@@ -2493,6 +2494,7 @@ ss::future group::remove() {

 try {
 auto result = co_await _partition->raft()->replicate(
+ _term,
 std::move(reader),
 raft::replicate_options(raft::consistency_level::quorum_ack));
 if (result) {

@@ -2573,6 +2575,7 @@ group::remove_topic_partitions(const std::vector& tps) {

 try {
 auto result = co_await _partition->raft()->replicate(
+ _term,
 std::move(reader),
 raft::replicate_options(raft::consistency_level::quorum_ack));
 if (result) {

@@ -2600,6 +2603,7 @@ ss::future> group::store_group(model::record_batch batch) {
 return _partition->raft()->replicate(
+ _term,
 model::make_memory_record_batch_reader(std::move(batch)),
 raft::replicate_options(raft::consistency_level::quorum_ack));
 }

From 93350756123f6883192a0fb6bb2b123f394e9a4b Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 4 Jul 2022 14:50:26 -0700
Subject: [PATCH 05/13] c/types: introduce kafka offset types

We're going to mix raft and kafka offsets in the same class. Since both
offsets use the same underlying type it's easy to make a mistake and
treat one as the other. Introduce a kafka offset type and rely on the
type system to prevent such errors.

---
 src/v/cluster/types.cc    | 10 ++++++++++
 src/v/cluster/types.h     | 14 ++++++++++++++
 src/v/model/fundamental.h |  6 ++++++
 3 files changed, 30 insertions(+)

diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc
index 9769e8e07a19..474bfb6a2d9e 100644
--- a/src/v/cluster/types.cc
+++ b/src/v/cluster/types.cc
@@ -32,6 +32,16 @@

 namespace cluster {

+kafka_stages::kafka_stages(
+ ss::future<> enq, ss::future> offset_future)
+ : request_enqueued(std::move(enq))
+ , replicate_finished(std::move(offset_future)) {}
+
+kafka_stages::kafka_stages(raft::errc ec)
+ : request_enqueued(ss::now())
+ , replicate_finished(
+ ss::make_ready_future>(make_error_code(ec))){};
+
 bool topic_properties::is_compacted() const {
 if (!cleanup_policy_bitflags) {
 return false;
 }

diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h
index a5abd73d555f..b101b19a88bf 100644
--- a/src/v/cluster/types.h
+++ b/src/v/cluster/types.h
@@ -175,6 +175,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept {
 return std::error_code(static_cast(e), tx_error_category());
 }

+struct kafka_result {
+ kafka::offset last_offset;
+};
+struct kafka_stages {
+ kafka_stages(ss::future<>, ss::future>);
+ explicit kafka_stages(raft::errc);
+ // after this future is ready, the request is enqueued in raft and it
+ // will not be reordered
+ ss::future<> request_enqueued;
+ // after this future is ready, the request was successfully replicated
+ // with the requested consistency level
+ ss::future> replicate_finished;
+};
+
 struct try_abort_request
 : serde::envelope> {
 model::partition_id tm;

diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h
index 3ecfdf1cb688..86e92f865e14 100644
--- a/src/v/model/fundamental.h
+++ b/src/v/model/fundamental.h
@@ -29,6 +29,12 @@
 #include
 #include 
+namespace kafka { + +using offset = named_type; + +} // namespace kafka + namespace model { // Named after Kafka cleanup.policy topic property From e3d24d951206fe51a68e0cf0c1965e0525ad07f7 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:50:39 -0700 Subject: [PATCH 06/13] cluster: shift offset translation to partition Shifting offset translation down the abstraction well to eventually reach rm_stm --- src/v/cluster/partition.cc | 42 ++++++++++++++++------ src/v/cluster/partition.h | 5 +-- src/v/kafka/server/replicated_partition.cc | 17 +++++---- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 4a5d70a878f7..4004498c2d47 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -102,9 +102,15 @@ partition::partition( } } -ss::future> partition::replicate( +ss::future> partition::replicate( model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate(std::move(r), opts); + using ret_t = result; + auto res = co_await _raft->replicate(std::move(r), opts); + if (!res) { + co_return ret_t(res.error()); + } + co_return ret_t(kafka_result{ + kafka::offset(_translator->from_log_offset(res.value().last_offset)())}); } ss::shared_ptr partition::rm_stm() { @@ -126,10 +132,11 @@ ss::shared_ptr partition::rm_stm() { return _rm_stm; } -raft::replicate_stages partition::replicate_in_stages( +kafka_stages partition::replicate_in_stages( model::batch_identity bid, model::record_batch_reader&& r, raft::replicate_options opts) { + using ret_t = result; if (bid.is_transactional) { if (!_is_tx_enabled) { vlog( @@ -137,7 +144,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process a transactional request to {}. Transactional " "processing isn't enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -145,7 +152,7 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support transactional processing.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } @@ -156,7 +163,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process an idempotent request to {}. 
Idempotency isn't " "enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -164,15 +171,29 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support idempotency.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } + ss::lw_shared_ptr res; if (_rm_stm) { - return _rm_stm->replicate_in_stages(bid, std::move(r), opts); + res = _rm_stm->replicate_in_stages(bid, std::move(r), opts); } else { - return _raft->replicate_in_stages(std::move(r), opts); + res = _raft->replicate_in_stages(std::move(r), opts); } + + auto replicate_finished = res->replicate_finished.then( + [this](result r) { + if (!r) { + return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = kafka::offset( + _translator->from_log_offset(old_offset)()); + return ret_t(kafka_result{new_offset}); + }); + return kafka_stages( + std::move(res->request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { @@ -180,7 +201,8 @@ ss::future<> partition::start() { _probe.setup_metrics(ntp); - auto f = _raft->start(); + auto f = _raft->start().then( + [this] { _translator = _raft->get_offset_translator_state(); }); if (is_id_allocator_topic(ntp)) { return f.then([this] { return _id_allocator_stm->start(); }); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 3a4e6e636acf..93f6b19b4387 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -50,10 +50,10 @@ class partition { ss::future<> start(); ss::future<> stop(); - ss::future> + ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); @@ -293,6 +293,7 @@ class partition { bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; + ss::lw_shared_ptr _translator; friend std::ostream& operator<<(std::ostream& o, const partition& x); }; diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index faf03c03d452..d24fe5fed21b 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -165,11 +165,11 @@ ss::future> replicated_partition::replicate( model::record_batch_reader rdr, raft::replicate_options opts) { using ret_t = result; return _partition->replicate(std::move(rdr), opts) - .then([this](result r) { + .then([](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->from_log_offset(r.value().last_offset)); + return ret_t(model::offset(r.value().last_offset())); }); } @@ -179,15 +179,18 @@ raft::replicate_stages replicated_partition::replicate( raft::replicate_options opts) { using ret_t = result; auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); - res.replicate_finished = res.replicate_finished.then( - [this](result r) { + + raft::replicate_stages out(raft::errc::success); + out.request_enqueued = std::move(res.request_enqueued); + out.replicate_finished = res.replicate_finished.then( + [](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(raft::replicate_result{ - _translator->from_log_offset(r.value().last_offset)}); + return ret_t( + raft::replicate_result{model::offset(r.value().last_offset())}); }); - return 
res;
+ return out;
 }

 std::optional replicated_partition::get_leader_epoch_last_offset(

From 63c5883435da22b30f2fbc772932766a9b2e7465 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 4 Jul 2022 14:48:55 -0700
Subject: [PATCH 07/13] rm_stm: prepare to use kafka::offset based cache

Prepare rm_stm to use a kafka::offset based seq-offset cache. Right now
it uses raft offsets, but there is a problem with that: once the cache
items become older than the head of the log (eviction), Redpanda
becomes unable to use offset translation, so we need to store already
translated offsets.

Since the cache is persisted as part of the snapshot, we need to change
the disk format and provide backward compatibility.

The change is split into two commits. The current commit introduces
types to represent the old format (seq_cache_entry_v1 and
tx_snapshot_v1) and adds compatibility machinery to convert an old
snapshot (tx_snapshot_v1) to a new snapshot (tx_snapshot). The
follow-up commit updates the default types to use the new format and
updates the mapping between old and default types.

---
 src/v/cluster/rm_stm.cc | 135 +++++++++++++++++++++++++++++++---------
 src/v/cluster/rm_stm.h  |   7 ++-
 2 files changed, 111 insertions(+), 31 deletions(-)

diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index 4766ad986f21..f1435e79a497 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -187,6 +187,31 @@ struct tx_snapshot_v0 {
 std::vector seqs;
 };

+struct seq_cache_entry_v1 {
+ int32_t seq{-1};
+ model::offset offset;
+};
+
+struct seq_entry_v1 {
+ model::producer_identity pid;
+ int32_t seq{-1};
+ model::offset last_offset{-1};
+ ss::circular_buffer seq_cache;
+ model::timestamp::type last_write_timestamp;
+};
+
+struct tx_snapshot_v1 {
+ static constexpr uint8_t version = 1;
+
+ std::vector fenced;
+ std::vector ongoing;
+ std::vector prepared;
+ std::vector aborted;
+ std::vector abort_indexes;
+ model::offset offset;
+ std::vector seqs;
+};
+
 rm_stm::rm_stm(
 ss::logger& logger,
 raft::consensus* c,

@@ -1812,14 +1837,35 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
 iobuf_parser data_parser(std::move(tx_ss_buf));
 if (hdr.version == tx_snapshot::version) {
 data = reflection::adl{}.from(data_parser);
+ } else if (hdr.version == tx_snapshot_v1::version) {
+ auto data_v1 = reflection::adl{}.from(data_parser);
+ data.fenced = std::move(data_v1.fenced);
+ data.ongoing = std::move(data_v1.ongoing);
+ data.prepared = std::move(data_v1.prepared);
+ data.aborted = std::move(data_v1.aborted);
+ data.abort_indexes = std::move(data_v1.abort_indexes);
+ data.offset = std::move(data_v1.offset);
+ for (auto& seq_v1 : data_v1.seqs) {
+ seq_entry seq;
+ seq.pid = seq_v1.pid;
+ seq.seq = seq_v1.seq;
+ seq.last_offset = seq_v1.last_offset;
+ seq.seq_cache.reserve(seq_v1.seq_cache.size());
+ for (auto& item : seq_v1.seq_cache) {
+ seq.seq_cache.push_back(
+ seq_cache_entry{.seq = item.seq, .offset = item.offset});
+ }
+ seq.last_write_timestamp = seq_v1.last_write_timestamp;
+ data.seqs.push_back(std::move(seq));
+ }
 } else if (hdr.version == tx_snapshot_v0::version) {
 auto data_v0 = reflection::adl{}.from(data_parser);
- data.fenced = data_v0.fenced;
- data.ongoing = data_v0.ongoing;
- data.prepared = data_v0.prepared;
- data.aborted = data_v0.aborted;
- data.abort_indexes = data_v0.abort_indexes;
- data.offset = data_v0.offset;
+ data.fenced = std::move(data_v0.fenced);
+ data.ongoing = std::move(data_v0.ongoing);
+ data.prepared = std::move(data_v0.prepared);
+ data.aborted = std::move(data_v0.aborted); 
+ data.abort_indexes = std::move(data_v0.abort_indexes); + data.offset = std::move(data_v0.offset); for (auto seq_v0 : data_v0.seqs) { auto seq = seq_entry{ .pid = seq_v0.pid, @@ -1879,6 +1925,27 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { _insync_offset = data.offset; } +uint8_t rm_stm::active_snapshot_version() { return tx_snapshot_v1::version; } + +template +void rm_stm::fill_snapshot_wo_seqs(T& snapshot) { + for (auto const& [k, v] : _log_state.fence_pid_epoch) { + snapshot.fenced.push_back(model::producer_identity{k(), v()}); + } + for (auto& entry : _log_state.ongoing_map) { + snapshot.ongoing.push_back(entry.second); + } + for (auto& entry : _log_state.prepared) { + snapshot.prepared.push_back(entry.second); + } + for (auto& entry : _log_state.aborted) { + snapshot.aborted.push_back(entry); + } + for (auto& entry : _log_state.abort_indexes) { + snapshot.abort_indexes.push_back(entry); + } +} + ss::future rm_stm::take_snapshot() { if (_log_state.aborted.size() > _abort_index_segment_size) { std::sort( @@ -1904,33 +1971,41 @@ ss::future rm_stm::take_snapshot() { _log_state.aborted = snapshot.aborted; } - tx_snapshot tx_ss; - - for (auto const& [k, v] : _log_state.fence_pid_epoch) { - tx_ss.fenced.push_back(model::producer_identity{k(), v()}); - } - for (auto& entry : _log_state.ongoing_map) { - tx_ss.ongoing.push_back(entry.second); - } - for (auto& entry : _log_state.prepared) { - tx_ss.prepared.push_back(entry.second); - } - for (auto& entry : _log_state.aborted) { - tx_ss.aborted.push_back(entry); - } - for (auto& entry : _log_state.abort_indexes) { - tx_ss.abort_indexes.push_back(entry); - } - for (const auto& entry : _log_state.seq_table) { - tx_ss.seqs.push_back(entry.second.copy()); - } - tx_ss.offset = _insync_offset; - iobuf tx_ss_buf; - reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + auto version = active_snapshot_version(); + if (version == tx_snapshot::version) { + tx_snapshot tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& entry : _log_state.seq_table) { + tx_ss.seqs.push_back(entry.second.copy()); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else if (version == tx_snapshot_v1::version) { + tx_snapshot_v1 tx_ss; + fill_snapshot_wo_seqs(tx_ss); + for (const auto& it : _log_state.seq_table) { + auto& entry = it.second; + seq_entry_v1 seqs; + seqs.pid = entry.pid; + seqs.seq = entry.seq; + seqs.last_offset = entry.last_offset; + seqs.last_write_timestamp = entry.last_write_timestamp; + seqs.seq_cache.reserve(seqs.seq_cache.size()); + for (auto& item : entry.seq_cache) { + seqs.seq_cache.push_back( + seq_cache_entry_v1{.seq = item.seq, .offset = item.offset}); + } + tx_ss.seqs.push_back(std::move(seqs)); + } + tx_ss.offset = _insync_offset; + reflection::adl{}.to(tx_ss_buf, std::move(tx_ss)); + } else { + vassert(false, "unsupported tx_snapshot version {}", version); + } co_return stm_snapshot::create( - tx_snapshot::version, _insync_offset, std::move(tx_ss_buf)); + version, _insync_offset, std::move(tx_ss_buf)); } ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) { diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 48f3fa0da964..fcb0ea4b4656 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -123,7 +123,7 @@ class rm_stm final : public persisted_stm { }; struct tx_snapshot { - static constexpr uint8_t version = 1; + static constexpr uint8_t version = 2; std::vector fenced; std::vector ongoing; @@ -492,6 +492,11 @@ class rm_stm final : 
public persisted_stm {
 std::optional
 get_expiration_info(model::producer_identity pid) const;

+ uint8_t active_snapshot_version();
+
+ template
+ void fill_snapshot_wo_seqs(T&);
+
 ss::basic_rwlock<> _state_lock;
 absl::flat_hash_map> _tx_locks;
 absl::flat_hash_map<

From 4b42c7eca8a8bfa0e17cd72a694ce982584626d4 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Fri, 1 Jul 2022 22:54:49 -0700
Subject: [PATCH 08/13] rm_stm: shift offset translation to rm_stm

Switch the seq-offset cache to kafka offsets to avoid out-of-range
errors when translating offsets beyond the eviction point.

---
 src/v/cluster/partition.cc               |  10 +-
 src/v/cluster/rm_stm.cc                  | 118 +++++++++++++++--------
 src/v/cluster/rm_stm.h                   |  50 ++++++----
 src/v/cluster/tests/idempotency_tests.cc |   2 +-
 4 files changed, 118 insertions(+), 62 deletions(-)

diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc
index 4004498c2d47..0547135d0617 100644
--- a/src/v/cluster/partition.cc
+++ b/src/v/cluster/partition.cc
@@ -175,14 +175,12 @@ kafka_stages partition::replicate_in_stages(
 }
 }

- ss::lw_shared_ptr res;
 if (_rm_stm) {
- res = _rm_stm->replicate_in_stages(bid, std::move(r), opts);
- } else {
- res = _raft->replicate_in_stages(std::move(r), opts);
+ return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
 }

- auto replicate_finished = res->replicate_finished.then(
+ auto res = _raft->replicate_in_stages(std::move(r), opts);
+ auto replicate_finished = res.replicate_finished.then(
 [this](result r) {
 if (!r) {
 return ret_t(r.error());
 }
@@ -193,7 +191,7 @@ kafka_stages partition::replicate_in_stages(
 return ret_t(kafka_result{new_offset});
 });
 return kafka_stages(
- std::move(res->request_enqueued), std::move(replicate_finished));
+ std::move(res.request_enqueued), std::move(replicate_finished));
 }

 ss::future<> partition::start() {
diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index f1435e79a497..328ebfca2b67 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -785,7 +785,7 @@ ss::future rm_stm::do_abort_tx(
 co_return tx_errc::none;
 }

-raft::replicate_stages rm_stm::replicate_in_stages(
+kafka_stages rm_stm::replicate_in_stages(
 model::batch_identity bid,
 model::record_batch_reader r,
 raft::replicate_options opts) {
@@ -804,10 +804,10 @@
 enqueued->set_value();
 }
 });
- return raft::replicate_stages(std::move(f), std::move(replicate_finished));
+ return kafka_stages(std::move(f), std::move(replicate_finished));
 }

-ss::future> rm_stm::replicate(
+ss::future> rm_stm::replicate(
 model::batch_identity bid,
 model::record_batch_reader r,
 raft::replicate_options opts) {
@@ -824,7 +824,7 @@
 rm_stm::transfer_leadership(std::optional target) {
 });
 }

-ss::future> rm_stm::do_replicate(
+ss::future> rm_stm::do_replicate(
 model::batch_identity bid,
 model::record_batch_reader b,
 raft::replicate_options opts,
@@ -854,6 +854,11 @@ ss::future<> rm_stm::stop() {
 return raft::state_machine::stop();
 }

+ss::future<> rm_stm::start() {
+ _translator = _c->get_offset_translator_state();
+ return persisted_stm::start();
+}
+
 rm_stm::transaction_info::status_t
 rm_stm::get_tx_status(model::producer_identity pid) const {
 if (_mem_state.preparing.contains(pid)) {
@@ -947,7 +952,7 @@ bool rm_stm::check_seq(model::batch_identity bid) {
 return false;
 }

- seq.update(bid.last_seq, model::offset{-1});
+ seq.update(bid.last_seq, kafka::offset{-1});

 seq.pid = bid.pid;
 seq.last_write_timestamp = last_write_timestamp;

@@ -957,7 +962,7 @@ bool rm_stm::check_seq(model::batch_identity 
bid) { return true; } -std::optional +std::optional rm_stm::known_seq(model::batch_identity bid) const { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq == _log_state.seq_table.end()) { @@ -982,7 +987,7 @@ std::optional rm_stm::tail_seq(model::producer_identity pid) const { return pid_seq->second.seq; } -void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) { +void rm_stm::set_seq(model::batch_identity bid, kafka::offset last_offset) { auto pid_seq = _log_state.seq_table.find(bid.pid); if (pid_seq != _log_state.seq_table.end()) { if (pid_seq->second.seq == bid.last_seq) { @@ -995,14 +1000,14 @@ void rm_stm::reset_seq(model::batch_identity bid) { _log_state.seq_table.erase(bid.pid); auto& seq = _log_state.seq_table[bid.pid]; seq.seq = bid.last_seq; - seq.last_offset = model::offset{-1}; + seq.last_offset = kafka::offset{-1}; seq.pid = bid.pid; seq.last_write_timestamp = model::timestamp::now().value(); _oldest_session = std::min( _oldest_session, model::timestamp(seq.last_write_timestamp)); } -ss::future> +ss::future> rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { if (!check_tx_permitted()) { co_return errc::generic_tx_error; @@ -1056,7 +1061,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // this isn't the first attempt in the tx we should try dedupe auto cached_offset = known_seq(bid); if (cached_offset) { - if (cached_offset.value() < model::offset{0}) { + if (cached_offset.value() < kafka::offset{0}) { vlog( clusterlog.warn, "Status of the original attempt is unknown (still is " @@ -1070,8 +1075,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // to propagate it to the app layer co_return errc::generic_tx_error; } - co_return raft::replicate_result{ - .last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } if (!check_seq(bid)) { @@ -1107,26 +1111,28 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { expiration_it->second.last_update = clock_type::now(); expiration_it->second.is_expiration_requested = false; - auto replicated = r.value(); + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); - set_seq(bid, replicated.last_offset); + set_seq(bid, new_offset); - auto last_offset = model::offset(replicated.last_offset()); if (!_mem_state.tx_start.contains(bid.pid)) { - auto base_offset = model::offset( - last_offset() - (bid.record_count - 1)); + auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); _mem_state.tx_starts.insert(base_offset); _mem_state.estimated.erase(bid.pid); } - co_return replicated; + + co_return kafka_result{.last_offset = new_offset}; } -ss::future> rm_stm::replicate_seq( +ss::future> rm_stm::replicate_seq( model::batch_identity bid, model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1185,7 +1191,7 @@ ss::future> rm_stm::replicate_seq( // checking among the responded requests auto cached_offset = known_seq(bid); if (cached_offset) { - co_return raft::replicate_result{.last_offset = cached_offset.value()}; + co_return kafka_result{.last_offset = cached_offset.value()}; } // checking if the request is already being 
processed @@ -1193,8 +1199,8 @@ ss::future> rm_stm::replicate_seq( if (inflight->last_seq == bid.last_seq && inflight->is_processing) { // found an inflight request, parking the current request // until the former is resolved - auto promise = ss::make_lw_shared< - available_promise>>(); + auto promise + = ss::make_lw_shared>>(); inflight->parked.push_back(promise); u.return_all(); co_return co_await promise->get_future(); @@ -1279,20 +1285,26 @@ ss::future> rm_stm::replicate_seq( // we don't need session->lock because we never interleave // access to is_processing and offset with sync point (await) request->is_processing = false; - request->r = r; + if (r) { + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + request->r = ret_t(kafka_result{new_offset}); + } else { + request->r = ret_t(r.error()); + } for (auto& pending : request->parked) { - pending->set_value(r); + pending->set_value(request->r); } request->parked.clear(); - if (!r) { + if (!request->r) { // if r was failed at the consensus level (not because has_failed) // it should guarantee that all follow up replication requests fail // too but just in case stepping down to minimize the risk if (_c->is_leader() && _c->term() == synced_term) { co_await _c->step_down(); } - co_return r; + co_return request->r; } // requests get into session->cache in seq order so when we iterate @@ -1324,13 +1336,15 @@ ss::future> rm_stm::replicate_seq( _inflight_requests.erase(bid.pid); } - co_return r; + co_return request->r; } -ss::future> rm_stm::replicate_msg( +ss::future> rm_stm::replicate_msg( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { + using ret_t = result; + if (!co_await sync(_sync_timeout)) { co_return errc::not_leader; } @@ -1338,7 +1352,14 @@ ss::future> rm_stm::replicate_msg( auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts); co_await std::move(ss.request_enqueued); enqueued->set_value(); - co_return co_await std::move(ss.replicate_finished); + auto r = co_await std::move(ss.replicate_finished); + + if (!r) { + co_return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = from_log_offset(old_offset); + co_return ret_t(kafka_result{new_offset}); } model::offset rm_stm::last_stable_offset() { @@ -1785,12 +1806,13 @@ ss::future<> rm_stm::apply_control( void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) { if (bid.has_idempotent()) { auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid); + auto translated = from_log_offset(last_offset); if (inserted) { seq_it->second.pid = bid.pid; seq_it->second.seq = bid.last_seq; - seq_it->second.last_offset = last_offset; + seq_it->second.last_offset = translated; } else { - seq_it->second.update(bid.last_seq, last_offset); + seq_it->second.update(bid.last_seq, translated); } seq_it->second.last_write_timestamp = bid.first_timestamp.value(); _oldest_session = std::min(_oldest_session, bid.first_timestamp); @@ -1849,11 +1871,21 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { seq_entry seq; seq.pid = seq_v1.pid; seq.seq = seq_v1.seq; - seq.last_offset = seq_v1.last_offset; + try { + seq.last_offset = from_log_offset(seq_v1.last_offset); + } catch (...) 
{ + // ignoring outside the translation range errors + continue; + } seq.seq_cache.reserve(seq_v1.seq_cache.size()); for (auto& item : seq_v1.seq_cache) { - seq.seq_cache.push_back( - seq_cache_entry{.seq = item.seq, .offset = item.offset}); + try { + seq.seq_cache.push_back(seq_cache_entry{ + .seq = item.seq, .offset = from_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } seq.last_write_timestamp = seq_v1.last_write_timestamp; data.seqs.push_back(std::move(seq)); @@ -1870,7 +1902,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { auto seq = seq_entry{ .pid = seq_v0.pid, .seq = seq_v0.seq, - .last_offset = model::offset{-1}, + .last_offset = kafka::offset{-1}, .last_write_timestamp = seq_v0.last_write_timestamp}; data.seqs.push_back(std::move(seq)); } @@ -1989,12 +2021,22 @@ ss::future rm_stm::take_snapshot() { seq_entry_v1 seqs; seqs.pid = entry.pid; seqs.seq = entry.seq; - seqs.last_offset = entry.last_offset; + try { + seqs.last_offset = to_log_offset(entry.last_offset); + } catch (...) { + // ignoring outside the translation range errors + continue; + } seqs.last_write_timestamp = entry.last_write_timestamp; seqs.seq_cache.reserve(seqs.seq_cache.size()); for (auto& item : entry.seq_cache) { - seqs.seq_cache.push_back( - seq_cache_entry_v1{.seq = item.seq, .offset = item.offset}); + try { + seqs.seq_cache.push_back(seq_cache_entry_v1{ + .seq = item.seq, .offset = to_log_offset(item.offset)}); + } catch (...) { + // ignoring outside the translation range errors + continue; + } } tx_ss.seqs.push_back(std::move(seqs)); } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index fcb0ea4b4656..0be0ae891598 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -21,6 +21,7 @@ #include "raft/logger.h" #include "raft/state_machine.h" #include "raft/types.h" +#include "storage/offset_translator_state.h" #include "storage/snapshot.h" #include "utils/available_promise.h" #include "utils/expiring_promise.h" @@ -74,14 +75,14 @@ class rm_stm final : public persisted_stm { struct seq_cache_entry { int32_t seq{-1}; - model::offset offset; + kafka::offset offset; }; struct seq_entry { static const int seq_cache_size = 5; model::producer_identity pid; int32_t seq{-1}; - model::offset last_offset{-1}; + kafka::offset last_offset{-1}; ss::circular_buffer seq_cache; model::timestamp::type last_write_timestamp; @@ -99,7 +100,7 @@ class rm_stm final : public persisted_stm { return ret; } - void update(int32_t new_seq, model::offset new_offset) { + void update(int32_t new_seq, kafka::offset new_offset) { if (new_seq < seq) { return; } @@ -109,7 +110,7 @@ class rm_stm final : public persisted_stm { return; } - if (seq >= 0 && last_offset >= model::offset{0}) { + if (seq >= 0 && last_offset >= kafka::offset{0}) { auto entry = seq_cache_entry{.seq = seq, .offset = last_offset}; seq_cache.push_back(entry); while (seq_cache.size() >= seq_entry::seq_cache_size) { @@ -169,12 +170,12 @@ class rm_stm final : public persisted_stm { ss::future> aborted_transactions(model::offset, model::offset); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + ss::future> replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options); @@ -184,6 +185,8 @@ class rm_stm final : public persisted_stm { ss::future<> stop() override; + ss::future<> start() override; + void 
testing_only_disable_auto_abort() { _is_autoabort_enabled = false; } void testing_only_enable_transactions() { _is_tx_enabled = true; } @@ -273,27 +276,27 @@ class rm_stm final : public persisted_stm { ss::future<> save_abort_snapshot(abort_snapshot); bool check_seq(model::batch_identity); - std::optional known_seq(model::batch_identity) const; - void set_seq(model::batch_identity, model::offset); + std::optional known_seq(model::batch_identity) const; + void set_seq(model::batch_identity, kafka::offset); void reset_seq(model::batch_identity); std::optional tail_seq(model::producer_identity) const; - ss::future> do_replicate( + ss::future> do_replicate( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> + ss::future> replicate_tx(model::batch_identity, model::record_batch_reader); - ss::future> replicate_seq( + ss::future> replicate_seq( model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); - ss::future> replicate_msg( + ss::future> replicate_msg( model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>); @@ -423,10 +426,9 @@ class rm_stm final : public persisted_stm { // original request is replicated. struct inflight_request { int32_t last_seq{-1}; - result r = errc::success; + result r = errc::success; bool is_processing; - std::vector< - ss::lw_shared_ptr>>> + std::vector>>> parked; }; @@ -466,8 +468,7 @@ class rm_stm final : public persisted_stm { tail_seq = -1; } - std::optional> - known_seq(int32_t last_seq) const { + std::optional> known_seq(int32_t last_seq) const { for (auto& seq : cache) { if (seq->last_seq == last_seq && !seq->is_processing) { return seq->r; @@ -487,6 +488,20 @@ class rm_stm final : public persisted_stm { return lock_it->second; } + kafka::offset from_log_offset(model::offset old_offset) { + if (old_offset > model::offset{-1}) { + return kafka::offset(_translator->from_log_offset(old_offset)()); + } + return kafka::offset(old_offset()); + } + + model::offset to_log_offset(kafka::offset new_offset) { + if (new_offset > model::offset{-1}) { + return _translator->to_log_offset(model::offset(new_offset())); + } + return model::offset(new_offset()); + } + transaction_info::status_t get_tx_status(model::producer_identity pid) const; std::optional @@ -519,6 +534,7 @@ class rm_stm final : public persisted_stm { bool _is_tx_enabled{false}; ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; + ss::lw_shared_ptr _translator; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index 1d3187181691..a9c1d6a3b252 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -150,7 +150,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) { wait_for_confirmed_leader(); wait_for_meta_initialized(); - std::vector offsets; + std::vector offsets; auto count = 5; From 065fb54aeb28f38aecd67ccc2d26a6dbc5c9ef9f Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 19:16:14 -0700 Subject: [PATCH 09/13] rm_stm: remove dead code --- src/v/cluster/rm_stm.h | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 0be0ae891598..aee55034860a 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -409,18 +409,6 @@ class rm_stm final : public persisted_stm { } }; - struct request_id { - model::producer_identity pid; - int32_t 
seq; - - auto operator<=>(const request_id&) const = default; - - template - friend H AbslHashValue(H h, const request_id& bid) { - return H::combine(std::move(h), bid.pid, bid.seq); - } - }; - // When a request is retried while the first appempt is still // being replicated the retried request is parked until the // original request is replicated. From 90007628f63115bc322fc1731c582ec4b2fac237 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 5 Jul 2022 21:54:40 -0700 Subject: [PATCH 10/13] rm_stm: add feature_table as a dependency --- src/v/cluster/partition.cc | 6 ++-- src/v/cluster/partition.h | 5 ++- src/v/cluster/partition_manager.cc | 12 +++++-- src/v/cluster/partition_manager.h | 5 ++- src/v/cluster/rm_stm.cc | 6 ++-- src/v/cluster/rm_stm.h | 5 ++- src/v/cluster/tests/idempotency_tests.cc | 43 ++++++++++++++++++++---- src/v/cluster/tests/rm_stm_tests.cc | 37 ++++++++++++++++---- src/v/redpanda/application.cc | 3 +- 9 files changed, 98 insertions(+), 24 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 0547135d0617..090616884b61 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -32,10 +32,12 @@ partition::partition( consensus_ptr r, ss::sharded& tx_gateway_frontend, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _raft(r) , _probe(std::make_unique(*this)) , _tx_gateway_frontend(tx_gateway_frontend) + , _feature_table(feature_table) , _is_tx_enabled(config::shard_local_cfg().enable_transactions.value()) , _is_idempotence_enabled( config::shard_local_cfg().enable_idempotence.value()) { @@ -70,7 +72,7 @@ partition::partition( if (has_rm_stm) { _rm_stm = ss::make_shared( - clusterlog, _raft.get(), _tx_gateway_frontend); + clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table); stm_manager->add_stm(_rm_stm); } diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 93f6b19b4387..060153c9cc0e 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -13,6 +13,7 @@ #include "cloud_storage/remote_partition.h" #include "cluster/archival_metadata_stm.h" +#include "cluster/feature_table.h" #include "cluster/id_allocator_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" @@ -44,7 +45,8 @@ class partition { consensus_ptr r, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); raft::group_id group() const { return _raft->group(); } ss::future<> start(); @@ -290,6 +292,7 @@ class partition { ss::abort_source _as; partition_probe _probe; ss::sharded& _tx_gateway_frontend; + ss::sharded& _feature_table; bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 406baf98795c..263b3f1857ef 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -48,13 +48,15 @@ partition_manager::partition_manager( ss::sharded& tx_gateway_frontend, ss::sharded& recovery_mgr, ss::sharded& cloud_storage_api, - ss::sharded& cloud_storage_cache) + ss::sharded& cloud_storage_cache, + ss::sharded& feature_table) : _storage(storage.local()) , _raft_manager(raft) , _tx_gateway_frontend(tx_gateway_frontend) , _partition_recovery_mgr(recovery_mgr) , _cloud_storage_api(cloud_storage_api) - , _cloud_storage_cache(cloud_storage_cache) {} + , _cloud_storage_cache(cloud_storage_cache) + , _feature_table(feature_table) 
{} partition_manager::ntp_table_container partition_manager::get_topic_partition_table( @@ -120,7 +122,11 @@ ss::future partition_manager::manage( group, std::move(initial_nodes), log); auto p = ss::make_lw_shared( - c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache); + c, + _tx_gateway_frontend, + _cloud_storage_api, + _cloud_storage_cache, + _feature_table); _ntp_table.emplace(log.config().ntp(), p); _raft_table.emplace(group, p); diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h index fa66f5ea31a0..8f45302e4abc 100644 --- a/src/v/cluster/partition_manager.h +++ b/src/v/cluster/partition_manager.h @@ -14,6 +14,7 @@ #include "cloud_storage/cache_service.h" #include "cloud_storage/partition_recovery_manager.h" #include "cloud_storage/remote.h" +#include "cluster/feature_table.h" #include "cluster/ntp_callbacks.h" #include "cluster/partition.h" #include "model/metadata.h" @@ -37,7 +38,8 @@ class partition_manager { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); using manage_cb_t = ss::noncopyable_function)>; @@ -190,6 +192,7 @@ class partition_manager { _partition_recovery_mgr; ss::sharded& _cloud_storage_api; ss::sharded& _cloud_storage_cache; + ss::sharded& _feature_table; ss::gate _gate; bool _block_new_leadership{false}; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 328ebfca2b67..3658c61db032 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -215,7 +215,8 @@ struct tx_snapshot_v1 { rm_stm::rm_stm( ss::logger& logger, raft::consensus* c, - ss::sharded& tx_gateway_frontend) + ss::sharded& tx_gateway_frontend, + ss::sharded& feature_table) : persisted_stm("tx.snapshot", logger, c) , _oldest_session(model::timestamp::now()) , _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value()) @@ -234,7 +235,8 @@ rm_stm::rm_stm( , _abort_snapshot_mgr( "abort.idx", std::filesystem::path(c->log_config().work_directory()), - ss::default_priority_class()) { + ss::default_priority_class()) + , _feature_table(feature_table) { if (!_is_tx_enabled) { _is_autoabort_enabled = false; } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index aee55034860a..6e6c09c542e2 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/feature_table.h" #include "cluster/persisted_stm.h" #include "cluster/tx_utils.h" #include "cluster/types.h" @@ -151,7 +152,8 @@ class rm_stm final : public persisted_stm { explicit rm_stm( ss::logger&, raft::consensus*, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future> begin_tx( model::producer_identity, model::tx_seq, std::chrono::milliseconds); @@ -523,6 +525,7 @@ class rm_stm final : public persisted_stm { ss::sharded& _tx_gateway_frontend; storage::snapshot_manager _abort_snapshot_mgr; ss::lw_shared_ptr _translator; + ss::sharded& _feature_table; }; } // namespace cluster diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index a9c1d6a3b252..f79f59eaccd0 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/errc.h" +#include "cluster/feature_table.h" #include "cluster/rm_stm.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -36,7 +37,10 @@ FIXTURE_TEST( start_raft(); ss::sharded tx_gateway_frontend; - cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend); + ss::sharded 
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -81,6 +85,7 @@ FIXTURE_TEST(
           raft::replicate_options(raft::consistency_level::quorum_ack))
           .get0();
     BOOST_REQUIRE((bool)r2);
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(
@@ -88,7 +93,10 @@ FIXTURE_TEST(
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -135,13 +143,17 @@ FIXTURE_TEST(
 
     BOOST_REQUIRE((bool)r2);
     BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset);
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -200,13 +212,17 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, mux_state_machine_fixture) {
         BOOST_REQUIRE((bool)r1);
         BOOST_REQUIRE(r1.value().last_offset == offsets[i]);
     }
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -260,13 +276,17 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, mux_state_machine_fixture) {
           r1 == failure_type(cluster::errc::sequence_out_of_order));
     }
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -312,6 +332,7 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, mux_state_machine_fixture) {
           .get0();
     BOOST_REQUIRE(
       r2 == failure_type(cluster::errc::sequence_out_of_order));
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(
@@ -319,7 +340,10 @@ FIXTURE_TEST(
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
 
     stm.start().get0();
@@ -349,13 +373,17 @@ FIXTURE_TEST(
           .get0();
     BOOST_REQUIRE(
       r == failure_type(cluster::errc::sequence_out_of_order));
+    feature_table.stop().get0();
 }
 
 FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.start().get0();
@@ -404,4 +432,5 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, mux_state_machine_fixture) {
     BOOST_REQUIRE((bool)r1);
     BOOST_REQUIRE((bool)r2);
     BOOST_REQUIRE(r1.value().last_offset == r2.value().last_offset);
+    feature_table.stop().get0();
 }
diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc
index c53da9af13c5..f40bb497b9d9 100644
--- a/src/v/cluster/tests/rm_stm_tests.cc
+++ b/src/v/cluster/tests/rm_stm_tests.cc
@@ -8,6 +8,7 @@
 // by the Apache License, Version 2.0
 
 #include "cluster/errc.h"
+#include "cluster/feature_table.h"
 #include "cluster/rm_stm.h"
 #include "finjector/hbadger.h"
 #include "model/fundamental.h"
@@ -65,7 +66,10 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -129,6 +133,7 @@ FIXTURE_TEST(test_tx_happy_tx, mux_state_machine_fixture) {
     BOOST_REQUIRE_EQUAL(aborted_txs.size(), 0);
 
     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }
 
 // tests:
@@ -138,7 +143,10 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -204,6 +212,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {
     }));
 
     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }
 
 // tests:
@@ -213,7 +222,10 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -285,6 +297,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {
     }));
 
     BOOST_REQUIRE_LT(tx_offset, stm.last_stable_offset());
+    feature_table.stop().get0();
 }
 
 // transactional writes of an unknown tx are rejected
@@ -292,7 +305,10 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) {
     start_raft();
 
    ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -322,6 +338,7 @@ FIXTURE_TEST(test_tx_unknown_produce, mux_state_machine_fixture) {
           raft::replicate_options(raft::consistency_level::quorum_ack))
          .get0();
     BOOST_REQUIRE(offset_r == invalid_producer_epoch);
+    feature_table.stop().get0();
 }
 
 // begin fences off old transactions
@@ -329,7 +346,10 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -379,6 +399,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, mux_state_machine_fixture) {
           raft::replicate_options(raft::consistency_level::quorum_ack))
          .get0();
     BOOST_REQUIRE(!(bool)offset_r);
+    feature_table.stop().get0();
 }
 
 // transactional writes of an aborted tx are rejected
@@ -386,7 +407,10 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) {
     start_raft();
 
     ss::sharded tx_gateway_frontend;
-    cluster::rm_stm stm(logger, _raft.get(), tx_gateway_frontend);
+    ss::sharded feature_table;
+    feature_table.start().get0();
+    cluster::rm_stm stm(
+      logger, _raft.get(), tx_gateway_frontend, feature_table);
     stm.testing_only_disable_auto_abort();
     stm.testing_only_enable_transactions();
 
@@ -438,4 +462,5 @@ FIXTURE_TEST(test_tx_post_aborted_produce, mux_state_machine_fixture) {
          raft::replicate_options(raft::consistency_level::quorum_ack))
          .get0();
     BOOST_REQUIRE(offset_r == invalid_producer_epoch);
+    feature_table.stop().get0();
 }
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 6dba2999a00d..1e0556610508 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -723,7 +723,8 @@ void application::wire_up_redpanda_services() {
           std::ref(tx_gateway_frontend),
          std::ref(partition_recovery_manager),
          std::ref(cloud_storage_api),
-          std::ref(shadow_index_cache))
+          std::ref(shadow_index_cache),
+          std::ref(_feature_table))
          .get();
     vlog(_log.info, "Partition manager started");
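Patch 10 follows the standard Seastar wiring for a new cross-service dependency: application.cc owns the sharded feature_table, passes it down by reference (via std::ref) through partition_manager and partition into rm_stm, and each consumer reads its shard-local instance with .local(). A minimal sketch of the consumer side, assuming Seastar's ss::sharded<> API; my_consumer is hypothetical (the real chain is partition_manager -> partition -> rm_stm), and the feature id is the one the next patch introduces:

    #include <seastar/core/sharded.hh>

    #include "cluster/feature_table.h"

    namespace ss = seastar; // Redpanda gets this alias from seastarx.h

    class my_consumer {
    public:
        // Holding a reference is safe here: the application starts the
        // sharded service before any consumer is constructed and stops it
        // after they are gone (the unit tests mirror this with
        // feature_table.start() / feature_table.stop()).
        explicit my_consumer(ss::sharded<cluster::feature_table>& ft)
          : _feature_table(ft) {}

        bool kafka_cache_enabled() const {
            // .local() returns the instance living on the calling shard,
            // so no cross-shard synchronization is needed.
            return _feature_table.local().is_active(
              cluster::feature::rm_stm_kafka_cache);
        }

    private:
        ss::sharded<cluster::feature_table>& _feature_table;
    };

Passing ss::sharded<T>& rather than a plain T& is what lets one constructor signature work on every shard, with each shard resolving its own copy at call time.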
From 8e7346dba33421d9b2a0e6bca5daa0cf8e8d524f Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Tue, 5 Jul 2022 17:24:10 -0700
Subject: [PATCH 11/13] rm_stm: put kafka offset cache behind feature manager

---
 src/v/cluster/feature_table.cc              | 4 +++-
 src/v/cluster/feature_table.h               | 7 +++++++
 src/v/cluster/rm_stm.cc                     | 7 ++++++-
 tests/rptest/tests/cluster_features_test.py | 2 +-
 4 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/v/cluster/feature_table.cc b/src/v/cluster/feature_table.cc
index 134f9a226374..24bf521389c8 100644
--- a/src/v/cluster/feature_table.cc
+++ b/src/v/cluster/feature_table.cc
@@ -30,6 +30,8 @@ std::string_view to_string_view(feature f) {
         return "serde_raft_0";
     case feature::license:
         return "license";
+    case feature::rm_stm_kafka_cache:
+        return "rm_stm_kafka_cache";
     case feature::test_alpha:
         return "__test_alpha";
     }
@@ -58,7 +60,7 @@ std::string_view to_string_view(feature_state::state s) {
 
 // The version that this redpanda node will report: increment this
 // on protocol changes to raft0 structures, like adding new services.
-static constexpr cluster_version latest_version = cluster_version{4};
+static constexpr cluster_version latest_version = cluster_version{5};
 
 feature_table::feature_table() {
     // Intentionally undocumented environment variable, only for use
diff --git a/src/v/cluster/feature_table.h b/src/v/cluster/feature_table.h
index d40e2a7235b5..88eddf042092 100644
--- a/src/v/cluster/feature_table.h
+++ b/src/v/cluster/feature_table.h
@@ -27,6 +27,7 @@ enum class feature : std::uint64_t {
     mtls_authentication = 0x8,
     serde_raft_0 = 0x10,
     license = 0x20,
+    rm_stm_kafka_cache = 0x40,
 
     // Dummy features for testing only
     test_alpha = uint64_t(1) << 63,
@@ -115,6 +116,12 @@ constexpr static std::array feature_schema{
     feature::license,
     feature_spec::available_policy::always,
     feature_spec::prepare_policy::always},
+  feature_spec{
+    cluster_version{5},
+    "rm_stm_kafka_cache",
+    feature::rm_stm_kafka_cache,
+    feature_spec::available_policy::always,
+    feature_spec::prepare_policy::always},
   feature_spec{
     cluster_version{2001},
     "__test_alpha",
diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index 3658c61db032..baa3dc492167 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -1959,7 +1959,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
     _insync_offset = data.offset;
 }
 
-uint8_t rm_stm::active_snapshot_version() { return tx_snapshot_v1::version; }
+uint8_t rm_stm::active_snapshot_version() {
+    if (_feature_table.local().is_active(feature::rm_stm_kafka_cache)) {
+        return tx_snapshot::version;
+    }
+    return tx_snapshot_v1::version;
+}
 
 template
 void rm_stm::fill_snapshot_wo_seqs(T& snapshot) {
diff --git a/tests/rptest/tests/cluster_features_test.py b/tests/rptest/tests/cluster_features_test.py
index 471b576cba87..ab7c91c43df8 100644
--- a/tests/rptest/tests/cluster_features_test.py
+++ b/tests/rptest/tests/cluster_features_test.py
@@ -42,7 +42,7 @@ def _assert_default_features(self):
         # This assertion will break each time we increment the value
         # of `latest_version` in the redpanda source. Update it when
         # that happens.
-        assert features_response['cluster_version'] == 4
+        assert features_response['cluster_version'] == 5
 
         assert self._get_features_map(
             features_response)['central_config']['state'] == 'active'
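This gate is the rollback-safety mechanism the series relies on: rm_stm_kafka_cache only activates once every node reports cluster_version 5, so a partially upgraded cluster keeps writing the tx_snapshot_v1 format that v22.1.x binaries can still read, and rolling an upgraded node back cannot strand an unreadable tx.snapshot. That is also why latest_version moves from 4 to 5 above and the feature is registered at cluster_version{5}. The decision in active_snapshot_version() reduces to the following sketch, which restates the hunk above with explanatory comments (the cluster:: namespacing of the snapshot types is an assumption):

    #include <cstdint>

    // Choose the on-disk snapshot format from the cluster-wide feature
    // state, not from the local binary version.
    uint8_t pick_tx_snapshot_version(cluster::feature_table& ft) {
        if (ft.is_active(cluster::feature::rm_stm_kafka_cache)) {
            // Every node is upgraded and the feature is active: the new
            // format (which persists the kafka offset cache) is safe.
            return cluster::tx_snapshot::version;
        }
        // Mixed-version cluster, or a rollback is still possible: keep
        // writing the old format.
        return cluster::tx_snapshot_v1::version;
    }

The fix_5355 test added at the end of the series exercises exactly this: it fills a segment under an idempotent producer, checks that tx.snapshot exists, and verifies both a completed upgrade and a mid-upgrade rollback.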
From e5846f10846a9a4b8224b83b609cdc974297b6b7 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 11 Jul 2022 19:28:20 -0700
Subject: [PATCH 12/13] ducky: move wait_for_num_versions to redpanda_installer

---
 tests/rptest/services/redpanda_installer.py | 19 +++++++++++++++++++
 tests/rptest/tests/upgrade_test.py          | 20 +-------------------
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/tests/rptest/services/redpanda_installer.py b/tests/rptest/services/redpanda_installer.py
index fe4cfd1bdc46..e9ab2c2f2cbb 100644
--- a/tests/rptest/services/redpanda_installer.py
+++ b/tests/rptest/services/redpanda_installer.py
@@ -9,6 +9,7 @@
 
 import re
 import requests
+from ducktape.utils.util import wait_until
 
 # Match any version that may result from a redpanda binary, which may not be a
 # released version.
@@ -16,6 +17,24 @@
 VERSION_RE = re.compile(".*v(\\d+)\\.(\\d+)\\.(\\d+).*")
 
 
+def wait_for_num_versions(redpanda, num_versions):
+    def get_unique_versions():
+        node = redpanda.nodes[0]
+        brokers_list = \
+            str(node.account.ssh_output(f"{redpanda.find_binary('rpk')} redpanda admin brokers list"))
+        redpanda.logger.debug(brokers_list)
+        version_re = re.compile("v\\d+\\.\\d+\\.\\d+")
+        return set(version_re.findall(brokers_list))
+
+    # NOTE: allow retries, as the version may not be available immediately
+    # following a restart.
+    wait_until(lambda: len(get_unique_versions()) == num_versions,
+               timeout_sec=30)
+    unique_versions = get_unique_versions()
+    assert len(unique_versions) == num_versions, unique_versions
+    return unique_versions
+
+
 class RedpandaInstaller:
     """
     Provides mechanisms to install multiple Redpanda binaries on a cluster.
diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py
index e5da152eeb6f..7745e1ebccaf 100644
--- a/tests/rptest/tests/upgrade_test.py
+++ b/tests/rptest/tests/upgrade_test.py
@@ -14,25 +14,7 @@
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.cluster import cluster
 from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
-from rptest.services.redpanda_installer import RedpandaInstaller
-
-
-def wait_for_num_versions(redpanda, num_versions):
-    def get_unique_versions():
-        node = redpanda.nodes[0]
-        brokers_list = \
-            str(node.account.ssh_output(f"{redpanda.find_binary('rpk')} redpanda admin brokers list"))
-        redpanda.logger.debug(brokers_list)
-        version_re = re.compile("v\\d+\\.\\d+\\.\\d+")
-        return set(version_re.findall(brokers_list))
-
-    # NOTE: allow retries, as the version may not be available immediately
-    # following a restart.
-    wait_until(lambda: len(get_unique_versions()) == num_versions,
-               timeout_sec=30)
-    unique_versions = get_unique_versions()
-    assert len(unique_versions) == num_versions, unique_versions
-    return unique_versions
+from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions
 
 
 class UpgradeFromSpecificVersion(RedpandaTest):
From c45672a8a65277866349c353637a039cb60051f1 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 11 Jul 2022 19:28:59 -0700
Subject: [PATCH 13/13] ducky: add fix 5355 upgrade test

Manually validated the tests by tweaking active_snapshot_version() to
ignore the feature manager and always use the newest version, and
checked that in this case the tests fail.
---
 tests/rptest/tests/fix_5355_upgrade_test.py | 118 ++++++++++++++++++++
 1 file changed, 118 insertions(+)
 create mode 100644 tests/rptest/tests/fix_5355_upgrade_test.py

diff --git a/tests/rptest/tests/fix_5355_upgrade_test.py b/tests/rptest/tests/fix_5355_upgrade_test.py
new file mode 100644
index 000000000000..455154950163
--- /dev/null
+++ b/tests/rptest/tests/fix_5355_upgrade_test.py
@@ -0,0 +1,118 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import re
+
+from rptest.clients.types import TopicSpec
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.cluster import cluster
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
+from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions
+from rptest.services.redpanda import RedpandaService
+
+from confluent_kafka import (Producer, KafkaException)
+from random import choice
+from string import ascii_uppercase
+
+
+def on_delivery(err, msg):
+    if err is not None:
+        raise KafkaException(err)
+
+
+class Fix5355UpgradeTest(RedpandaTest):
+    topics = [TopicSpec(name="topic1")]
+    """
+    Basic test that upgrading software works as expected.
+    """
+    def __init__(self, test_context):
+        extra_rp_conf = {
+            "default_topic_replications": 3,
+            "default_topic_partitions": 1,
+            "log_segment_size": 1048576
+        }
+        super(Fix5355UpgradeTest, self).__init__(test_context=test_context,
+                                                 num_brokers=3,
+                                                 enable_installer=True,
+                                                 extra_rp_conf=extra_rp_conf)
+        self.installer = self.redpanda._installer
+
+    def setUp(self):
+        # NOTE: `rpk redpanda admin brokers list` requires versions v22.1.x and
+        # above.
+        self.installer.install(self.redpanda.nodes, (22, 1, 3))
+        super(Fix5355UpgradeTest, self).setUp()
+
+    def fill_segment(self):
+        payload_1kb = ''.join(choice(ascii_uppercase) for i in range(1024))
+        p = Producer({
+            "bootstrap.servers": self.redpanda.brokers(),
+            "enable.idempotence": True,
+            "retries": 5
+        })
+        for i in range(0, 2 * 1024):
+            p.produce("topic1",
+                      key="key1".encode('utf-8'),
+                      value=payload_1kb.encode('utf-8'),
+                      callback=on_delivery)
+        p.flush()
+
+    def check_snapshot_exist(self):
+        for node in self.redpanda.nodes:
+            cmd = f"find {RedpandaService.DATA_DIR}"
+            out_iter = node.account.ssh_capture(cmd)
+            has_snapshot = False
+            for line in out_iter:
+                has_snapshot = has_snapshot or re.match(
+                    f"{RedpandaService.DATA_DIR}/kafka/topic1/\\d+_\\d+/tx.snapshot",
+                    line)
+            assert has_snapshot
+
+    @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_rollback(self):
+        """
+        the test checks that a mid-upgrade rollback isn't broken
+        """
+        first_node = self.redpanda.nodes[0]
+
+        unique_versions = wait_for_num_versions(self.redpanda, 1)
+        assert "v22.1.3" in unique_versions, unique_versions
+
+        # Upgrade one node to the head version.
+        self.installer.install([first_node], RedpandaInstaller.HEAD)
+        self.redpanda.restart_nodes([first_node])
+        unique_versions = wait_for_num_versions(self.redpanda, 2)
+        assert "v22.1.3" in unique_versions, unique_versions
+
+        self.fill_segment()
+        self.check_snapshot_exist()
+
+        # Rollback the partial upgrade and ensure we go back to the original
+        # state.
+        self.installer.install([first_node], (22, 1, 3))
+        self.redpanda.restart_nodes([first_node])
+        unique_versions = wait_for_num_versions(self.redpanda, 1)
+        assert "v22.1.3" in unique_versions, unique_versions
+
+    @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_upgrade(self):
+        """
+        the test checks that an upgrade isn't broken
+        """
+        unique_versions = wait_for_num_versions(self.redpanda, 1)
+        assert "v22.1.3" in unique_versions, unique_versions
+
+        self.fill_segment()
+        self.check_snapshot_exist()
+
+        # Upgrade all nodes to the head version.
+        self.installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD)
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        unique_versions = wait_for_num_versions(self.redpanda, 1)
+        assert "v22.1.3" not in unique_versions, unique_versions