From 597147b6e814cdcf3a158ef6cc9a443830d521f1 Mon Sep 17 00:00:00 2001 From: Adrien Fillon Date: Tue, 26 Mar 2019 19:25:19 +0100 Subject: [PATCH] store: azure: allow passing an endpoint parameter for specific regions Fix #968 Signed-off-by: Adrien Fillon --- CHANGELOG.md | 1 + docs/storage.md | 1 + pkg/objstore/azure/azure.go | 31 +++++++---- pkg/objstore/azure/azure_test.go | 87 ++++++++++++++++++++++++++++++ pkg/objstore/azure/helpers.go | 39 ++++++++------ pkg/objstore/azure/helpers_test.go | 58 ++++++++++++++++++++ 6 files changed, 189 insertions(+), 28 deletions(-) create mode 100644 pkg/objstore/azure/azure_test.go create mode 100644 pkg/objstore/azure/helpers_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 84514b9c83..2f2377def2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver - [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples. - [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag). +- [#980](https://github.com/improbable-eng/thanos/pull/980) Ability to override Azure storage endpoint for other regions (China) New options: diff --git a/docs/storage.md b/docs/storage.md index 96582fe962..ba6864a560 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -224,6 +224,7 @@ config: storage_account: "" storage_account_key: "" container: "" + endpoint: "" ``` ### OpenStack Swift Configuration diff --git a/pkg/objstore/azure/azure.go b/pkg/objstore/azure/azure.go index d0c50524c8..266e3f55f7 100644 --- a/pkg/objstore/azure/azure.go +++ b/pkg/objstore/azure/azure.go @@ -18,11 +18,7 @@ import ( ) const ( - opObjectsList = "ListBucket" - opObjectInsert = "PutObject" - opObjectGet = "GetObject" - opObjectHead = "HeadObject" - opObjectDelete = "DeleteObject" + azureDefaultEndpoint = "blob.core.windows.net" ) // Config Azure storage configuration. @@ -30,6 +26,7 @@ type Config struct { StorageAccountName string `yaml:"storage_account"` StorageAccountKey string `yaml:"storage_account_key"` ContainerName string `yaml:"container"` + Endpoint string `yaml:"endpoint"` } // Bucket implements the store.Bucket interface against Azure APIs. @@ -45,6 +42,18 @@ func (conf *Config) validate() error { conf.StorageAccountKey == "" { return errors.New("invalid Azure storage configuration") } + if conf.StorageAccountName == "" && conf.StorageAccountKey != "" { + return errors.New("no Azure storage_account specified while storage_account_key is present in config file; both should be present.") + } + if conf.StorageAccountName != "" && conf.StorageAccountKey == "" { + return errors.New("no Azure storage_account_key specified while storage_account is present in config file; both should be present.") + } + if conf.ContainerName == "" { + return errors.New("no Azure container specified") + } + if conf.Endpoint == "" { + conf.Endpoint = azureDefaultEndpoint + } return nil } @@ -62,7 +71,7 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket } ctx := context.Background() - container, err := createContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName) + container, err := createContainer(ctx, conf) if err != nil { ret, ok := err.(blob.StorageError) if !ok { @@ -70,7 +79,7 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket } if ret.ServiceCode() == "ContainerAlreadyExists" { level.Debug(logger).Log("msg", "Getting connection to existing Azure blob container", "container", conf.ContainerName) - container, err = getContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName) + container, err = getContainer(ctx, conf) if err != nil { return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container) } @@ -166,7 +175,7 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]") } - blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name) + blobURL, err := getBlobURL(ctx, *b.config, name) if err != nil { return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) } @@ -211,7 +220,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) ( // Exists checks if the given object exists. func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name) - blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name) + blobURL, err := getBlobURL(ctx, *b.config, name) if err != nil { return false, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) } @@ -229,7 +238,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { level.Debug(b.logger).Log("msg", "Uploading blob", "blob", name) - blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name) + blobURL, err := getBlobURL(ctx, *b.config, name) if err != nil { return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) } @@ -247,7 +256,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { // Delete removes the object with the given name. func (b *Bucket) Delete(ctx context.Context, name string) error { level.Debug(b.logger).Log("msg", "Deleting blob", "blob", name) - blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name) + blobURL, err := getBlobURL(ctx, *b.config, name) if err != nil { return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) } diff --git a/pkg/objstore/azure/azure_test.go b/pkg/objstore/azure/azure_test.go new file mode 100644 index 0000000000..2574896213 --- /dev/null +++ b/pkg/objstore/azure/azure_test.go @@ -0,0 +1,87 @@ +package azure + +import ( + "testing" + + "github.com/improbable-eng/thanos/pkg/testutil" +) + +func TestConfig_validate(t *testing.T) { + type fields struct { + StorageAccountName string + StorageAccountKey string + ContainerName string + Endpoint string + } + tests := []struct { + name string + fields fields + wantErr bool + wantEndpoint string + }{ + { + name: "valid global configuration", + fields: fields{ + StorageAccountName: "foo", + StorageAccountKey: "bar", + ContainerName: "roo", + }, + wantErr: false, + wantEndpoint: azureDefaultEndpoint, + }, + { + name: "valid custom endpoint", + fields: fields{ + StorageAccountName: "foo", + StorageAccountKey: "bar", + ContainerName: "roo", + Endpoint: "blob.core.chinacloudapi.cn", + }, + wantErr: false, + wantEndpoint: "blob.core.chinacloudapi.cn", + }, + { + name: "no account key but account name", + fields: fields{ + StorageAccountName: "foo", + StorageAccountKey: "", + ContainerName: "roo", + }, + wantErr: true, + }, + { + name: "no account name but account key", + fields: fields{ + StorageAccountName: "", + StorageAccountKey: "bar", + ContainerName: "roo", + }, + wantErr: true, + }, + { + name: "no container name", + fields: fields{ + StorageAccountName: "foo", + StorageAccountKey: "bar", + ContainerName: "", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := &Config{ + StorageAccountName: tt.fields.StorageAccountName, + StorageAccountKey: tt.fields.StorageAccountKey, + ContainerName: tt.fields.ContainerName, + Endpoint: tt.fields.Endpoint, + } + err := conf.validate() + if (err != nil) != tt.wantErr { + t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr) + } else { + testutil.Equals(t, tt.wantEndpoint, conf.Endpoint) + } + }) + } +} diff --git a/pkg/objstore/azure/helpers.go b/pkg/objstore/azure/helpers.go index da6e96f89c..4e16c31c0b 100644 --- a/pkg/objstore/azure/helpers.go +++ b/pkg/objstore/azure/helpers.go @@ -5,36 +5,42 @@ import ( "fmt" "net/url" "regexp" + "time" blob "github.com/Azure/azure-storage-blob-go/azblob" ) -var ( - blobFormatString = `https://%s.blob.core.windows.net` -) - // DirDelim is the delimiter used to model a directory structure in an object store bucket. const DirDelim = "/" -func getContainerURL(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) { - c, err := blob.NewSharedKeyCredential(accountName, accountKey) +var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`) + +func getContainerURL(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey) if err != nil { return blob.ContainerURL{}, err } + + retryOptions := blob.RetryOptions{} + if deadline, ok := ctx.Deadline(); ok { + retryOptions.TryTimeout = deadline.Sub(time.Now()) + } + p := blob.NewPipeline(c, blob.PipelineOptions{ + Retry: retryOptions, Telemetry: blob.TelemetryOptions{Value: "Thanos"}, }) - u, err := url.Parse(fmt.Sprintf(blobFormatString, accountName)) + u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)) if err != nil { return blob.ContainerURL{}, err } service := blob.NewServiceURL(*u, p) - return service.NewContainerURL(containerName), nil + return service.NewContainerURL(conf.ContainerName), nil } -func getContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) { - c, err := getContainerURL(ctx, accountName, accountKey, containerName) +func getContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := getContainerURL(ctx, conf) if err != nil { return blob.ContainerURL{}, err } @@ -43,20 +49,20 @@ func getContainer(ctx context.Context, accountName, accountKey, containerName st return c, err } -func createContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) { - c, err := getContainerURL(ctx, accountName, accountKey, containerName) +func createContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := getContainerURL(ctx, conf) if err != nil { return blob.ContainerURL{}, err } _, err = c.Create( - context.Background(), + ctx, blob.Metadata{}, blob.PublicAccessNone) return c, err } -func getBlobURL(ctx context.Context, accountName, accountKey, containerName, blobName string) (blob.BlockBlobURL, error) { - c, err := getContainerURL(ctx, accountName, accountKey, containerName) +func getBlobURL(ctx context.Context, conf Config, blobName string) (blob.BlockBlobURL, error) { + c, err := getContainerURL(ctx, conf) if err != nil { return blob.BlockBlobURL{}, err } @@ -64,8 +70,7 @@ func getBlobURL(ctx context.Context, accountName, accountKey, containerName, blo } func parseError(errorCode string) string { - re, _ := regexp.Compile(`X-Ms-Error-Code:\D*\[(\w+)\]`) - match := re.FindStringSubmatch(errorCode) + match := errorCodeRegex.FindStringSubmatch(errorCode) if match != nil && len(match) == 2 { return match[1] } diff --git a/pkg/objstore/azure/helpers_test.go b/pkg/objstore/azure/helpers_test.go new file mode 100644 index 0000000000..84d47f4711 --- /dev/null +++ b/pkg/objstore/azure/helpers_test.go @@ -0,0 +1,58 @@ +package azure + +import ( + "context" + "testing" + + "github.com/improbable-eng/thanos/pkg/testutil" +) + +func Test_getContainerURL(t *testing.T) { + type args struct { + conf Config + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "default", + args: args{ + conf: Config{ + StorageAccountName: "foo", + StorageAccountKey: "Zm9vCg==", + ContainerName: "roo", + Endpoint: azureDefaultEndpoint, + }, + }, + want: "https://foo.blob.core.windows.net/roo", + wantErr: false, + }, + { + name: "azure china", + args: args{ + conf: Config{ + StorageAccountName: "foo", + StorageAccountKey: "Zm9vCg==", + ContainerName: "roo", + Endpoint: "blob.core.chinacloudapi.cn", + }, + }, + want: "https://foo.blob.core.chinacloudapi.cn/roo", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + got, err := getContainerURL(ctx, tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("getContainerURL() error = %v, wantErr %v", err, tt.wantErr) + return + } + testutil.Equals(t, tt.want, got.String()) + }) + } +}