Skip to content

Commit

Permalink
rm_stm: make request validation stronger
Browse files Browse the repository at this point in the history
Return invalid_request instead of processing a wrong if branch
  • Loading branch information
rystsov committed Jun 23, 2022
1 parent 9b5491b commit 8ea6b14
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class errc : int16_t {
error_collecting_health_report,
leadership_changed,
feature_disabled,
invalid_request,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
Expand Down Expand Up @@ -167,6 +168,8 @@ struct errc_category final : public std::error_category {
"to finish";
case errc::feature_disabled:
return "Requested feature is disabled";
case errc::invalid_request:
return "Invalid request";
}
return "cluster::errc::unknown";
}
Expand Down
11 changes: 10 additions & 1 deletion src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
return replicate_tx(bid, std::move(b));
})
.finally([u = std::move(unit)] {});
} else if (bid.has_idempotent() && bid.first_seq <= bid.last_seq) {
} else if (bid.has_idempotent()) {
request_id rid{.pid = bid.pid, .seq = bid.first_seq};
return with_request_lock(
rid,
Expand Down Expand Up @@ -1098,6 +1098,15 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
}
auto synced_term = _insync_term;

if (bid.first_seq > bid.last_seq) {
vlog(
clusterlog.warn,
"first_seq={} of the batch should be less or equal to last_seq={}",
bid.first_seq,
bid.last_seq);
co_return errc::invalid_request;
}

auto cached_offset = known_seq(bid);
if (cached_offset) {
if (cached_offset.value() < model::offset{0}) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
return error_code::broker_not_available;
case cluster::errc::not_leader:
return error_code::not_coordinator;
case cluster::errc::invalid_request:
return error_code::invalid_request;
case cluster::errc::replication_error:
case cluster::errc::shutting_down:
case cluster::errc::join_request_dispatch_error:
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/handlers/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ static error_code map_produce_error_code(std::error_code ec) {
return error_code::invalid_producer_epoch;
case cluster::errc::sequence_out_of_order:
return error_code::out_of_order_sequence_number;
case cluster::errc::invalid_request:
return error_code::invalid_request;
default:
return error_code::unknown_server_error;
}
Expand Down

0 comments on commit 8ea6b14

Please sign in to comment.