Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for custom roundtripper #140

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -49,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -64,25 +65,25 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string) (obj
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component)
bucket, err = s3.NewBucket(logger, config, component, rt)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component)
bucket, err = azure.NewBucket(logger, config, component, rt)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config)
bucket, err = swift.NewContainer(logger, config, rt)
case string(COS):
bucket, err = cos.NewBucket(logger, config, component)
bucket, err = cos.NewBucket(logger, config, component, rt)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component)
bucket, err = oss.NewBucket(logger, config, component, rt)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config)
bucket, err = oci.NewBucket(logger, config, rt)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
bucket, err = obs.NewBucket(logger, config, rt)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
}
Expand Down
6 changes: 3 additions & 3 deletions client/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ExampleBucket() {
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +46,7 @@ func ExampleTracingBucketUsingOpenTracing() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand All @@ -72,7 +72,7 @@ func ExampleTracingBucketUsingOpenTelemetry() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0
github.com/kr/text v0.2.0 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
12 changes: 8 additions & 4 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure
import (
"context"
"io"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -145,7 +146,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -154,11 +155,14 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component)
return NewBucketWithConfig(logger, conf, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if rt != nil {
conf.HTTPConfig.Transport = rt
}
if err := conf.validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -355,7 +359,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
if err != nil {
return nil, nil, err
}
bkt, err := NewBucket(log.NewNopLogger(), bc, component)
bkt, err := NewBucket(log.NewNopLogger(), bc, component, nil)
if err != nil {
t.Errorf("Cannot create Azure storage container:")
return nil, nil, err
Expand Down
27 changes: 26 additions & 1 deletion providers/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package azure

import (
"net/http"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"

"github.com/thanos-io/objstore/exthttp"
)
Expand All @@ -20,7 +23,7 @@ type TestCase struct {
}

var validConfig = []byte(`storage_account: "myStorageAccount"
storage_account_key: "abc123"
storage_account_key: "bXlTdXBlclNlY3JldEtleTEyMyFAIw=="
container: "MyContainer"
endpoint: "blob.core.windows.net"
reader_config:
Expand Down Expand Up @@ -222,3 +225,25 @@ http_config:
testutil.Ok(t, err)
testutil.Equals(t, true, transport.TLSClientConfig.InsecureSkipVerify)
}

// ErrorRoundTripper is a custom RoundTripper that always returns an error
type ErrorRoundTripper struct {
Err error
}

func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, ert.Err
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
cfg, err := parseConfig(validConfig)
testutil.Ok(t, err)

rt := &ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt)

// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
}
17 changes: 13 additions & 4 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,17 @@ import (
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
dt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if conf.HTTPConfig.Transport != nil {
rt = conf.HTTPConfig.Transport
} else {
var err error
rt, err = exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
}
}
opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Expand All @@ -35,7 +43,7 @@ func getContainerClient(conf Config) (*container.Client, error) {
Telemetry: policy.TelemetryOptions{
ApplicationID: "Thanos",
},
Transport: &http.Client{Transport: dt},
Transport: &http.Client{Transport: rt},
},
}

Expand Down Expand Up @@ -65,6 +73,7 @@ func getContainerClient(conf Config) (*container.Client, error) {

// Otherwise use a token credential
var cred azcore.TokenCredential
var err error

// Use Managed Identity Credential if a user assigned ID is set
if conf.UserAssignedID != "" {
Expand Down
27 changes: 20 additions & 7 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -104,12 +104,11 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -128,7 +127,21 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
tpt, _ := exthttp.DefaultTransport(config.HTTPConfig)
if rt != nil {
config.HTTPConfig.Transport = rt
}
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var tpt http.RoundTripper
if config.HTTPConfig.Transport != nil {
tpt = config.HTTPConfig.Transport
} else {
var err error
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
Expand Down Expand Up @@ -485,7 +498,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand All @@ -506,7 +519,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand Down
31 changes: 31 additions & 0 deletions providers/cos/cos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
package cos

import (
"context"
"net/http"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/thanos-io/objstore/exthttp"
Expand Down Expand Up @@ -137,3 +141,30 @@
})
}
}

// ErrorRoundTripper is a custom RoundTripper that always returns an error

Check failure on line 145 in providers/cos/cos_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Comment should end in a period (godot)
type ErrorRoundTripper struct {
Err error
}

func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, ert.Err
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
config := Config{
Bucket: "bucket",
AppId: "123",
Region: "test",
SecretId: "sid",
SecretKey: "skey",
}
rt := &ErrorRoundTripper{Err: errors.New("RoundTripper error")}

bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "Test")
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
}
12 changes: 7 additions & 5 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,22 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component)
return NewBucketWithConfig(ctx, logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}

if rt != nil {
gc.HTTPConfig.Transport = rt
}
var opts []option.ClientOption

// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
Expand Down Expand Up @@ -312,7 +314,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error
return nil, nil, err
}

b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand Down
Loading
Loading