Skip to content

Commit

Permalink
Reinstate fetching bucket contents in parallel
Browse files Browse the repository at this point in the history
This commit reintroduces the use of goroutines for fetching objects,
but in the caller of the client interface rather than in a particular
client implementation.

Signed-off-by: Michael Bridgen <michael@weave.works>
  • Loading branch information
squaremo committed Jan 31, 2022
1 parent 0104b27 commit 53c2a15
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 7 deletions.
43 changes: 36 additions & 7 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"time"

"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -49,6 +50,18 @@ import (
"github.com/fluxcd/source-controller/pkg/sourceignore"
)

// maxConcurrentFetches is the upper bound on the goroutines used to
// fetch bucket objects. It's important to have a bound, to avoid
// using arbitrary amounts of memory; the actual number is chosen
// according to the queueing rule of thumb with some conservative
// parameters:
// s > Nr / T
// N (number of requestors, i.e., objects to fetch) = 10000
// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s)
// T (total time available) = 1s
// -> s > 100
const maxConcurrentFetches = 100

// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
Expand Down Expand Up @@ -325,6 +338,14 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
}
matcher := sourceignore.NewMatcher(ps)

// Download in parallel, but bound the concurrency. According to
// AWS and GCP docs, rate limits are either soft or don't exist:
// - https://cloud.google.com/storage/quotas
// - https://docs.aws.amazon.com/general/latest/gr/s3.html
// .. so, the limiting factor is this process keeping a small footprint.
semaphore := make(chan struct{}, maxConcurrentFetches)
group, ctx := errgroup.WithContext(ctx)

err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error {
if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile {
return nil
Expand All @@ -334,15 +355,23 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
return nil
}

localPath := filepath.Join(tempDir, path)
err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return err
}
// block until there's capacity
semaphore <- struct{}{}
group.Go(func() error {
defer func() { <-semaphore }()
localPath := filepath.Join(tempDir, path)
err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return err
}
return nil
})
return nil
})
if err != nil {

// VisitObjects won't return an error, but the errgroup might.
if err = group.Wait(); err != nil {
err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
Expand Down
162 changes: 162 additions & 0 deletions controllers/bucket_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
)

type mockBucketClient struct {
bucketName string
objects map[string]string
}

var mockNotFound = fmt.Errorf("not found")

func (m mockBucketClient) BucketExists(c context.Context, name string) (bool, error) {
return name == m.bucketName, nil
}

func (m mockBucketClient) ObjectExists(c context.Context, bucket, obj string) (bool, error) {
if bucket != m.bucketName {
return false, fmt.Errorf("bucket does not exist")
}
_, ok := m.objects[obj]
return ok, nil
}

func (m mockBucketClient) FGetObject(c context.Context, bucket, obj, path string) error {
if bucket != m.bucketName {
return fmt.Errorf("bucket does not exist")
}
// tiny bit of protocol, for convenience: if asked for an object "error", then return an error.
if obj == "error" {
return fmt.Errorf("I was asked to report an error")
}
object, ok := m.objects[obj]
if !ok {
return mockNotFound
}
return os.WriteFile(path, []byte(object), os.FileMode(0660))
}

func (m mockBucketClient) ObjectIsNotFound(e error) bool {
return e == mockNotFound
}

func (m mockBucketClient) VisitObjects(c context.Context, bucket string, f func(string) error) error {
for path := range m.objects {
if err := f(path); err != nil {
return err
}
}
return nil
}

func (m mockBucketClient) Close(c context.Context) {
return
}

// Since the algorithm for fetching files uses concurrency and has some complications around error
// reporting, it's worth testing by itself.
func TestFetchFiles(t *testing.T) {
files := map[string]string{
"foo.yaml": "foo: 1",
"bar.yaml": "bar: 2",
"baz.yaml": "baz: 3",
}
bucketName := "all-my-config"

bucket := sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
},
}
client := mockBucketClient{
objects: files,
bucketName: bucketName,
}

t.Run("fetch files happy path", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)

_, err = fetchFiles(context.TODO(), client, bucket, tmp)
if err != nil {
t.Fatal(err)
}

for path := range files {
p := filepath.Join(tmp, path)
_, err := os.Stat(p)
if err != nil {
t.Error(err)
}
}
})

t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)

files["error"] = "this one causes an error"
_, err = fetchFiles(context.TODO(), client, bucket, tmp)
if err == nil {
t.Fatal("expected error but got nil")
}
})

t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) {
// this will fail if, for example, the semaphore is not used correctly and blocks
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)

lotsOfFiles := map[string]string{}
for i := 0; i < 2*maxConcurrentFetches; i++ {
f := fmt.Sprintf("file-%d", i)
lotsOfFiles[f] = f
}
lotsOfFilesClient := mockBucketClient{
bucketName: bucketName,
objects: lotsOfFiles,
}

_, err = fetchFiles(context.TODO(), lotsOfFilesClient, bucket, tmp)
if err != nil {
t.Fatal(err)
}
})
}

0 comments on commit 53c2a15

Please sign in to comment.