Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CreateAcls Admin API support #839

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-211:
Expand All @@ -145,7 +148,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-222:
Expand All @@ -161,7 +167,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-231:
Expand All @@ -177,7 +186,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-241:
Expand All @@ -204,7 +216,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-260:
Expand All @@ -231,7 +246,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-271:
Expand All @@ -258,7 +276,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

workflows:
Expand Down
49 changes: 49 additions & 0 deletions createacl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientCreateACLs(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.0.1") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-topic-for-alice",
Host: "*",
},
{
Principal: "User:bob",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-group-for-bob",
Host: "*",
},
},
})
if err != nil {
t.Fatal(err)
}

for _, err := range res.Errors {
if err != nil {
t.Error(err)
}
}
}
108 changes: 108 additions & 0 deletions createacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/createacls"
)

// CreateACLsRequest represents a request sent to a kafka broker to add
// new ACLs.
type CreateACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of ACL to create.
ACLs []ACLEntry
}

// CreateACLsResponse represents a response from a kafka broker to an ACL
// creation request.
type CreateACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of errors that occurred while attempting to create
// the ACLs.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Errors []error
}

type ACLPermissionType int8

const (
ACLPermissionTypeUnknown ACLPermissionType = 0
ACLPermissionTypeAny ACLPermissionType = 1
ACLPermissionTypeDeny ACLPermissionType = 2
ACLPermissionTypeAllow ACLPermissionType = 3
)

type ACLOperationType int8

const (
ACLOperationTypeUnknown ACLOperationType = 0
ACLOperationTypeAny ACLOperationType = 1
ACLOperationTypeAll ACLOperationType = 2
ACLOperationTypeRead ACLOperationType = 3
ACLOperationTypeWrite ACLOperationType = 4
ACLOperationTypeCreate ACLOperationType = 5
ACLOperationTypeDelete ACLOperationType = 6
ACLOperationTypeAlter ACLOperationType = 7
ACLOperationTypeDescribe ACLOperationType = 8
ACLOperationTypeClusterAction ACLOperationType = 9
ACLOperationTypeDescribeConfigs ACLOperationType = 10
ACLOperationTypeAlterConfigs ACLOperationType = 11
ACLOperationTypeIdempotentWrite ACLOperationType = 12
)

type ACLEntry struct {
ResourceType ResourceType
ResourceName string
ResourcePatternType PatternType
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// CreateACLs sends ACLs creation request to a kafka broker and returns the
// response.
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
acls := make([]createacls.RequestACLs, 0, len(req.ACLs))

for _, acl := range req.ACLs {
acls = append(acls, createacls.RequestACLs{
ResourceType: int8(acl.ResourceType),
ResourceName: acl.ResourceName,
ResourcePatternType: int8(acl.ResourcePatternType),
Principal: acl.Principal,
Host: acl.Host,
Operation: int8(acl.Operation),
PermissionType: int8(acl.PermissionType),
})
}

m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
Creations: acls,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
}

res := m.(*createacls.Response)
ret := &CreateACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Errors: make([]error, 0, len(res.Results)),
}

for _, t := range res.Results {
ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
}

return ret, nil
}
10 changes: 0 additions & 10 deletions describeconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ type DescribeConfigsRequest struct {
IncludeDocumentation bool
}

type ResourceType int8

const (
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
ResourceTypeUnknown ResourceType = 0
ResourceTypeTopic ResourceType = 2
ResourceTypeBroker ResourceType = 4
)

type DescribeConfigRequestResource struct {
// Resource Type
ResourceType ResourceType
Expand Down Expand Up @@ -122,7 +113,6 @@ func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsReques
IncludeSynonyms: req.IncludeSynonyms,
IncludeDocumentation: req.IncludeDocumentation,
})

if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ services:
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
zirkome marked this conversation as resolved.
Show resolved Hide resolved
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
Expand Down
49 changes: 49 additions & 0 deletions protocol/createacls/createacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package createacls

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`

Creations []RequestACLs `kafka:"min=v0,max=v2"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
return cluster.Brokers[cluster.Controller], nil
}

type RequestACLs struct {
ResourceType int8 `kafka:"min=v0,max=v2"`
ResourceName string `kafka:"min=v0,max=v2"`
ResourcePatternType int8 `kafka:"min=v0,max=v2"`
Principal string `kafka:"min=v0,max=v2"`
Host string `kafka:"min=v0,max=v2"`
Operation int8 `kafka:"min=v0,max=v2"`
PermissionType int8 `kafka:"min=v0,max=v2"`
}

type Response struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
Results []ResponseACLs `kafka:"min=v0,max=v2"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls }

type ResponseACLs struct {
ErrorCode int16 `kafka:"min=v0,max=v2"`
ErrorMessage string `kafka:"min=v0,max=v2,nullable"`
}

var _ protocol.BrokerMessage = (*Request)(nil)
37 changes: 37 additions & 0 deletions resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kafka

// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
type ResourceType int8

const (
ResourceTypeUnknown ResourceType = 0
ResourceTypeAny ResourceType = 1
ResourceTypeTopic ResourceType = 2
ResourceTypeGroup ResourceType = 3
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
ResourceTypeBroker ResourceType = 4
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this should have been under a ConfigResourceType type but instead decided to keep it for backward compatibility.

ResourceTypeCluster ResourceType = 4
ResourceTypeTransactionalID ResourceType = 5
ResourceTypeDelegationToken ResourceType = 6
)

// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
type PatternType int8

const (
// PatternTypeUnknown represents any PatternType which this client cannot
// understand.
PatternTypeUnknown PatternType = 0
// PatternTypeAny matches any resource pattern type.
PatternTypeAny PatternType = 1
// PatternTypeMatch perform pattern matching.
PatternTypeMatch PatternType = 2
// PatternTypeLiteral represents a literal name.
// A literal name defines the full name of a resource, e.g. topic with name
// 'foo', or group with name 'bob'.
PatternTypeLiteral PatternType = 3
// PatternTypePrefixed represents a prefixed name.
// A prefixed name defines a prefix for a resource, e.g. topics with names
// that start with 'foo'.
PatternTypePrefixed PatternType = 4
)