Skip to content

Commit

Permalink
Merge pull request #10354 from NyaliaLui/backport-5848-v23.1.x-392
Browse files Browse the repository at this point in the history
[v23.1.x] k/client: handle consumer group coordinator changes
  • Loading branch information
NyaliaLui committed Apr 27, 2023
2 parents 1228abc + 8d1315c commit ae51a1b
Show file tree
Hide file tree
Showing 10 changed files with 506 additions and 177 deletions.
37 changes: 10 additions & 27 deletions src/v/kafka/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include "kafka/client/exceptions.h"
#include "kafka/client/logger.h"
#include "kafka/client/partitioners.h"
#include "kafka/client/retry_with_mitigation.h"
#include "kafka/client/sasl_client.h"
#include "kafka/client/utils.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/find_coordinator.h"
Expand Down Expand Up @@ -394,31 +394,13 @@ ss::future<fetch_response> client::fetch_partition(

ss::future<member_id>
client::create_consumer(const group_id& group_id, member_id name) {
auto build_request = [group_id]() {
return find_coordinator_request(group_id);
};
return gated_retry_with_mitigation(
[this, group_id, name, func{std::move(build_request)}]() {
return _brokers.any()
.then([func](shared_broker_t broker) {
return broker->dispatch(func());
})
.then([group_id, name](find_coordinator_response res) {
if (res.data.error_code != error_code::none) {
return ss::make_exception_future<
find_coordinator_response>(consumer_error(
group_id, name, res.data.error_code));
};
return ss::make_ready_future<find_coordinator_response>(
std::move(res));
});
})
.then([this](find_coordinator_response res) {
return make_broker(
res.data.node_id,
net::unresolved_address(res.data.host, res.data.port),
_config);
})
return find_coordinator_with_retry_and_mitigation(
_gate,
_config,
_brokers,
group_id,
name,
[this](std::exception_ptr ex) { return mitigate_error(ex); })
.then([this, group_id, name](shared_broker_t coordinator) mutable {
auto on_stopped = [this, group_id](const member_id& name) {
_consumers[group_id].erase(name);
Expand All @@ -430,7 +412,8 @@ client::create_consumer(const group_id& group_id, member_id name) {
std::move(coordinator),
std::move(group_id),
std::move(name),
std::move(on_stopped));
std::move(on_stopped),
[this](std::exception_ptr ex) { return mitigate_error(ex); });
})
.then([this, group_id](shared_consumer_t c) {
auto name = c->name();
Expand Down
18 changes: 7 additions & 11 deletions src/v/kafka/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include "kafka/client/consumer.h"
#include "kafka/client/fetcher.h"
#include "kafka/client/producer.h"
#include "kafka/client/retry_with_mitigation.h"
#include "kafka/client/topic_cache.h"
#include "kafka/client/transport.h"
#include "kafka/client/types.h"
#include "kafka/client/utils.h"
#include "kafka/protocol/create_topics.h"
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/list_offsets.h"
Expand Down Expand Up @@ -88,16 +88,12 @@ class client {
/// \brief Invoke func, on failure, mitigate error and retry.
template<typename Func>
std::invoke_result_t<Func> gated_retry_with_mitigation(Func func) {
return ss::try_with_gate(_gate, [this, func{std::move(func)}]() {
return retry_with_mitigation(
_config.retries(),
_config.retry_base_backoff(),
[this, func{std::move(func)}]() {
_gate.check();
return func();
},
[this](std::exception_ptr ex) { return mitigate_error(ex); });
});
return gated_retry_with_mitigation_impl(
_gate,
_config.retries(),
_config.retry_base_backoff(),
std::move(func),
[this](std::exception_ptr ex) { return mitigate_error(ex); });
}

/// \brief Dispatch a request to any broker.
Expand Down
124 changes: 119 additions & 5 deletions src/v/kafka/client/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "kafka/client/configuration.h"
#include "kafka/client/exceptions.h"
#include "kafka/client/logger.h"
#include "kafka/client/utils.h"
#include "kafka/protocol/describe_groups.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
Expand Down Expand Up @@ -92,7 +93,8 @@ consumer::consumer(
shared_broker_t coordinator,
kafka::group_id group_id,
kafka::member_id name,
ss::noncopyable_function<void(const kafka::member_id&)> on_stopped)
ss::noncopyable_function<void(const kafka::member_id&)> on_stopped,
ss::noncopyable_function<ss::future<>(std::exception_ptr)> mitigater)
: _config(config)
, _topic_cache(topic_cache)
, _brokers(brokers)
Expand All @@ -104,7 +106,8 @@ consumer::consumer(
, _group_id(std::move(group_id))
, _name(std::move(name))
, _topics()
, _on_stopped(std::move(on_stopped)) {}
, _on_stopped(std::move(on_stopped))
, _external_mitigate(std::move(mitigater)) {}

void consumer::start() {
vlog(kclog.info, "Consumer: {}: start", *this);
Expand Down Expand Up @@ -287,7 +290,7 @@ ss::future<> consumer::sync() {
: ss::make_ready_future<std::vector<metadata_response::topic>>())
.then([this](std::vector<metadata_response::topic> topics) {
auto req_builder = [me{shared_from_this()},
topics{std::move(topics)}]() {
topics{std::move(topics)}]() mutable {
auto assignments
= me->is_leader()
? me->_plan->encode(me->_plan->plan(me->_members, topics))
Expand Down Expand Up @@ -475,22 +478,133 @@ ss::future<fetch_response> consumer::fetch(
detail::reduce_fetch_response);
}

template<typename request_factory>
ss::future<
typename std::invoke_result_t<request_factory>::api_type::response_type>
consumer::reset_coordinator_and_retry_request(request_factory req) {
return find_coordinator_with_retry_and_mitigation(
_gate,
_config,
_brokers,
group_id(),
name(),
[this](std::exception_ptr ex) { return _external_mitigate(ex); })
.then(
[this, req{std::move(req)}](shared_broker_t new_coordinator) mutable {
_coordinator = new_coordinator;
// Calling req_res here will re-issue the request on the
// new coordinator
return req_res(std::move(req));
});
}

template<typename request_factory, typename response_t>
ss::future<response_t>
consumer::maybe_process_response_errors(request_factory req, response_t res) {
auto me = shared_from_this();
// By default, look at the top-level for errors
switch (res.data.error_code) {
case error_code::not_coordinator:
vlog(
kclog.debug,
"Wrong coordinator on consumer {}, getting new coordinator "
"before retry",
*me);
return reset_coordinator_and_retry_request(std::move(req));
default:
// Return the whole response otherwise
return ss::make_ready_future<response_t>(std::move(res));
}
}

template<typename request_factory>
ss::future<metadata_response> consumer::maybe_process_response_errors(
request_factory req, metadata_response res) {
auto me = shared_from_this();
for (auto& topic : res.data.topics) {
switch (topic.error_code) {
case error_code::not_coordinator:
vlog(
kclog.debug,
"Wrong coordinator on consumer {}, topic {}, getting new "
"coordinator before retry",
*me,
topic.name);
return reset_coordinator_and_retry_request(std::move(req));
default:
continue;
}
}
// Return the whole response otherwise
return ss::make_ready_future<metadata_response>(std::move(res));
}

template<typename request_factory>
ss::future<offset_commit_response> consumer::maybe_process_response_errors(
request_factory req, offset_commit_response res) {
auto me = shared_from_this();
for (auto& topic : res.data.topics) {
for (auto& partition : topic.partitions) {
switch (partition.error_code) {
case error_code::not_coordinator:
vlog(
kclog.debug,
"Wrong coordinator on consumer {}, tp {}, getting new "
"coordinator before retry",
*me,
model::topic_partition{
topic.name,
model::partition_id{partition.partition_index}});
return reset_coordinator_and_retry_request(std::move(req));
default:
continue;
}
}
}
// Return the whole response otherwise
return ss::make_ready_future<offset_commit_response>(std::move(res));
}

template<typename request_factory>
ss::future<describe_groups_response> consumer::maybe_process_response_errors(
request_factory req, describe_groups_response res) {
auto me = shared_from_this();
for (auto& group : res.data.groups) {
switch (group.error_code) {
case error_code::not_coordinator:
vlog(
kclog.debug,
"Wrong coordinator on consumer {}, group {}, getting new "
"coordinator before retry",
*me,
group);
return reset_coordinator_and_retry_request(std::move(req));
default:
continue;
}
}
// Return the whole response otherwise
return ss::make_ready_future<describe_groups_response>(std::move(res));
}

ss::future<shared_consumer_t> make_consumer(
const configuration& config,
topic_cache& topic_cache,
brokers& brokers,
shared_broker_t coordinator,
group_id group_id,
member_id name,
ss::noncopyable_function<void(const member_id&)> on_stopped) {
ss::noncopyable_function<void(const member_id&)> on_stopped,
ss::noncopyable_function<ss::future<>(std::exception_ptr)> mitigater) {
auto c = ss::make_lw_shared<consumer>(
config,
topic_cache,
brokers,
std::move(coordinator),
std::move(group_id),
std::move(name),
std::move(on_stopped));
std::move(on_stopped),
std::move(mitigater));
return c->initialize().then([c]() mutable { return std::move(c); });
}

Expand Down
50 changes: 43 additions & 7 deletions src/v/kafka/client/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class consumer final : public ss::enable_lw_shared_from_this<consumer> {
shared_broker_t coordinator,
group_id group_id,
member_id name,
ss::noncopyable_function<void(const member_id&)> on_stopped);
ss::noncopyable_function<void(const member_id&)> on_stopped,
ss::noncopyable_function<ss::future<>(std::exception_ptr)> mitigater);

const kafka::group_id& group_id() const { return _group_id; }
const kafka::member_id& member_id() const { return _member_id; }
Expand Down Expand Up @@ -106,18 +107,50 @@ class consumer final : public ss::enable_lw_shared_from_this<consumer> {
req_res(RequestFactory req) {
using api_t = typename std::invoke_result_t<RequestFactory>::api_type;
using response_t = typename api_t::response_type;
return ss::try_with_gate(_gate, [this, req{std::move(req)}]() {
return ss::try_with_gate(_gate, [this, req{std::move(req)}]() mutable {
auto r = req();
kclog.debug("Consumer: {}: {} req: {}", *this, api_t::name, r);
kclog.debug(
"Consumer: {}: {} req: {}, coordinator {}",
*this,
api_t::name,
r,
_coordinator->id());
return _coordinator->dispatch(std::move(r))
.then([this](response_t res) {
.then([this, req{std::move(req)}](response_t res) mutable {
kclog.debug(
"Consumer: {}: {} res: {}", *this, api_t::name, res);
return res;
"Consumer: {}: {} res: {}, coordinator {}",
*this,
api_t::name,
res,
_coordinator->id());
return maybe_process_response_errors(
std::move(req), std::move(res));
});
});
}

// The base template for handling response errors
template<typename request_factory, typename response_t>
ss::future<response_t>
maybe_process_response_errors(request_factory req, response_t res);

// Some template specializations for handling response errors of specific
// response types
template<typename request_factory>
ss::future<metadata_response>
maybe_process_response_errors(request_factory req, metadata_response res);
template<typename request_factory>
ss::future<offset_commit_response> maybe_process_response_errors(
request_factory req, offset_commit_response res);
template<typename request_factory>
ss::future<describe_groups_response> maybe_process_response_errors(
request_factory req, describe_groups_response res);

template<typename request_factory>
ss::future<
typename std::invoke_result_t<request_factory>::api_type::response_type>
reset_coordinator_and_retry_request(request_factory req);

const configuration& _config;
topic_cache& _topic_cache;
brokers& _brokers;
Expand All @@ -139,6 +172,8 @@ class consumer final : public ss::enable_lw_shared_from_this<consumer> {
assignment_t _assignment{};
absl::node_hash_map<shared_broker_t, fetch_session> _fetch_sessions;
ss::noncopyable_function<void(const kafka::member_id&)> _on_stopped;
ss::noncopyable_function<ss::future<>(std::exception_ptr)>
_external_mitigate;

friend std::ostream& operator<<(std::ostream& os, const consumer& c) {
fmt::print(
Expand All @@ -160,7 +195,8 @@ ss::future<shared_consumer_t> make_consumer(
shared_broker_t coordinator,
group_id group_id,
member_id name,
ss::noncopyable_function<void(const member_id&)> _on_stopped);
ss::noncopyable_function<void(const member_id&)> _on_stopped,
ss::noncopyable_function<ss::future<>(std::exception_ptr)> mitigater);

namespace detail {

Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "kafka/client/configuration.h"
#include "kafka/client/exceptions.h"
#include "kafka/client/logger.h"
#include "kafka/client/retry_with_mitigation.h"
#include "kafka/client/utils.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/produce.h"
#include "model/fundamental.h"
Expand Down
Loading

0 comments on commit ae51a1b

Please sign in to comment.