Skip to content

Commit

Permalink
kafka: fixed: find tran coordinator was not ACL verified
Browse files Browse the repository at this point in the history
The code to handle FindCoordinator request for transaction coordinator type
appeared before the caller is checked for authorization for this operation
against the ACL. Now the chech has been moved before any other handling.
  • Loading branch information
dlex committed Jul 6, 2022
1 parent 547af6f commit 9b7174a
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/v/kafka/server/handlers/find_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ ss::future<response_ptr> find_coordinator_handler::handle(
find_coordinator_request request;
request.decode(ctx.reader(), ctx.header().version);

if (request.data.key_type == coordinator_type::group) {
if (!ctx.authorized(
security::acl_operation::describe, group_id(request.data.key))) {
return ctx.respond(find_coordinator_response(
error_code::group_authorization_failed));
}
} else if (request.data.key_type == coordinator_type::transaction) {
if (!ctx.authorized(
security::acl_operation::describe,
transactional_id(request.data.key))) {
return ctx.respond(find_coordinator_response(
error_code::transactional_id_authorization_failed));
}
}

if (request.data.key_type == coordinator_type::transaction) {
if (!ctx.are_transactions_enabled()) {
return ctx.respond(
Expand All @@ -98,21 +113,6 @@ ss::future<response_ptr> find_coordinator_handler::handle(
find_coordinator_response(error_code::unsupported_version));
}

if (request.data.key_type == coordinator_type::group) {
if (!ctx.authorized(
security::acl_operation::describe, group_id(request.data.key))) {
return ctx.respond(find_coordinator_response(
error_code::group_authorization_failed));
}
} else if (request.data.key_type == coordinator_type::transaction) {
if (!ctx.authorized(
security::acl_operation::describe,
transactional_id(request.data.key))) {
return ctx.respond(find_coordinator_response(
error_code::transactional_id_authorization_failed));
}
}

return ss::do_with(
std::move(ctx),
[request = std::move(request)](request_context& ctx) mutable {
Expand Down

0 comments on commit 9b7174a

Please sign in to comment.