diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 27ac62e63..0c12c62d9 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -49,6 +50,18 @@ import ( "github.com/fluxcd/source-controller/pkg/sourceignore" ) +// maxConcurrentFetches is the upper bound on the goroutines used to +// fetch bucket objects. It's important to have a bound, to avoid +// using arbitrary amounts of memory; the actual number is chosen +// according to the queueing rule of thumb with some conservative +// parameters: +// s > Nr / T +// N (number of requestors, i.e., objects to fetch) = 10000 +// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s) +// T (total time available) = 1s +// -> s > 100 +const maxConcurrentFetches = 100 + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete @@ -325,6 +338,14 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck } matcher := sourceignore.NewMatcher(ps) + // Download in parallel, but bound the concurrency. According to + // AWS and GCP docs, rate limits are either soft or don't exist: + // - https://cloud.google.com/storage/quotas + // - https://docs.aws.amazon.com/general/latest/gr/s3.html + // .. so, the limiting factor is this process keeping a small footprint. + semaphore := make(chan struct{}, maxConcurrentFetches) + group, ctx := errgroup.WithContext(ctx) + err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error { if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile { return nil @@ -334,15 +355,23 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck return nil } - localPath := filepath.Join(tempDir, path) - err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return err - } + // block until there's capacity + semaphore <- struct{}{} + group.Go(func() error { + defer func() { <-semaphore }() + localPath := filepath.Join(tempDir, path) + err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return err + } + return nil + }) return nil }) - if err != nil { + + // VisitObjects won't return an error, but the errgroup might. + if err = group.Wait(); err != nil { err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } diff --git a/controllers/bucket_fetch_test.go b/controllers/bucket_fetch_test.go new file mode 100644 index 000000000..c7b50f3aa --- /dev/null +++ b/controllers/bucket_fetch_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" +) + +type mockBucketClient struct { + bucketName string + objects map[string]string +} + +var mockNotFound = fmt.Errorf("not found") + +func (m mockBucketClient) BucketExists(c context.Context, name string) (bool, error) { + return name == m.bucketName, nil +} + +func (m mockBucketClient) ObjectExists(c context.Context, bucket, obj string) (bool, error) { + if bucket != m.bucketName { + return false, fmt.Errorf("bucket does not exist") + } + _, ok := m.objects[obj] + return ok, nil +} + +func (m mockBucketClient) FGetObject(c context.Context, bucket, obj, path string) error { + if bucket != m.bucketName { + return fmt.Errorf("bucket does not exist") + } + // tiny bit of protocol, for convenience: if asked for an object "error", then return an error. + if obj == "error" { + return fmt.Errorf("I was asked to report an error") + } + object, ok := m.objects[obj] + if !ok { + return mockNotFound + } + return os.WriteFile(path, []byte(object), os.FileMode(0660)) +} + +func (m mockBucketClient) ObjectIsNotFound(e error) bool { + return e == mockNotFound +} + +func (m mockBucketClient) VisitObjects(c context.Context, bucket string, f func(string) error) error { + for path := range m.objects { + if err := f(path); err != nil { + return err + } + } + return nil +} + +func (m mockBucketClient) Close(c context.Context) { + return +} + +// Since the algorithm for fetching files uses concurrency and has some complications around error +// reporting, it's worth testing by itself. +func TestFetchFiles(t *testing.T) { + files := map[string]string{ + "foo.yaml": "foo: 1", + "bar.yaml": "bar: 2", + "baz.yaml": "baz: 3", + } + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + client := mockBucketClient{ + objects: files, + bucketName: bucketName, + } + + t.Run("fetch files happy path", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err != nil { + t.Fatal(err) + } + + for path := range files { + p := filepath.Join(tmp, path) + _, err := os.Stat(p) + if err != nil { + t.Error(err) + } + } + }) + + t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + files["error"] = "this one causes an error" + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err == nil { + t.Fatal("expected error but got nil") + } + }) + + t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) { + // this will fail if, for example, the semaphore is not used correctly and blocks + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + lotsOfFiles := map[string]string{} + for i := 0; i < 2*maxConcurrentFetches; i++ { + f := fmt.Sprintf("file-%d", i) + lotsOfFiles[f] = f + } + lotsOfFilesClient := mockBucketClient{ + bucketName: bucketName, + objects: lotsOfFiles, + } + + _, err = fetchFiles(context.TODO(), lotsOfFilesClient, bucket, tmp) + if err != nil { + t.Fatal(err) + } + }) +}