Skip to content

Commit

Permalink
Merge pull request redpanda-data#6969 from RafalKorepta/rk/fix/k8s-co…
Browse files Browse the repository at this point in the history
…nsole-error-handling

k8s: Add context to error messages
  • Loading branch information
RafalKorepta committed Nov 29, 2022
2 parents 5e37ccc + f0fa502 commit 5816930
Showing 1 changed file with 26 additions and 30 deletions.
56 changes: 26 additions & 30 deletions src/go/k8s/pkg/console/sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,16 @@ func NewKafkaACL(

// Ensure implements Resource interface
func (k *KafkaACL) Ensure(ctx context.Context) error {
// Build ACL for console SASL user to access everything
b := kadm.NewACLs().
Allow(k.superUsersResource.GetUsername()).
Topics("*").Groups("*").Clusters().Operations(kadm.OpAll).
ResourcePatternType(kadm.ACLPatternLiteral)
if err := b.ValidateCreate(); err != nil {
return fmt.Errorf("validating create ACLs: %w", err)
}
b.PrefixUserExcept()

kadmclient, err := k.kafkaAdmin(ctx, k.Client, k.clusterobj, k.store)
kadmclient, b, err := k.createAdminClient(ctx)
if err != nil {
return fmt.Errorf("creating kafka admin client: %w", err)
return fmt.Errorf("creating console sasl user: %w", err)
}

results, err := kadmclient.CreateACLs(ctx, b)
var errList []error
if err != nil {
return fmt.Errorf("creating kafka ACLs: %w", err)
errList = append(errList, err)
}
// CreateACLs returns no error, check results
var errList []error
for _, r := range results {
if r.Err != nil {
errList = append(errList, r.Err)
Expand Down Expand Up @@ -243,27 +232,16 @@ func (k *KafkaACL) Cleanup(ctx context.Context) error {
return nil
}

// Build ACL for console SASL user to access everything
b := kadm.NewACLs().
Allow(k.superUsersResource.GetUsername()).AllowHosts().
Topics("*").Groups("*").Clusters().Operations(kadm.OpAll).
ResourcePatternType(kadm.ACLPatternLiteral)
if err := b.ValidateCreate(); err != nil {
return fmt.Errorf("validating create ACLs: %w", err)
}
b.PrefixUserExcept()

kadmclient, err := k.kafkaAdmin(ctx, k.Client, k.clusterobj, k.store)
kadmclient, b, err := k.createAdminClient(ctx)
if err != nil {
return fmt.Errorf("creating kafka admin client: %w", err)
return fmt.Errorf("cleaning console sasl user: %w", err)
}

results, err := kadmclient.DeleteACLs(ctx, b)
var errList []error
if err != nil {
return fmt.Errorf("deleting kafka ACLs: %w", err)
errList = append(errList, err)
}
// DeleteACLs returns no error, check results
var errList []error
for _, r := range results {
if r.Err != nil {
errList = append(errList, r.Err)
Expand All @@ -276,3 +254,21 @@ func (k *KafkaACL) Cleanup(ctx context.Context) error {
controllerutil.RemoveFinalizer(k.consoleobj, ConsoleACLFinalizer)
return k.Update(ctx, k.consoleobj)
}

func (k *KafkaACL) createAdminClient(ctx context.Context) (KafkaAdminClient, *kadm.ACLBuilder, error) {
// Build ACL for console SASL user to access everything
b := kadm.NewACLs().
Allow(k.superUsersResource.GetUsername()).AllowHosts().
Topics("*").Groups("*").Clusters().Operations(kadm.OpAll).
ResourcePatternType(kadm.ACLPatternLiteral)
if err := b.ValidateCreate(); err != nil {
return nil, nil, fmt.Errorf("validating ACLs: %w", err)
}
b.PrefixUserExcept()

kadmclient, err := k.kafkaAdmin(ctx, k.Client, k.clusterobj, k.store)
if err != nil {
return nil, nil, fmt.Errorf("creating kafka admin client: %w", err)
}
return kadmclient, b, nil
}

0 comments on commit 5816930

Please sign in to comment.