diff --git a/admin.go b/admin.go index e3a81c266..f9c7ce54e 100644 --- a/admin.go +++ b/admin.go @@ -76,11 +76,17 @@ type ClusterAdmin interface { // The configs for a particular resource are updated automatically. IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error + // Creates an access control list (ACL) which is bound to a specific resource. + // This operation is not transactional so it may succeed or fail. + // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but + // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. + CreateACL(resource Resource, acl Acl) error + // Creates access control lists (ACLs) which are bound to specific resources. // This operation is not transactional so it may succeed for some ACLs while fail for others. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. - CreateACL(resource Resource, acl Acl) error + CreateACLs([]*ResourceAcls) error // Lists access control lists (ACLs) according to the supplied filter. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls @@ -799,6 +805,28 @@ func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { return err } +func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error { + var acls []*AclCreation + for _, resourceACL := range resourceACLs { + for _, acl := range resourceACL.Acls { + acls = append(acls, &AclCreation{resourceACL.Resource, *acl}) + } + } + request := &CreateAclsRequest{AclCreations: acls} + + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } + + b, err := ca.Controller() + if err != nil { + return err + } + + _, err = b.CreateAcls(request) + return err +} + func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) { request := &DescribeAclsRequest{AclFilter: filter} diff --git a/admin_test.go b/admin_test.go index 3191fff89..677e20a0e 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1086,6 +1086,50 @@ func TestClusterAdminCreateAcl(t *testing.T) { } } +func TestClusterAdminCreateAcls(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateAclsRequest": NewMockCreateAclsResponse(t), + }) + + config := NewTestConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + rACLs := []*ResourceAcls{ + &ResourceAcls{ + Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}, + Acls: []*Acl{ + &Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}, + }, + }, + &ResourceAcls{ + Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "your_topic"}, + Acls: []*Acl{ + &Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}, + }, + }, + } + + err = admin.CreateACLs(rACLs) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminListAcls(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close()