Skip to content

Commit

Permalink
Merge pull request redpanda-data#5066 from mmaslankaprv/fix-4976
Browse files Browse the repository at this point in the history
k/replicated_partition: do not validate fetch offset when stopping
  • Loading branch information
mmaslankaprv committed Jun 29, 2022
2 parents ee5dcce + af8c601 commit f07e4fc
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 15 deletions.
11 changes: 6 additions & 5 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ static ss::future<read_result> do_read_from_ntp(
if (leader_epoch_err != error_code::none) {
co_return read_result(leader_epoch_err);
}
auto is_offset_valid = co_await kafka_partition->is_fetch_offset_valid(
auto offset_ec = co_await kafka_partition->validate_fetch_offset(
ntp_config.cfg.start_offset,
default_fetch_timeout + model::timeout_clock::now());

Expand All @@ -176,16 +176,17 @@ static ss::future<read_result> do_read_from_ntp(
}
}

if (!is_offset_valid) {
if (offset_ec != error_code::none) {
vlog(
klog.warn,
"fetch offset out of range for {}, requested offset: {}, "
"partition start offset: {}, high watermark: {}",
"partition start offset: {}, high watermark: {}, ec: {}",
ntp_config.ntp(),
ntp_config.cfg.start_offset,
kafka_partition->start_offset(),
kafka_partition->high_watermark());
co_return read_result(error_code::offset_out_of_range);
kafka_partition->high_watermark(),
offset_ec);
co_return read_result(offset_ec);
}
co_return co_await read_from_partition(
std::move(*kafka_partition), ntp_config.cfg, foreign_read, deadline);
Expand Down
7 changes: 5 additions & 2 deletions src/v/kafka/server/materialized_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once
#include "cluster/partition_probe.h"
#include "coproc/partition.h"
#include "kafka/protocol/errors.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/types.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -88,9 +89,11 @@ class materialized_partition final : public kafka::partition_proxy::impl {

cluster::partition_probe& probe() final { return _probe; }

ss::future<bool> is_fetch_offset_valid(
ss::future<error_code> validate_fetch_offset(
model::offset fetch_offset, model::timeout_clock::time_point) final {
co_return fetch_offset >= start_offset();
co_return fetch_offset >= start_offset()
? error_code::none
: error_code::offset_out_of_range;
}

private:
Expand Down
9 changes: 5 additions & 4 deletions src/v/kafka/server/partition_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/metadata_cache.h"
#include "cluster/partition.h"
#include "coproc/fwd.h"
#include "kafka/protocol/errors.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "storage/translating_reader.h"
Expand Down Expand Up @@ -52,8 +53,8 @@ class partition_proxy {
model::offset,
ss::lw_shared_ptr<const storage::offset_translator_state>)
= 0;
virtual ss::future<bool>
is_fetch_offset_valid(model::offset, model::timeout_clock::time_point)
virtual ss::future<error_code>
validate_fetch_offset(model::offset, model::timeout_clock::time_point)
= 0;
virtual cluster::partition_probe& probe() = 0;
virtual ~impl() noexcept = default;
Expand Down Expand Up @@ -107,9 +108,9 @@ class partition_proxy {
return _impl->get_leader_epoch_last_offset(epoch);
}

ss::future<bool> is_fetch_offset_valid(
ss::future<error_code> validate_fetch_offset(
model::offset o, model::timeout_clock::time_point deadline) {
return _impl->is_fetch_offset_valid(o, deadline);
return _impl->validate_fetch_offset(o, deadline);
}

private:
Expand Down
21 changes: 18 additions & 3 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
*/
#include "kafka/server/replicated_partition.h"

#include "cluster/errc.h"
#include "kafka/protocol/errors.h"
#include "kafka/server/logger.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/timeout_clock.h"
#include "raft/consensus_utils.h"
#include "raft/errc.h"
#include "raft/types.h"
#include "storage/types.h"

Expand Down Expand Up @@ -209,7 +211,7 @@ std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
return std::nullopt;
}

ss::future<bool> replicated_partition::is_fetch_offset_valid(
ss::future<error_code> replicated_partition::validate_fetch_offset(
model::offset fetch_offset, model::timeout_clock::time_point deadline) {
/**
* Make sure that we will update high watermark offset if it isn't yet
Expand All @@ -234,6 +236,18 @@ ss::future<bool> replicated_partition::is_fetch_offset_valid(
// retry linearizable barrier to make sure node is still a leader
auto ec = co_await linearizable_barrier();
if (ec) {
/**
* When the partition is shutting down we may not be able to correctly
* validate the offset requested by the consumer. Return a
* not_leader_for_partition error to force the client to retry, instead
* of an offset_out_of_range error, which would force consumers to
* reset their offsets.
*/
if (
ec == raft::errc::shutting_down
|| ec == cluster::errc::shutting_down) {
co_return error_code::not_leader_for_partition;
}
vlog(
klog.warn,
"error updating partition high watermark with linearizable "
Expand All @@ -249,8 +263,9 @@ ss::future<bool> replicated_partition::is_fetch_offset_valid(
_partition->high_watermark());
}

co_return fetch_offset >= start_offset()
&& fetch_offset <= high_watermark();
co_return fetch_offset >= start_offset() && fetch_offset <= high_watermark()
? error_code::none
: error_code::offset_out_of_range;
}

} // namespace kafka
3 changes: 2 additions & 1 deletion src/v/kafka/server/replicated_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cluster/partition.h"
#include "cluster/partition_probe.h"
#include "kafka/protocol/errors.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/types.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -98,7 +99,7 @@ class replicated_partition final : public kafka::partition_proxy::impl {
return leader_epoch_from_term(_partition->term());
}

ss::future<bool> is_fetch_offset_valid(
ss::future<error_code> validate_fetch_offset(
model::offset, model::timeout_clock::time_point) final;

private:
Expand Down

0 comments on commit f07e4fc

Please sign in to comment.