Merge pull request #5356 from rystsov/seq-kafka-offset-cache
Fix duplicates consistency error by caching already translated offsets
rystsov committed Jul 12, 2022
2 parents 14647aa + c45672a commit 09966f9
Showing 28 changed files with 573 additions and 261 deletions.
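
As the branch name (seq-kafka-offset-cache) and the new rm_stm_kafka_cache feature suggest, the fix targets the idempotent-producer path: once a produce request's raft offset has been translated to a Kafka offset, that translated offset is remembered per producer sequence, so a retried (duplicate) request gets back the same Kafka offset instead of re-translating an offset that may no longer be resolvable. The following is a minimal, hypothetical sketch of that caching idea; producer_seq_cache, lookup, and remember are illustrative names, not types or functions from the Redpanda sources.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>

// Hypothetical sketch only: models caching already translated offsets
// keyed by producer sequence number, so that a duplicate request can be
// answered with the Kafka offset returned for the original attempt.
struct producer_seq_cache {
    std::map<int32_t, int64_t> seq_to_kafka_offset;

    std::optional<int64_t> lookup(int32_t seq) const {
        auto it = seq_to_kafka_offset.find(seq);
        if (it == seq_to_kafka_offset.end()) {
            return std::nullopt;
        }
        return it->second;
    }

    void remember(int32_t seq, int64_t kafka_offset) {
        seq_to_kafka_offset[seq] = kafka_offset;
    }
};

int main() {
    producer_seq_cache cache;
    cache.remember(/*seq=*/7, /*kafka_offset=*/42); // first attempt landed at Kafka offset 42

    // A duplicate of sequence 7 arrives (e.g. a client retry): serve the
    // cached offset so the producer observes a consistent result.
    if (auto cached = cache.lookup(7)) {
        std::cout << "duplicate seq 7 -> offset " << *cached << '\n';
    }
    return 0;
}
```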
4 changes: 3 additions & 1 deletion src/v/cluster/feature_table.cc
@@ -30,6 +30,8 @@ std::string_view to_string_view(feature f) {
return "serde_raft_0";
case feature::license:
return "license";
case feature::rm_stm_kafka_cache:
return "rm_stm_kafka_cache";
case feature::test_alpha:
return "__test_alpha";
}
@@ -58,7 +60,7 @@ std::string_view to_string_view(feature_state::state s) {

// The version that this redpanda node will report: increment this
// on protocol changes to raft0 structures, like adding new services.
static constexpr cluster_version latest_version = cluster_version{4};
static constexpr cluster_version latest_version = cluster_version{5};

feature_table::feature_table() {
// Intentionally undocumented environment variable, only for use
7 changes: 7 additions & 0 deletions src/v/cluster/feature_table.h
@@ -27,6 +27,7 @@ enum class feature : std::uint64_t {
mtls_authentication = 0x8,
serde_raft_0 = 0x10,
license = 0x20,
rm_stm_kafka_cache = 0x40,

// Dummy features for testing only
test_alpha = uint64_t(1) << 63,
@@ -115,6 +116,12 @@ constexpr static std::array feature_schema{
feature::license,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster_version{5},
"rm_stm_kafka_cache",
feature::rm_stm_kafka_cache,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster_version{2001},
"__test_alpha",
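
The new rm_stm_kafka_cache feature is registered at cluster_version{5}, so the cache-backed behaviour can only switch on once the whole cluster has reached at least that version. A rough sketch of that version-gating pattern follows, under the assumption that a feature with available_policy::always becomes usable once every node reports the feature's introduction version; feature_gate and is_active are illustrative names, not the feature_table API.

```cpp
#include <cstdint>
#include <iostream>

// Illustrative only: models version-gated activation of a feature such as
// rm_stm_kafka_cache (introduced at cluster version 5).
struct feature_gate {
    int64_t introduced_in;          // cluster version that introduced the feature
    int64_t active_cluster_version; // version the cluster as a whole has reached

    bool is_active() const { return active_cluster_version >= introduced_in; }
};

int main() {
    feature_gate rm_stm_kafka_cache{.introduced_in = 5, .active_cluster_version = 4};
    std::cout << std::boolalpha
              << rm_stm_kafka_cache.is_active() << '\n'; // false: a pre-v5 node is still in the cluster

    rm_stm_kafka_cache.active_cluster_version = 5;       // every node upgraded
    std::cout << rm_stm_kafka_cache.is_active() << '\n'; // true
    return 0;
}
```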
109 changes: 33 additions & 76 deletions src/v/cluster/partition.cc
@@ -32,10 +32,12 @@ partition::partition(
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _tx_gateway_frontend(tx_gateway_frontend)
, _feature_table(feature_table)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
config::shard_local_cfg().enable_idempotence.value()) {
@@ -70,7 +72,7 @@ partition::partition(

if (has_rm_stm) {
_rm_stm = ss::make_shared<cluster::rm_stm>(
clusterlog, _raft.get(), _tx_gateway_frontend);
clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table);
stm_manager->add_stm(_rm_stm);
}

@@ -102,21 +104,15 @@ partition::partition(
}
}

ss::future<result<raft::replicate_result>> partition::replicate(
ss::future<result<kafka_result>> partition::replicate(
model::record_batch_reader&& r, raft::replicate_options opts) {
return _raft->replicate(std::move(r), opts);
}

raft::replicate_stages partition::replicate_in_stages(
model::record_batch_reader&& r, raft::replicate_options opts) {
return _raft->replicate_in_stages(std::move(r), opts);
}

ss::future<result<raft::replicate_result>> partition::replicate(
model::term_id term,
model::record_batch_reader&& r,
raft::replicate_options opts) {
return _raft->replicate(term, std::move(r), opts);
using ret_t = result<kafka_result>;
auto res = co_await _raft->replicate(std::move(r), opts);
if (!res) {
co_return ret_t(res.error());
}
co_return ret_t(kafka_result{
kafka::offset(_translator->from_log_offset(res.value().last_offset)())});
}

ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
@@ -138,26 +134,27 @@ ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
return _rm_stm;
}

raft::replicate_stages partition::replicate_in_stages(
kafka_stages partition::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader&& r,
raft::replicate_options opts) {
using ret_t = result<kafka_result>;
if (bid.is_transactional) {
if (!_is_tx_enabled) {
vlog(
clusterlog.error,
"Can't process a transactional request to {}. Transactional "
"processing isn't enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support transactional processing.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

@@ -168,84 +165,44 @@ raft::replicate_stages partition::replicate_in_stages(
"Can't process an idempotent request to {}. Idempotency isn't "
"enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support idempotency.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

if (_rm_stm) {
return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
}
}

ss::future<result<raft::replicate_result>> partition::replicate(
model::batch_identity bid,
model::record_batch_reader&& r,
raft::replicate_options opts) {
if (bid.is_transactional) {
if (!_is_tx_enabled) {
vlog(
clusterlog.error,
"Can't process a transactional request to {}. Transactional "
"processing isn't enabled.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support transactional processing.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}
}

if (bid.has_idempotent()) {
if (!_is_idempotence_enabled) {
vlog(
clusterlog.error,
"Can't process an idempotent request to {}. Idempotency isn't "
"enabled.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support idempotency.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}
}

if (_rm_stm) {
return _rm_stm->replicate(bid, std::move(r), opts);
} else {
return _raft->replicate(std::move(r), opts);
}
auto res = _raft->replicate_in_stages(std::move(r), opts);
auto replicate_finished = res.replicate_finished.then(
[this](result<raft::replicate_result> r) {
if (!r) {
return ret_t(r.error());
}
auto old_offset = r.value().last_offset;
auto new_offset = kafka::offset(
_translator->from_log_offset(old_offset)());
return ret_t(kafka_result{new_offset});
});
return kafka_stages(
std::move(res.request_enqueued), std::move(replicate_finished));
}

ss::future<> partition::start() {
auto ntp = _raft->ntp();

_probe.setup_metrics(ntp);

auto f = _raft->start();
auto f = _raft->start().then(
[this] { _translator = _raft->get_offset_translator_state(); });

if (is_id_allocator_topic(ntp)) {
return f.then([this] { return _id_allocator_stm->start(); });
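
In the reworked replicate path above, the raft result's last_offset is converted with _translator->from_log_offset(...) before being returned as a kafka_result. A simplified model of what such a translation does is sketched below, assuming Kafka offsets simply skip non-data batches such as raft configuration batches; the real storage::offset_translator_state tracks this bookkeeping differently, so the sketch only conveys the idea.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified model of log-offset -> Kafka-offset translation: subtract the
// number of non-data batches that occupy log offsets at or before the given
// offset. Illustrative only; not the Redpanda implementation.
int64_t from_log_offset(
  int64_t log_offset, const std::vector<int64_t>& non_data_offsets) {
    auto gaps = std::count_if(
      non_data_offsets.begin(),
      non_data_offsets.end(),
      [log_offset](int64_t o) { return o <= log_offset; });
    return log_offset - gaps;
}

int main() {
    // A configuration batch occupies log offset 0; the first data record at
    // log offset 1 is therefore Kafka offset 0.
    assert(from_log_offset(1, {0}) == 0);
    // Configuration batches at log offsets 0 and 3: log offset 5 maps to
    // Kafka offset 3.
    assert(from_log_offset(5, {0, 3}) == 3);
    return 0;
}
```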
27 changes: 8 additions & 19 deletions src/v/cluster/partition.h
@@ -13,6 +13,7 @@

#include "cloud_storage/remote_partition.h"
#include "cluster/archival_metadata_stm.h"
#include "cluster/feature_table.h"
#include "cluster/id_allocator_stm.h"
#include "cluster/partition_probe.h"
#include "cluster/rm_stm.h"
@@ -44,27 +45,17 @@ class partition {
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

raft::group_id group() const { return _raft->group(); }
ss::future<> start();
ss::future<> stop();

ss::future<result<raft::replicate_result>>
ss::future<result<kafka_result>>
replicate(model::record_batch_reader&&, raft::replicate_options);

raft::replicate_stages
replicate_in_stages(model::record_batch_reader&&, raft::replicate_options);

ss::future<result<raft::replicate_result>> replicate(
model::term_id, model::record_batch_reader&&, raft::replicate_options);

ss::future<result<raft::replicate_result>> replicate(
model::batch_identity,
model::record_batch_reader&&,
raft::replicate_options);

raft::replicate_stages replicate_in_stages(
kafka_stages replicate_in_stages(
model::batch_identity,
model::record_batch_reader&&,
raft::replicate_options);
Expand Down Expand Up @@ -289,11 +280,7 @@ class partition {
return _raft->abort_configuration_change(rev);
}

private:
friend partition_manager;
friend replicated_partition_probe;

consensus_ptr raft() { return _raft; }
consensus_ptr raft() const { return _raft; }

private:
consensus_ptr _raft;
@@ -305,9 +292,11 @@
ss::abort_source _as;
partition_probe _probe;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<feature_table>& _feature_table;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::lw_shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
12 changes: 9 additions & 3 deletions src/v/cluster/partition_manager.cc
@@ -48,13 +48,15 @@ partition_manager::partition_manager(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::partition_recovery_manager>& recovery_mgr,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _storage(storage.local())
, _raft_manager(raft)
, _tx_gateway_frontend(tx_gateway_frontend)
, _partition_recovery_mgr(recovery_mgr)
, _cloud_storage_api(cloud_storage_api)
, _cloud_storage_cache(cloud_storage_cache) {}
, _cloud_storage_cache(cloud_storage_cache)
, _feature_table(feature_table) {}

partition_manager::ntp_table_container
partition_manager::get_topic_partition_table(
@@ -120,7 +122,11 @@ ss::future<consensus_ptr> partition_manager::manage(
group, std::move(initial_nodes), log);

auto p = ss::make_lw_shared<partition>(
c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache);
c,
_tx_gateway_frontend,
_cloud_storage_api,
_cloud_storage_cache,
_feature_table);

_ntp_table.emplace(log.config().ntp(), p);
_raft_table.emplace(group, p);
5 changes: 4 additions & 1 deletion src/v/cluster/partition_manager.h
@@ -14,6 +14,7 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_recovery_manager.h"
#include "cloud_storage/remote.h"
#include "cluster/feature_table.h"
#include "cluster/ntp_callbacks.h"
#include "cluster/partition.h"
#include "model/metadata.h"
Expand All @@ -37,7 +38,8 @@ class partition_manager {
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::partition_recovery_manager>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

using manage_cb_t
= ss::noncopyable_function<void(ss::lw_shared_ptr<partition>)>;
Expand Down Expand Up @@ -190,6 +192,7 @@ class partition_manager {
_partition_recovery_mgr;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
ss::sharded<feature_table>& _feature_table;
ss::gate _gate;
bool _block_new_leadership{false};

8 changes: 4 additions & 4 deletions src/v/cluster/partition_probe.cc
@@ -89,7 +89,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
sm::make_gauge(
"leader_id",
[this] {
return _partition._raft->get_leader_id().value_or(
return _partition.raft()->get_leader_id().value_or(
model::node_id(-1));
},
sm::description("Id of current partition leader"),
@@ -98,7 +98,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
sm::make_gauge(
"under_replicated_replicas",
[this] {
auto metrics = _partition._raft->get_follower_metrics();
auto metrics = _partition.raft()->get_follower_metrics();
return std::count_if(
metrics.cbegin(),
metrics.cend(),
@@ -181,7 +181,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
sm::make_gauge(
"under_replicated_replicas",
[this] {
auto metrics = _partition._raft->get_follower_metrics();
auto metrics = _partition.raft()->get_follower_metrics();
return std::count_if(
metrics.cbegin(),
metrics.cend(),
@@ -214,7 +214,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
.aggregate({sm::shard_label, partition_label}),
sm::make_gauge(
"replicas",
[this] { return _partition._raft->get_follower_count(); },
[this] { return _partition.raft()->get_follower_count(); },
sm::description("Number of replicas per topic"),
labels)
.aggregate({sm::shard_label, partition_label}),