diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 002c95c68..fc953b3ba 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -26,10 +26,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/s3utils" - "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,9 +44,11 @@ import ( "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/minio" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" ) // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete @@ -72,6 +70,14 @@ type BucketReconcilerOptions struct { MaxConcurrentReconciles int } +type BucketProvider interface { + BucketExists(context.Context, string) (bool, error) + ObjectExists(context.Context, string, string) (bool, error) + FGetObject(context.Context, string, string, string) error + ListObjects(context.Context, gitignore.Matcher, string, string) error + Close(context.Context) +} + func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) } @@ -178,25 +184,25 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) { - var err error - var sourceBucket sourcev1.Bucket tempDir, err := os.MkdirTemp("", bucket.Name) if err != nil { err = fmt.Errorf("tmp dir error: %w", err) return sourcev1.BucketNotReady(bucket, 
sourcev1.StorageOperationFailedReason, err.Error()), err } - defer os.RemoveAll(tempDir) - if bucket.Spec.Provider == sourcev1.GoogleBucketProvider { - sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir) - if err != nil { - return sourceBucket, err - } - } else { - sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir) - if err != nil { - return sourceBucket, err - } + secretName := types.NamespacedName{ + Namespace: bucket.GetNamespace(), + Name: bucket.Spec.SecretRef.Name, + } + + var secret corev1.Secret + if err := r.Get(ctx, secretName, &secret); err != nil { + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err) + } + + if bucketResponse, err := registerBucketProviders(ctx, bucket, secret, tempDir); err != nil { + return bucketResponse, err } + revision, err := r.checksum(tempDir) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err @@ -241,111 +247,41 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket } message := fmt.Sprintf("Fetched revision: %s", artifact.Revision) + os.RemoveAll(tempDir) return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil } -func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { - if err := r.gc(bucket); err != nil { - r.event(ctx, bucket, events.EventSeverityError, - fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) - // Return the error so we retry the failed garbage collection - return ctrl.Result{}, err - } - - // Record deleted status - r.recordReadiness(ctx, bucket) - - // Remove our finalizer from the list and update it - controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Update(ctx, &bucket); err != nil { - return ctrl.Result{}, err - } - - // Stop reconciliation as the 
object is being deleted - return ctrl.Result{}, nil -} - -// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket -// using a gcp client -func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - log := logr.FromContext(ctx) - gcpClient, err := r.authGCP(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - defer gcpClient.Close(log) - - ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) - defer cancel() - - exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - if !exists { - err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - - // Look for file with ignore rules first. - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - // In-spec patterns take precedence - if bucket.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) 
- } - matcher := sourceignore.NewMatcher(ps) - objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil) - // download bucket content - for { - object, err := objects.Next() - if err == gcp.IteratorDone { - break - } +// registerBucketProviders selects a bucket provider that implements the bucket provider interface based +// on the specified provider in the bucket spec. +func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) { + switch bucket.Spec.Provider { + case sourcev1.GoogleBucketProvider: + gcpClient, err := gcp.NewClient(ctx, secret, bucket) if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + err = fmt.Errorf("auth error: %w", err) + return sourcev1.Bucket{}, err } - - if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { - continue + if bucketResponse, err := reconcileAll(ctx, gcpClient, bucket, tempDir); err != nil { + return bucketResponse, err } - - if matcher.Match(strings.Split(object.Name, "/"), false) { - continue + default: + minioClient, err := minio.NewClient(ctx, secret, bucket) + if err != nil { + err = fmt.Errorf("auth error: %w", err) + return sourcev1.Bucket{}, err } - - localPath := filepath.Join(tempDir, object.Name) - if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + if bucketResponse, err := reconcileAll(ctx, minioClient, bucket, tempDir); err != nil { + return bucketResponse, err } } return sourcev1.Bucket{}, nil } -// reconcileWithMinio handles getting objects from an S3 compatible bucket -// using a minio 
client -func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - s3Client, err := r.authMinio(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - +func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) defer cancel() - - exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName) + defer client.Close(ctx) + exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -354,12 +290,10 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - // Look for file with ignore rules first - // NB: S3 has flat filepath keys making it impossible to look - // for files in "subdirectories" without building up a tree first. + // Look for file with ignore rules first. 
path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { - if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { + if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { + if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } @@ -372,107 +306,33 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) } matcher := sourceignore.NewMatcher(ps) - - // download bucket content - for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{ - Recursive: true, - UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), - }) { - if object.Err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - localPath := filepath.Join(tempDir, object.Key) - err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{}) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } + err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir) + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", 
bucket.Spec.BucketName, err) + return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } return sourcev1.Bucket{}, nil } -// authGCP creates a new Google Cloud Platform storage client -// to interact with the storage service. -func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) { - var client *gcp.GCPClient - var err error - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil { - return nil, err - } - client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - } else { - client, err = gcp.NewClient(ctx) - if err != nil { - return nil, err - } - } - return client, nil - -} - -// authMinio creates a new Minio client to interact with S3 -// compatible storage services. 
-func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) { - opt := minio.Options{ - Region: bucket.Spec.Region, - Secure: !bucket.Spec.Insecure, +func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { + if err := r.gc(bucket); err != nil { + r.event(ctx, bucket, events.EventSeverityError, + fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) + // Return the error so we retry the failed garbage collection + return ctrl.Result{}, err } - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - - accesskey := "" - secretkey := "" - if k, ok := secret.Data["accesskey"]; ok { - accesskey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretkey = string(k) - } - if accesskey == "" || secretkey == "" { - return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") - } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { - opt.Creds = credentials.NewIAM("") - } + // Record deleted status + r.recordReadiness(ctx, bucket) - if opt.Creds == nil { - return nil, fmt.Errorf("no bucket credentials found") + // Remove our finalizer from the list and update it + controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) + if err := r.Update(ctx, &bucket); err != nil { + return ctrl.Result{}, err } - return minio.New(bucket.Spec.Endpoint, &opt) + // Stop reconciliation as the object is being deleted + return ctrl.Result{}, nil } // checksum calculates the SHA1 checksum of the given root directory. 
diff --git a/go.mod b/go.mod index 8fabe102e..95bd895fd 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/go-git/go-git/v5 v5.4.2 github.com/go-logr/logr v0.4.0 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/libgit2/git2go/v31 v31.6.1 github.com/minio/minio-go/v7 v7.0.10 diff --git a/go.sum b/go.sum index 34c48c50a..ef30708b8 100644 --- a/go.sum +++ b/go.sum @@ -487,6 +487,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0 h1:6DWmvNpomjL1+3liNSZbVns3zsYzzCjm6pRBO1tLeso= diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 9127fcde3..bcf342190 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -23,11 +23,17 @@ import ( "io" "os" "path/filepath" + "strings" + "sync" gcpstorage "cloud.google.com/go/storage" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/go-logr/logr" "google.golang.org/api/iterator" "google.golang.org/api/option" + corev1 "k8s.io/api/core/v1" ) var ( @@ -50,13 +56,26 @@ type GCPClient struct { // NewClient creates a new GCP storage client. 
The Client will automatically look for the Google Application // Credential environment variable or look for the Google Application Credential file. -func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) { - client, err := gcpstorage.NewClient(ctx, opts...) - if err != nil { - return nil, err +func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*GCPClient, error) { + gcpclient := &GCPClient{} + if bucket.Spec.SecretRef != nil { + if err := ValidateSecret(secret.Data, secret.Name); err != nil { + return nil, err + } + client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) + if err != nil { + return nil, err + } + gcpclient.Client = client + } else { + client, err := gcpstorage.NewClient(ctx) + if err != nil { + return nil, err + } + gcpclient.Client = client } - return &GCPClient{Client: client}, nil + return gcpclient, nil } // ValidateSecret validates the credential secrets @@ -158,15 +177,48 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca // ListObjects lists the objects/contents of the bucket whose bucket name is provided. // the objects are returned as an Objectiterator and .Next() has to be called on them -// to loop through the Objects. -func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator { - items := c.Client.Bucket(bucketName).Objects(ctx, query) - return items +// to loop through the Objects. The objects are downloaded concurrently using goroutines. 
+func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { + log := logr.FromContext(ctx) + items := c.Client.Bucket(bucketName).Objects(ctx, nil) + var wg sync.WaitGroup + for { + object, err := items.Next() + if err == IteratorDone { + break + } + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) + return err + } + wg.Add(1) + go func() { + defer wg.Done() + if err := DownloadObject(ctx, c, object, matcher, bucketName, tempDir); err != nil { + log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) + } + }() + } + wg.Wait() + return nil } // Close closes the GCP Client and logs any useful errors -func (c *GCPClient) Close(log logr.Logger) { +func (c *GCPClient) Close(ctx context.Context) { + log := logr.FromContext(ctx) if err := c.Client.Close(); err != nil { log.Error(err, "GCP Provider") } } + +// DownloadObject gets an object and downloads the object locally. 
+func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error { + if strings.HasSuffix(obj.Name, "/") || obj.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(obj.Name, "/"), false) { + return nil + } + localPath := filepath.Join(tempDir, obj.Name) + if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil { + return err + } + return nil +} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 99d72309f..fac237dbd 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -33,10 +33,15 @@ import ( "time" gcpstorage "cloud.google.com/go/storage" + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/sourceignore" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "google.golang.org/api/option" ) @@ -44,6 +49,7 @@ import ( const ( bucketName string = "test-bucket" objectName string = "test.yaml" + region = "us-east-1" ) var ( @@ -51,6 +57,55 @@ var ( client *gcpstorage.Client close func() err error + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "serviceaccount": 
[]byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="), + }, + Type: "Opaque", + } + badSecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("test-user"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + 
Provider: "gcp", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + Provider: "gcp", + Insecure: true, + }, + } ) func TestMain(m *testing.M) { @@ -110,8 +165,16 @@ func TestMain(m *testing.M) { os.Exit(run) } -func TestNewClient(t *testing.T) { - gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc)) +func TestNewClientWithSecretErr(t *testing.T) { + gcpClient, err := gcp.NewClient(context.Background(), secret, bucket) + t.Log(err) + assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") + assert.Assert(t, gcpClient == nil) +} + +func TestNewClientWithoutSecretErr(t *testing.T) { + gcpClient, err := gcp.NewClient(context.Background(), badSecret, bucketNoSecretRef) + t.Log(err) assert.NilError(t, err) assert.Assert(t, gcpClient != nil) } @@ -161,15 +224,33 @@ func TestListObjects(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } - objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil) - for { - _, err := objectIterator.Next() - if err == gcp.IteratorDone { - break - } - assert.NilError(t, err) + tempDir, err := os.MkdirTemp("", bucketName) + defer os.RemoveAll(tempDir) + assert.NilError(t, err) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + + err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir) + assert.NilError(t, err) +} + +func TestListObjectsErr(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, } - assert.Assert(t, objectIterator != nil) + badBucketName := "bad-bucket" + tempDir, err := os.MkdirTemp("", badBucketName) + 
defer os.RemoveAll(tempDir) + assert.NilError(t, err) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + + err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } func TestFGetObject(t *testing.T) { @@ -203,8 +284,8 @@ func TestFGetObjectNotExists(t *testing.T) { func TestFGetObjectDirectoryIsFileName(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) assert.NilError(t, err) + defer os.RemoveAll(tempDir) gcpClient := &gcp.GCPClient{ Client: client, } @@ -214,6 +295,66 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { } } +func TestDownloadObject(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, + } + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ + Bucket: bucketName, + Name: objectName, + ContentType: "text/x-yaml", + Size: 1 << 20, + }, matcher, bucketName, tempDir) + assert.NilError(t, err) +} + +func TestDownloadObjectErr(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, + } + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ + Bucket: bucketName, + Name: "test1.yaml", + ContentType: 
"text/x-yaml", + Size: 1 << 20, + }, matcher, bucketName, tempDir) + assert.Error(t, err, "storage: object doesn't exist") +} + +func TestDownloadObjectSuffix(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, + } + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ + Bucket: bucketName, + Name: "test1/", + ContentType: "text/x-yaml", + Size: 1 << 20, + }, matcher, bucketName, tempDir) + assert.NilError(t, err) +} + func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go new file mode 100644 index 000000000..4dd221daa --- /dev/null +++ b/pkg/minio/minio.go @@ -0,0 +1,127 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package minio + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/s3utils" + corev1 "k8s.io/api/core/v1" +) + +type MinioClient struct { + // client for interacting with S3 compatible + // Storage APIs. + *minio.Client +} + +// NewClient creates a new Minio storage client. +func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*MinioClient, error) { + opt := minio.Options{ + Region: bucket.Spec.Region, + Secure: !bucket.Spec.Insecure, + } + + if bucket.Spec.SecretRef != nil { + accesskey := "" + secretkey := "" + if k, ok := secret.Data["accesskey"]; ok { + accesskey = string(k) + } + if k, ok := secret.Data["secretkey"]; ok { + secretkey = string(k) + } + if accesskey == "" || secretkey == "" { + return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + } + opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") + } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { + opt.Creds = credentials.NewIAM("") + } + + if opt.Creds == nil { + return nil, fmt.Errorf("no bucket credentials found") + } + + client, err := minio.New(bucket.Spec.Endpoint, &opt) + if err != nil { + return nil, err + } + + return &MinioClient{Client: client}, nil +} + +// BucketExists checks if the bucket with the provided name exists. +func (c *MinioClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { + return c.Client.BucketExists(ctx, bucketName) +} + +// ObjectExists checks if the object with the provided name exists. 
+func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { + _, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{}) + if err != nil { + return false, err + } + return true, nil +} + +// FGetObject gets the object from the bucket and downloads the object locally. +func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error { + return c.Client.FGetObject(ctx, bucketName, objectName, localPath, minio.GetObjectOptions{}) +} + +// ListObjects lists all the objects in a bucket and downloads the objects. +func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { + for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), + }) { + if object.Err != nil { + err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err) + return err + } + + if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { + continue + } + + if matcher.Match(strings.Split(object.Key, "/"), false) { + continue + } + + localPath := filepath.Join(tempDir, object.Key) + err := c.FGetObject(ctx, bucketName, object.Key, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err) + return err + } + } + return nil +} + +// Close closes the Minio Client and logs any useful errors +func (c *MinioClient) Close(ctx context.Context) { + //minio client does not provide a close method +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go new file mode 100644 index 000000000..6d8143971 --- /dev/null +++ b/pkg/minio/minio_test.go @@ -0,0 +1,270 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package minio_test + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/minio" + "github.com/fluxcd/source-controller/pkg/sourceignore" + + "github.com/google/uuid" + miniov7 "github.com/minio/minio-go/v7" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + objectName string = "test.yaml" + region string = "us-east-1" +) + +var ( + minioclient *minio.MinioClient + bucketName = "test-bucket-minio" + uuid.New().String() + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("Q3AM3UQ867SPQQA43P2F"), + "secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + emptySecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{}, + Type: "Opaque", + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + 
BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + }, + } + bucketAwsProvider = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "aws", + Insecure: true, + }, + } +) + +func TestMain(m *testing.M) { + var err error + ctx := context.Background() + minioclient, err = minio.NewClient(ctx, secret, bucket) + if err != nil { + log.Fatal(err) + } + createBucket(ctx) + addObjectToBucket(ctx) + run := m.Run() + removeObjectFromBucket(ctx) + deleteBucket(ctx) + //minioclient.Client.Close + os.Exit(run) +} + +func TestNewClient(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, secret, bucket) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientEmptySecret(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, emptySecret, bucket) + assert.Error(t, err, fmt.Sprintf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", emptySecret.Name)) + assert.Assert(t, minioClient == nil) +} + +func TestNewClientNoSecretRef(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketNoSecretRef) + assert.Error(t, err, "no bucket credentials found") + assert.Assert(t, minioClient == nil) +} + +func TestNewClientAwsProvider(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketAwsProvider) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestBucketExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.BucketExists(ctx, bucketName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestBucketNotExists(t *testing.T) { + ctx := context.Background() + exists, err := 
	minioclient.BucketExists(ctx, "notexistsbucket")
	assert.NilError(t, err)
	assert.Assert(t, !exists)
}

// TestObjectExists verifies the fixture object uploaded in TestMain is found.
func TestObjectExists(t *testing.T) {
	ctx := context.Background()
	exists, err := minioclient.ObjectExists(ctx, bucketName, objectName)
	assert.NilError(t, err)
	assert.Assert(t, exists)
}

// TestObjectNotExists expects a missing key to surface the server's
// "key does not exist" error together with exists == false.
func TestObjectNotExists(t *testing.T) {
	ctx := context.Background()
	exists, err := minioclient.ObjectExists(ctx, bucketName, "notexists.yaml")
	assert.Error(t, err, "The specified key does not exist.")
	assert.Assert(t, !exists)
}

// TestFGetObject downloads the fixture object into a temporary directory.
func TestFGetObject(t *testing.T) {
	ctx := context.Background()
	tempDir, err := os.MkdirTemp("", bucketName)
	assert.NilError(t, err)
	defer os.RemoveAll(tempDir)
	path := filepath.Join(tempDir, sourceignore.IgnoreFile)
	err = minioclient.FGetObject(ctx, bucketName, objectName, path)
	assert.NilError(t, err)
}

// TestListObjects walks the bucket with an ignore matcher built from an
// ignore file path in an empty temp dir.
// NOTE(review): the ignore file at 'path' does not exist when it is read;
// ReadIgnoreFile apparently tolerates a missing file — confirm.
func TestListObjects(t *testing.T) {
	ctx := context.Background()
	tempDir, err := os.MkdirTemp("", bucketName)
	assert.NilError(t, err)
	defer os.RemoveAll(tempDir)
	path := filepath.Join(tempDir, sourceignore.IgnoreFile)
	ps, err := sourceignore.ReadIgnoreFile(path, nil)
	assert.NilError(t, err)
	matcher := sourceignore.NewMatcher(ps)
	err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir)
	assert.NilError(t, err)
}

// TestListObjectsErr asserts the exact wrapped error message produced when
// listing a bucket that does not exist.
func TestListObjectsErr(t *testing.T) {
	ctx := context.Background()
	badBucketName := "bad-bucket"
	tempDir, err := os.MkdirTemp("", bucketName)
	assert.NilError(t, err)
	defer os.RemoveAll(tempDir)
	path := filepath.Join(tempDir, sourceignore.IgnoreFile)
	ps, err := sourceignore.ReadIgnoreFile(path, nil)
	assert.NilError(t, err)
	matcher := sourceignore.NewMatcher(ps)
	err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir)
	assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
}

// createBucket makes the test bucket; if creation fails but the bucket turns
// out to already exist, the stale bucket is deleted. Other failures are fatal.
func createBucket(ctx context.Context) {
	if err := minioclient.Client.MakeBucket(ctx,
bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil {
		exists, errBucketExists := minioclient.BucketExists(ctx, bucketName)
		if errBucketExists == nil && exists {
			// Leftover bucket from an earlier failed run: remove it.
			deleteBucket(ctx)
		} else {
			log.Fatalln(err)
		}
	}
}

// deleteBucket removes the test bucket; failures are logged, not fatal.
func deleteBucket(ctx context.Context) {
	if err := minioclient.Client.RemoveBucket(ctx, bucketName); err != nil {
		log.Println(err)
	}
}

// addObjectToBucket uploads the fixture YAML manifest under objectName.
func addObjectToBucket(ctx context.Context) {
	fileReader := strings.NewReader(getObjectFile())
	fileSize := fileReader.Size()
	_, err := minioclient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{
		ContentType: "text/x-yaml",
	})
	if err != nil {
		log.Println(err)
	}
}

// removeObjectFromBucket deletes the fixture object, bypassing any governance
// retention so teardown always succeeds.
func removeObjectFromBucket(ctx context.Context) {
	if err := minioclient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{
		GovernanceBypass: true,
	}); err != nil {
		log.Println(err)
	}
}

// getObjectFile returns the YAML manifest used as the fixture object's body.
func getObjectFile() string {
	return `
	apiVersion: source.toolkit.fluxcd.io/v1beta1
	kind: Bucket
	metadata:
	  name: podinfo
	  namespace: default
	spec:
	  interval: 5m
	  provider: aws
	  bucketName: podinfo
	  endpoint: s3.amazonaws.com
	  region: us-east-1
	  timeout: 30s
	`
}