Skip to content

Commit

Permalink
Refactor Bucket Controller
Browse files Browse the repository at this point in the history
Fix bug in bucket provider interface

Added Bucket Provider Interface

Signed-off-by: pa250194 <pa250194@ncr.com>

Fix context timeout defer issue

Signed-off-by: pa250194 <pa250194@ncr.com>

Fix GCP storage provider test

Signed-off-by: pa250194 <pa250194@ncr.com>
  • Loading branch information
pa250194 committed Oct 21, 2021
1 parent 79c19ad commit 4105490
Show file tree
Hide file tree
Showing 7 changed files with 679 additions and 226 deletions.
270 changes: 65 additions & 205 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -48,9 +44,11 @@ import (
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/minio"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
)

// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -72,6 +70,14 @@ type BucketReconcilerOptions struct {
MaxConcurrentReconciles int
}

// BucketProvider is the interface implemented by object-storage backends
// (e.g. GCP Cloud Storage, Minio/S3-compatible services) that the bucket
// reconciler fetches artifacts from. Implementations are expected to honor
// cancellation/deadlines on the supplied context.
type BucketProvider interface {
	// BucketExists reports whether the named bucket is accessible.
	BucketExists(ctx context.Context, bucketName string) (bool, error)
	// ObjectExists reports whether the named object exists in the bucket.
	ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error)
	// FGetObject downloads the named object from the bucket into localPath.
	FGetObject(ctx context.Context, bucketName, objectName, localPath string) error
	// ListObjects downloads every object in the bucket that is not excluded
	// by the ignore matcher into tempDir.
	ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error
	// Close releases any resources held by the provider client.
	Close(ctx context.Context)
}

// SetupWithManager registers the reconciler with the given manager using
// the default BucketReconcilerOptions.
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
}
Expand Down Expand Up @@ -178,25 +184,25 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
var err error
var sourceBucket sourcev1.Bucket
tempDir, err := os.MkdirTemp("", bucket.Name)
if err != nil {
err = fmt.Errorf("tmp dir error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
}
defer os.RemoveAll(tempDir)
if bucket.Spec.Provider == sourcev1.GoogleBucketProvider {
sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir)
if err != nil {
return sourceBucket, err
}
} else {
sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir)
if err != nil {
return sourceBucket, err
}
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}

var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err)
}

if bucketResponse, err := registerBucketProviders(ctx, bucket, secret, tempDir); err != nil {
return bucketResponse, err
}

revision, err := r.checksum(tempDir)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
Expand Down Expand Up @@ -241,111 +247,41 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket
}

message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
os.RemoveAll(tempDir)
return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
}

// reconcileDelete handles a Bucket object that is being deleted: it
// garbage-collects the artifacts stored for the object, records the
// readiness condition, and removes the source finalizer so the API server
// can complete the deletion.
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
	if err := r.gc(bucket); err != nil {
		r.event(ctx, bucket, events.EventSeverityError,
			fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
		// Return the error so we retry the failed garbage collection
		return ctrl.Result{}, err
	}

	// Record deleted status
	r.recordReadiness(ctx, bucket)

	// Remove our finalizer from the list and update it
	controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
	if err := r.Update(ctx, &bucket); err != nil {
		return ctrl.Result{}, err
	}

	// Stop reconciliation as the object is being deleted
	return ctrl.Result{}, nil
}

// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket
// using a gcp client
func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
log := logr.FromContext(ctx)
gcpClient, err := r.authGCP(ctx, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
}
defer gcpClient.Close(log)

ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()

exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if !exists {
err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}

// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// In-spec patterns take precedence
if bucket.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil)
// download bucket content
for {
object, err := objects.Next()
if err == gcp.IteratorDone {
break
}
// registerBucketProviders selects a bucket provider that implements the bucket provider interface
// based on the specified provider in the bucket spec.
func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) {
switch bucket.Spec.Provider {
case sourcev1.GoogleBucketProvider:
gcpClient, err := gcp.NewClient(ctx, secret, bucket)
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}

if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
continue
if bucketResponse, err := reconcileAll(ctx, gcpClient, bucket, tempDir); err != nil {
return bucketResponse, err
}

if matcher.Match(strings.Split(object.Name, "/"), false) {
continue
default:
minioClient, err := minio.NewClient(ctx, secret, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}

localPath := filepath.Join(tempDir, object.Name)
if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
if bucketResponse, err := reconcileAll(ctx, minioClient, bucket, tempDir); err != nil {
return bucketResponse, err
}
}
return sourcev1.Bucket{}, nil
}

// reconcileWithMinio handles getting objects from an S3 compatible bucket
// using a minio client
func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
s3Client, err := r.authMinio(ctx, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
}

func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()

exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
defer client.Close(ctx)
exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
Expand All @@ -354,12 +290,10 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}

// Look for file with ignore rules first
// NB: S3 has flat filepath keys making it impossible to look
// for files in "subdirectories" without building up a tree first.
// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
Expand All @@ -372,107 +306,33 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)

// download bucket content
for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
}) {
if object.Err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}

if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
continue
}

if matcher.Match(strings.Split(object.Key, "/"), false) {
continue
}

localPath := filepath.Join(tempDir, object.Key)
err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir)
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
return sourcev1.Bucket{}, nil
}

// authGCP creates a new Google Cloud Platform storage client
// to interact with the storage service.
func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) {
var client *gcp.GCPClient
var err error
if bucket.Spec.SecretRef != nil {
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}

var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return nil, fmt.Errorf("credentials secret error: %w", err)
}
if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
return nil, err
}
client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
if err != nil {
return nil, err
}
} else {
client, err = gcp.NewClient(ctx)
if err != nil {
return nil, err
}
}
return client, nil

}

// authMinio creates a new Minio client to interact with S3
// compatible storage services.
func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
opt := minio.Options{
Region: bucket.Spec.Region,
Secure: !bucket.Spec.Insecure,
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
if err := r.gc(bucket); err != nil {
r.event(ctx, bucket, events.EventSeverityError,
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}

if bucket.Spec.SecretRef != nil {
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}

var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return nil, fmt.Errorf("credentials secret error: %w", err)
}

accesskey := ""
secretkey := ""
if k, ok := secret.Data["accesskey"]; ok {
accesskey = string(k)
}
if k, ok := secret.Data["secretkey"]; ok {
secretkey = string(k)
}
if accesskey == "" || secretkey == "" {
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
}
opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
opt.Creds = credentials.NewIAM("")
}
// Record deleted status
r.recordReadiness(ctx, bucket)

if opt.Creds == nil {
return nil, fmt.Errorf("no bucket credentials found")
// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &bucket); err != nil {
return ctrl.Result{}, err
}

return minio.New(bucket.Spec.Endpoint, &opt)
// Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil
}

// checksum calculates the SHA1 checksum of the given root directory.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/go-git/go-git/v5 v5.4.2
github.com/go-logr/logr v0.4.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/libgit2/git2go/v31 v31.6.1
github.com/minio/minio-go/v7 v7.0.10
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0 h1:6DWmvNpomjL1+3liNSZbVns3zsYzzCjm6pRBO1tLeso=
Expand Down
Loading

0 comments on commit 4105490

Please sign in to comment.