From e3d24d951206fe51a68e0cf0c1965e0525ad07f7 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:50:39 -0700 Subject: [PATCH] cluster: shift offset translation to partition Shifting offset translation down the abstraction well to eventually reach rm_stm --- src/v/cluster/partition.cc | 42 ++++++++++++++++------ src/v/cluster/partition.h | 5 +-- src/v/kafka/server/replicated_partition.cc | 17 +++++---- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 4a5d70a878f7..4004498c2d47 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -102,9 +102,15 @@ partition::partition( } } -ss::future> partition::replicate( +ss::future> partition::replicate( model::record_batch_reader&& r, raft::replicate_options opts) { - return _raft->replicate(std::move(r), opts); + using ret_t = result; + auto res = co_await _raft->replicate(std::move(r), opts); + if (!res) { + co_return ret_t(res.error()); + } + co_return ret_t(kafka_result{ + kafka::offset(_translator->from_log_offset(res.value().last_offset)())}); } ss::shared_ptr partition::rm_stm() { @@ -126,10 +132,11 @@ ss::shared_ptr partition::rm_stm() { return _rm_stm; } -raft::replicate_stages partition::replicate_in_stages( +kafka_stages partition::replicate_in_stages( model::batch_identity bid, model::record_batch_reader&& r, raft::replicate_options opts) { + using ret_t = result; if (bid.is_transactional) { if (!_is_tx_enabled) { vlog( @@ -137,7 +144,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process a transactional request to {}. Transactional " "processing isn't enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -145,7 +152,7 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support transactional processing.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } @@ -156,7 +163,7 @@ raft::replicate_stages partition::replicate_in_stages( "Can't process an idempotent request to {}. Idempotency isn't " "enabled.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } if (!_rm_stm) { @@ -164,15 +171,29 @@ raft::replicate_stages partition::replicate_in_stages( clusterlog.error, "Topic {} doesn't support idempotency.", _raft->ntp()); - return raft::replicate_stages(raft::errc::timeout); + return kafka_stages(raft::errc::timeout); } } + ss::lw_shared_ptr res; if (_rm_stm) { - return _rm_stm->replicate_in_stages(bid, std::move(r), opts); + res = _rm_stm->replicate_in_stages(bid, std::move(r), opts); } else { - return _raft->replicate_in_stages(std::move(r), opts); + res = _raft->replicate_in_stages(std::move(r), opts); } + + auto replicate_finished = res->replicate_finished.then( + [this](result r) { + if (!r) { + return ret_t(r.error()); + } + auto old_offset = r.value().last_offset; + auto new_offset = kafka::offset( + _translator->from_log_offset(old_offset)()); + return ret_t(kafka_result{new_offset}); + }); + return kafka_stages( + std::move(res->request_enqueued), std::move(replicate_finished)); } ss::future<> partition::start() { @@ -180,7 +201,8 @@ ss::future<> partition::start() { _probe.setup_metrics(ntp); - auto f = _raft->start(); + auto f = _raft->start().then( + [this] { _translator = _raft->get_offset_translator_state(); }); if (is_id_allocator_topic(ntp)) { return f.then([this] { return _id_allocator_stm->start(); }); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 3a4e6e636acf..93f6b19b4387 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -50,10 +50,10 @@ class partition { ss::future<> start(); ss::future<> stop(); - ss::future> + ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); - raft::replicate_stages replicate_in_stages( + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); @@ -293,6 +293,7 @@ class partition { bool _is_tx_enabled{false}; bool _is_idempotence_enabled{false}; ss::lw_shared_ptr _cloud_storage_partition; + ss::lw_shared_ptr _translator; friend std::ostream& operator<<(std::ostream& o, const partition& x); }; diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index faf03c03d452..d24fe5fed21b 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -165,11 +165,11 @@ ss::future> replicated_partition::replicate( model::record_batch_reader rdr, raft::replicate_options opts) { using ret_t = result; return _partition->replicate(std::move(rdr), opts) - .then([this](result r) { + .then([](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->from_log_offset(r.value().last_offset)); + return ret_t(model::offset(r.value().last_offset())); }); } @@ -179,15 +179,18 @@ raft::replicate_stages replicated_partition::replicate( raft::replicate_options opts) { using ret_t = result; auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); - res.replicate_finished = res.replicate_finished.then( - [this](result r) { + + raft::replicate_stages out(raft::errc::success); + out.request_enqueued = std::move(res.request_enqueued); + out.replicate_finished = res.replicate_finished.then( + [](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(raft::replicate_result{ - _translator->from_log_offset(r.value().last_offset)}); + return ret_t( + raft::replicate_result{model::offset(r.value().last_offset())}); }); - return res; + return out; } std::optional replicated_partition::get_leader_epoch_last_offset(