From 8ea6b149bf0ecf66bf173523ffc0c760bbd3695d Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 16 Jun 2022 11:16:23 -0700 Subject: [PATCH] rm_stm: make request validation stronger Return invalid_request instead of processing a wrong if branch --- src/v/cluster/errc.h | 3 +++ src/v/cluster/rm_stm.cc | 11 ++++++++++- src/v/kafka/server/errors.h | 2 ++ src/v/kafka/server/handlers/produce.cc | 2 ++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index 426b60535477..1f85cad7c67c 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -59,6 +59,7 @@ enum class errc : int16_t { error_collecting_health_report, leadership_changed, feature_disabled, + invalid_request, }; struct errc_category final : public std::error_category { const char* name() const noexcept final { return "cluster::errc"; } @@ -167,6 +168,8 @@ struct errc_category final : public std::error_category { "to finish"; case errc::feature_disabled: return "Requested feature is disabled"; + case errc::invalid_request: + return "Invalid request"; } return "cluster::errc::unknown"; } diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 8f76d56c9092..21d06ef3193a 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -805,7 +805,7 @@ ss::future> rm_stm::do_replicate( return replicate_tx(bid, std::move(b)); }) .finally([u = std::move(unit)] {}); - } else if (bid.has_idempotent() && bid.first_seq <= bid.last_seq) { + } else if (bid.has_idempotent()) { request_id rid{.pid = bid.pid, .seq = bid.first_seq}; return with_request_lock( rid, @@ -1098,6 +1098,15 @@ ss::future> rm_stm::replicate_seq( } auto synced_term = _insync_term; + if (bid.first_seq > bid.last_seq) { + vlog( + clusterlog.warn, + "first_seq={} of the batch should be less or equal to last_seq={}", + bid.first_seq, + bid.last_seq); + co_return errc::invalid_request; + } + auto cached_offset = known_seq(bid); if (cached_offset) { if (cached_offset.value() < model::offset{0}) { diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index 354c80df3b23..e8d86e9da831 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -43,6 +43,8 @@ constexpr error_code map_topic_error_code(cluster::errc code) { return error_code::broker_not_available; case cluster::errc::not_leader: return error_code::not_coordinator; + case cluster::errc::invalid_request: + return error_code::invalid_request; case cluster::errc::replication_error: case cluster::errc::shutting_down: case cluster::errc::join_request_dispatch_error: diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 546dfa2057ce..687b127c6c0b 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -154,6 +154,8 @@ static error_code map_produce_error_code(std::error_code ec) { return error_code::invalid_producer_epoch; case cluster::errc::sequence_out_of_order: return error_code::out_of_order_sequence_number; + case cluster::errc::invalid_request: + return error_code::invalid_request; default: return error_code::unknown_server_error; }