Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicates consistency error by caching already translated offsets #5356

Merged
merged 13 commits into the base branch (branch names not captured in this export)
Jul 12, 2022
Merged
4 changes: 3 additions & 1 deletion src/v/cluster/feature_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ std::string_view to_string_view(feature f) {
return "serde_raft_0";
case feature::license:
return "license";
case feature::rm_stm_kafka_cache:
return "rm_stm_kafka_cache";
case feature::test_alpha:
return "__test_alpha";
}
Expand Down Expand Up @@ -58,7 +60,7 @@ std::string_view to_string_view(feature_state::state s) {

// The version that this redpanda node will report: increment this
// on protocol changes to raft0 structures, like adding new services.
static constexpr cluster_version latest_version = cluster_version{4};
static constexpr cluster_version latest_version = cluster_version{5};

feature_table::feature_table() {
// Intentionally undocumented environment variable, only for use
Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum class feature : std::uint64_t {
mtls_authentication = 0x8,
serde_raft_0 = 0x10,
license = 0x20,
rm_stm_kafka_cache = 0x40,

// Dummy features for testing only
test_alpha = uint64_t(1) << 63,
Expand Down Expand Up @@ -115,6 +116,12 @@ constexpr static std::array feature_schema{
feature::license,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster_version{5},
rystsov marked this conversation as resolved.
Show resolved Hide resolved
"rm_stm_kafka_cache",
feature::rm_stm_kafka_cache,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster_version{2001},
"__test_alpha",
Expand Down
109 changes: 33 additions & 76 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ partition::partition(
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _tx_gateway_frontend(tx_gateway_frontend)
, _feature_table(feature_table)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
config::shard_local_cfg().enable_idempotence.value()) {
Expand Down Expand Up @@ -70,7 +72,7 @@ partition::partition(

if (has_rm_stm) {
_rm_stm = ss::make_shared<cluster::rm_stm>(
clusterlog, _raft.get(), _tx_gateway_frontend);
clusterlog, _raft.get(), _tx_gateway_frontend, _feature_table);
stm_manager->add_stm(_rm_stm);
}

Expand Down Expand Up @@ -102,21 +104,15 @@ partition::partition(
}
}

ss::future<result<raft::replicate_result>> partition::replicate(
ss::future<result<kafka_result>> partition::replicate(
model::record_batch_reader&& r, raft::replicate_options opts) {
return _raft->replicate(std::move(r), opts);
}

raft::replicate_stages partition::replicate_in_stages(
rystsov marked this conversation as resolved.
Show resolved Hide resolved
model::record_batch_reader&& r, raft::replicate_options opts) {
return _raft->replicate_in_stages(std::move(r), opts);
}

ss::future<result<raft::replicate_result>> partition::replicate(
model::term_id term,
model::record_batch_reader&& r,
raft::replicate_options opts) {
return _raft->replicate(term, std::move(r), opts);
using ret_t = result<kafka_result>;
auto res = co_await _raft->replicate(std::move(r), opts);
if (!res) {
co_return ret_t(res.error());
}
co_return ret_t(kafka_result{
kafka::offset(_translator->from_log_offset(res.value().last_offset)())});
}

ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
Expand All @@ -138,26 +134,27 @@ ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
return _rm_stm;
}

raft::replicate_stages partition::replicate_in_stages(
kafka_stages partition::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader&& r,
raft::replicate_options opts) {
using ret_t = result<kafka_result>;
if (bid.is_transactional) {
if (!_is_tx_enabled) {
vlog(
clusterlog.error,
"Can't process a transactional request to {}. Transactional "
"processing isn't enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support transactional processing.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

Expand All @@ -168,84 +165,44 @@ raft::replicate_stages partition::replicate_in_stages(
"Can't process an idempotent request to {}. Idempotency isn't "
"enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support idempotency.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

if (_rm_stm) {
return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
}
}

ss::future<result<raft::replicate_result>> partition::replicate(
model::batch_identity bid,
model::record_batch_reader&& r,
raft::replicate_options opts) {
rystsov marked this conversation as resolved.
Show resolved Hide resolved
if (bid.is_transactional) {
if (!_is_tx_enabled) {
vlog(
clusterlog.error,
"Can't process a transactional request to {}. Transactional "
"processing isn't enabled.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support transactional processing.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}
}

if (bid.has_idempotent()) {
if (!_is_idempotence_enabled) {
vlog(
clusterlog.error,
"Can't process an idempotent request to {}. Idempotency isn't "
"enabled.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support idempotency.",
_raft->ntp());
return ss::make_ready_future<result<raft::replicate_result>>(
raft::errc::timeout);
}
}

if (_rm_stm) {
return _rm_stm->replicate(bid, std::move(r), opts);
} else {
return _raft->replicate(std::move(r), opts);
}
auto res = _raft->replicate_in_stages(std::move(r), opts);
auto replicate_finished = res.replicate_finished.then(
[this](result<raft::replicate_result> r) {
if (!r) {
return ret_t(r.error());
}
auto old_offset = r.value().last_offset;
auto new_offset = kafka::offset(
_translator->from_log_offset(old_offset)());
return ret_t(kafka_result{new_offset});
});
return kafka_stages(
std::move(res.request_enqueued), std::move(replicate_finished));
}

ss::future<> partition::start() {
auto ntp = _raft->ntp();

_probe.setup_metrics(ntp);

auto f = _raft->start();
auto f = _raft->start().then(
[this] { _translator = _raft->get_offset_translator_state(); });

if (is_id_allocator_topic(ntp)) {
return f.then([this] { return _id_allocator_stm->start(); });
Expand Down
27 changes: 8 additions & 19 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cloud_storage/remote_partition.h"
#include "cluster/archival_metadata_stm.h"
#include "cluster/feature_table.h"
#include "cluster/id_allocator_stm.h"
#include "cluster/partition_probe.h"
#include "cluster/rm_stm.h"
Expand Down Expand Up @@ -44,27 +45,17 @@ class partition {
consensus_ptr r,
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

raft::group_id group() const { return _raft->group(); }
ss::future<> start();
ss::future<> stop();

ss::future<result<raft::replicate_result>>
ss::future<result<kafka_result>>
replicate(model::record_batch_reader&&, raft::replicate_options);

raft::replicate_stages
replicate_in_stages(model::record_batch_reader&&, raft::replicate_options);

ss::future<result<raft::replicate_result>> replicate(
model::term_id, model::record_batch_reader&&, raft::replicate_options);

ss::future<result<raft::replicate_result>> replicate(
model::batch_identity,
model::record_batch_reader&&,
raft::replicate_options);

raft::replicate_stages replicate_in_stages(
kafka_stages replicate_in_stages(
model::batch_identity,
model::record_batch_reader&&,
raft::replicate_options);
Expand Down Expand Up @@ -289,11 +280,7 @@ class partition {
return _raft->abort_configuration_change(rev);
}

private:
friend partition_manager;
friend replicated_partition_probe;

consensus_ptr raft() { return _raft; }
consensus_ptr raft() const { return _raft; }

private:
consensus_ptr _raft;
Expand All @@ -305,9 +292,11 @@ class partition {
ss::abort_source _as;
partition_probe _probe;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<feature_table>& _feature_table;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::lw_shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
Expand Down
12 changes: 9 additions & 3 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ partition_manager::partition_manager(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cloud_storage::partition_recovery_manager>& recovery_mgr,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<feature_table>& feature_table)
: _storage(storage.local())
, _raft_manager(raft)
, _tx_gateway_frontend(tx_gateway_frontend)
, _partition_recovery_mgr(recovery_mgr)
, _cloud_storage_api(cloud_storage_api)
, _cloud_storage_cache(cloud_storage_cache) {}
, _cloud_storage_cache(cloud_storage_cache)
, _feature_table(feature_table) {}

partition_manager::ntp_table_container
partition_manager::get_topic_partition_table(
Expand Down Expand Up @@ -120,7 +122,11 @@ ss::future<consensus_ptr> partition_manager::manage(
group, std::move(initial_nodes), log);

auto p = ss::make_lw_shared<partition>(
c, _tx_gateway_frontend, _cloud_storage_api, _cloud_storage_cache);
c,
_tx_gateway_frontend,
_cloud_storage_api,
_cloud_storage_cache,
_feature_table);

_ntp_table.emplace(log.config().ntp(), p);
_raft_table.emplace(group, p);
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_recovery_manager.h"
#include "cloud_storage/remote.h"
#include "cluster/feature_table.h"
#include "cluster/ntp_callbacks.h"
#include "cluster/partition.h"
#include "model/metadata.h"
Expand All @@ -37,7 +38,8 @@ class partition_manager {
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cloud_storage::partition_recovery_manager>&,
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&);
ss::sharded<cloud_storage::cache>&,
ss::sharded<feature_table>&);

using manage_cb_t
= ss::noncopyable_function<void(ss::lw_shared_ptr<partition>)>;
Expand Down Expand Up @@ -190,6 +192,7 @@ class partition_manager {
_partition_recovery_mgr;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
ss::sharded<feature_table>& _feature_table;
ss::gate _gate;
bool _block_new_leadership{false};

Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
sm::make_gauge(
"leader_id",
[this] {
return _partition._raft->get_leader_id().value_or(
return _partition.raft()->get_leader_id().value_or(
model::node_id(-1));
},
sm::description("Id of current partition leader"),
Expand All @@ -98,7 +98,7 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
sm::make_gauge(
"under_replicated_replicas",
[this] {
auto metrics = _partition._raft->get_follower_metrics();
auto metrics = _partition.raft()->get_follower_metrics();
return std::count_if(
metrics.cbegin(),
metrics.cend(),
Expand Down Expand Up @@ -181,7 +181,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
sm::make_gauge(
"under_replicated_replicas",
[this] {
auto metrics = _partition._raft->get_follower_metrics();
auto metrics = _partition.raft()->get_follower_metrics();
return std::count_if(
metrics.cbegin(),
metrics.cend(),
Expand Down Expand Up @@ -214,7 +214,7 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
.aggregate({sm::shard_label, partition_label}),
sm::make_gauge(
"replicas",
[this] { return _partition._raft->get_follower_count(); },
[this] { return _partition.raft()->get_follower_count(); },
sm::description("Number of replicas per topic"),
labels)
.aggregate({sm::shard_label, partition_label}),
Expand Down
Loading