From 576bc375471c9ab36d6d2bf2573e0b303305ce0a Mon Sep 17 00:00:00 2001 From: setoru Date: Tue, 25 Apr 2023 19:19:54 +0800 Subject: [PATCH 01/10] add obs support Signed-off-by: setoru --- go.mod | 1 + go.sum | 2 + providers/obs/obs.go | 467 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 470 insertions(+) create mode 100644 providers/obs/obs.go diff --git a/go.mod b/go.mod index 6c6ae842..8a04716c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/efficientgo/e2e v0.13.1-0.20220922081603-45de9fc588a8 github.com/fatih/structtag v1.2.0 github.com/go-kit/log v0.2.1 + github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible github.com/minio/minio-go/v7 v7.0.45 github.com/ncw/swift v1.0.53 github.com/opentracing/opentracing-go v1.2.0 diff --git a/go.sum b/go.sum index 9dd1f476..d6b3a275 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,8 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible h1:tKTaPHNVwikS3I1rdyf1INNvgJXWSf/+TzqsiGbrgnQ= +github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible/go.mod h1:l7VUhRbTKCzdOacdT4oWCwATKyvZqUOlOqr0Ous3k4s= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= diff --git a/providers/obs/obs.go b/providers/obs/obs.go new file mode 100644 index 00000000..c0bc057d --- /dev/null +++ b/providers/obs/obs.go @@ -0,0 +1,467 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
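+// Package obs implements an objstore.Bucket client backed by Huawei Cloud OBS
+// (Object Storage Service).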
+ +package obs + +import ( + "context" + "github.com/go-kit/log" + "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/exthttp" + "gopkg.in/yaml.v2" + "io" + "math" + "os" + "strings" + "testing" + "time" +) + +const DirDelim = "/" + +const ( + MinMultipartUploadSize int64 = 1024 * 1024 * 100 + PartSize int64 = 1024 * 1024 * 100 +) + +var DefaultConfig = Config{ + HTTPConfig: exthttp.HTTPConfig{ + IdleConnTimeout: model.Duration(90 * time.Second), + ResponseHeaderTimeout: model.Duration(2 * time.Minute), + TLSHandshakeTimeout: model.Duration(10 * time.Second), + ExpectContinueTimeout: model.Duration(1 * time.Second), + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + MaxConnsPerHost: 0, + }, +} + +type Config struct { + Bucket string `yaml:"bucket"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key"` + SecretKey string `yaml:"secret_key"` + HTTPConfig exthttp.HTTPConfig `yaml:"http_config"` +} + +func (conf *Config) validate() error { + if conf.Endpoint == "" { + return errors.New("no obs endpoint in config file") + } + + if conf.AccessKey == "" && conf.SecretKey != "" { + return errors.New("no obs access_key specified") + } + + if conf.AccessKey != "" && conf.SecretKey == "" { + return errors.New("no obs secret_key specified") + } + + if conf.AccessKey == "" && conf.SecretKey == "" { + return errors.New("no obs secret_key and access_key specified") + } + return nil +} + +type Bucket struct { + logger log.Logger + client *obs.ObsClient + name string +} + +func NewBucket(logger log.Logger, conf []byte) (*Bucket, error) { + config, err := parseConfig(conf) + if err != nil { + return nil, errors.Wrap(err, "parsing cos configuration") + } + + return NewBucketWithConfig(logger, config) +} + +func parseConfig(conf []byte) (Config, error) { + config := DefaultConfig + if err := yaml.UnmarshalStrict(conf, &config); err != nil { + return Config{}, err + } + + return config, nil +} + +func NewBucketWithConfig(logger log.Logger, config Config) (*Bucket, error) { + if err := config.validate(); err != nil { + return nil, errors.Wrap(err, "validate obs config err") + } + + rt, err := exthttp.DefaultTransport(config.HTTPConfig) + if err != nil { + return nil, errors.Wrap(err, "get http transport err") + } + + client, err := obs.New(config.AccessKey, config.SecretKey, config.Endpoint, obs.WithHttpTransport(rt)) + if err != nil { + return nil, errors.Wrap(err, "initialize obs client err") + } + + bkt := &Bucket{ + logger: logger, + client: client, + name: config.Bucket, + } + return bkt, nil +} + +// Name returns the bucket name for the provider. +func (b *Bucket) Name() string { + return b.name +} + +// Delete removes the object with the given name +func (b *Bucket) Delete(ctx context.Context, name string) error { + input := &obs.DeleteObjectInput{Bucket: b.name, Key: name} + _, err := b.client.DeleteObject(input) + return err +} + +// Upload the contents of the reader as an object into the bucket. 
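+// Objects at or below MinMultipartUploadSize (100 MiB) are written with a single
+// PutObject call; larger objects are split into PartSize (100 MiB) chunks,
+// uploaded through the OBS multipart-upload API and then completed in one request.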
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "failed to get size apriori to upload %s", name) + } + + if size <= 0 { + return errors.New("object size must be provided") + } + if size <= MinMultipartUploadSize { + err := b.putObjectSingle(name, r) + if err != nil { + return err + } + } else { + initOutput, err := b.initiateMultipartUpload(name) + if err != nil { + return err + } + uploadId := initOutput.UploadId + + partSum := int(math.Floor(float64(size) / float64(PartSize))) + lastPart := size % PartSize + parts := make([]obs.Part, 0, partSum) + for i := 0; i < partSum; i++ { + inputPart := &obs.UploadPartInput{ + Bucket: b.name, + Key: name, + UploadId: uploadId, + Body: r, + PartNumber: i + 1, + PartSize: PartSize, + Offset: int64(i) * PartSize, + } + output, err := b.client.UploadPart(inputPart) + if err != nil { + return errors.Wrap(err, "fail to multipart upload") + } + parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) + } + if lastPart != 0 { + inputPart := &obs.UploadPartInput{ + Bucket: b.name, + Key: name, + UploadId: uploadId, + Body: r, + PartNumber: partSum + 1, + PartSize: lastPart, + Offset: int64(partSum) * PartSize, + } + output, err := b.client.UploadPart(inputPart) + if err != nil { + return errors.Wrap(err, "fail to upload lastPart") + } + parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) + } + inputComplete := &obs.CompleteMultipartUploadInput{ + Bucket: b.name, + Key: name, + UploadId: uploadId, + Parts: parts, + } + _, err = b.client.CompleteMultipartUpload(inputComplete) + if err != nil { + return errors.Wrap(err, "fail to complete multipart upload") + } + } + return nil +} + +func (b *Bucket) putObjectSingle(key string, body io.Reader) error { + input := &obs.PutObjectInput{} + input.Bucket = b.name + input.Key = key + input.Body = body + _, err := b.client.PutObject(input) + return errors.Wrap(err, "fail to upload object") +} + +func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultipartUploadOutput, err error) { + initInput := &obs.InitiateMultipartUploadInput{} + initInput.Bucket = b.name + initInput.Key = key + initOutput, err := b.client.InitiateMultipartUpload(initInput) + return initOutput, errors.Wrap(err, "fail to init multipart upload job") +} + +func (b *Bucket) multipartUpload(numThreads int, size int64, key, uploadId string, body io.Reader, parts *[]obs.Part) error { + partSum := int(math.Ceil(float64(size) / float64(PartSize))) + lastPart := size % PartSize + + uploadPartCh := make(chan obs.Part) + uploadCh := make(chan int) + ctx, cancel := context.WithCancel(context.Background()) + var gerr error + + go func() { + defer close(uploadCh) + for partNum := 0; partNum < partSum; partNum++ { + uploadCh <- partNum + } + }() + for i := 0; i < numThreads; i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case partNum, ok := <-uploadCh: + if !ok { + return + } + offset := int64(partNum) * PartSize + partSize := PartSize + if partNum == partSum { + offset = int64(partSum) * PartSize + partSize = lastPart + } + inputPart := &obs.UploadPartInput{ + Bucket: b.name, + Key: key, + UploadId: uploadId, + Body: body, + PartNumber: partNum + 1, + PartSize: partSize, + Offset: offset, + } + output, err := b.client.UploadPart(inputPart) + if err != nil { + cancel() + gerr = err + return + } + uploadPartCh <- obs.Part{PartNumber: 
output.PartNumber, ETag: output.ETag} + } + } + }() + } + for i := 0; i < partSum; i++ { + select { + case <-ctx.Done(): + return errors.Wrap(gerr, "fail to multipart upload") + case part := <-uploadPartCh: + *parts = append(*parts, part) + } + } + return nil +} + +func (b *Bucket) Close() error { return nil } + +// Iter calls f for each entry in the given directory (not recursive.) +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + if dir != "" { + dir = strings.TrimSuffix(dir, DirDelim) + DirDelim + } + + input := &obs.ListObjectsInput{} + input.Bucket = b.name + input.Prefix = dir + input.Delimiter = DirDelim + if objstore.ApplyIterOptions(options...).Recursive { + input.Delimiter = "" + } + for { + output, err := b.client.ListObjects(input) + if err != nil { + return errors.Wrap(err, "fail to list object") + } + for _, content := range output.Contents { + if err := f(content.Key); err != nil { + return errors.Wrap(err, "fail to call f for object") + } + } + for _, topDir := range output.CommonPrefixes { + if err := f(topDir); err != nil { + return errors.Wrap(err, "fail to call f for top dir object") + } + } + + if !output.IsTruncated { + break + } + + input.Marker = output.NextMarker + } + return nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getRange(ctx, name, 0, -1) +} + +// GetRange returns a new range reader for the given object name and range. +func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.getRange(ctx, name, off, length) +} + +func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if strings.TrimSpace(name) == "" { + return nil, errors.New("object name cannot be empty") + } + input := &obs.GetObjectInput{} + input.Bucket = b.name + input.Key = name + if off < 0 { + return nil, errors.New("incorrect offset") + } + input.RangeStart = off + if length != -1 { + input.RangeEnd = off + length - 1 + } else { + input.RangeEnd = math.MaxInt64 + } + output, err := b.client.GetObject(input) + if err != nil { + return nil, errors.Wrap(err, "fail to get object") + } + return output.Body, nil +} + +// Exists checks if the given object exists in the bucket. +func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { + input := &obs.GetObjectMetadataInput{ + Bucket: b.name, + Key: name, + } + _, err := b.client.GetObjectMetadata(input) + if err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrap(err, "fail to get object metadata") + } + return true, nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + switch oriErr := errors.Cause(err).(type) { + case obs.ObsError: + if oriErr.Status == "404 Not Found" { + return true + } + default: + return false + } + return false +} + +// Attributes returns information about the specified object. 
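+// The returned size and last-modified timestamp are taken from the object
+// metadata reported by GetObjectMetadata.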
+func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + input := &obs.GetObjectMetadataInput{ + Bucket: b.name, + Key: name, + } + output, err := b.client.GetObjectMetadata(input) + if err != nil { + return objstore.ObjectAttributes{}, errors.Wrap(err, "fail to get object metadata") + } + attr := objstore.ObjectAttributes{ + Size: output.ContentLength, + LastModified: output.LastModified, + } + return attr, nil +} + +// NewTestBucket creates test bkt client that before returning creates temporary bucket. +func NewTestBucket(t testing.TB, location string) (objstore.Bucket, func(), error) { + c := configFromEnv() + if c.Endpoint == "" || c.AccessKey == "" || c.SecretKey == "" { + return nil, nil, errors.New("insufficient obs test configuration information") + } + + if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" { + return nil, nil, errors.New("OBS_BUCKET is defined. Normally this tests will create temporary bucket " + + "and delete it after test. Unset OBS_BUCKET env variable to use default logic. If you really want to run " + + "tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true.") + } + return NewTestBucketFromConfig(t, c, false, location) +} + +func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool, location string) (objstore.Bucket, func(), error) { + ctx := context.Background() + + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + b, err := NewBucket(log.NewNopLogger(), bc) + if err != nil { + return nil, nil, err + } + + bktToCreate := c.Bucket + if c.Bucket != "" && reuseBucket { + if err := b.Iter(ctx, "", func(f string) error { + return errors.Errorf("bucket %s is not empty", c.Bucket) + }); err != nil { + return nil, nil, err + } + + t.Log("WARNING. Reusing", c.Bucket, "OBS bucket for OBS tests. Manual cleanup afterwards is required") + return b, func() {}, nil + } + + if c.Bucket == "" { + bktToCreate = objstore.CreateTemporaryTestBucketName(t) + } + + input := &obs.CreateBucketInput{ + Bucket: bktToCreate, + BucketLocation: obs.BucketLocation{Location: location}, + } + _, err = b.client.CreateBucket(input) + if err != nil { + return nil, nil, err + } + b.name = bktToCreate + t.Log("created temporary OBS bucket for OBS tests with name", bktToCreate) + + return b, func() { + objstore.EmptyBucket(t, ctx, b) + if _, err := b.client.DeleteBucket(bktToCreate); err != nil { + t.Logf("deleting bucket %s failed: %s", bktToCreate, err) + } + }, nil +} + +func configFromEnv() Config { + c := Config{ + Bucket: os.Getenv("OBS_BUCKET"), + Endpoint: os.Getenv("OBS_ENDPOINT"), + AccessKey: os.Getenv("OBS_ACCESS_KEY"), + SecretKey: os.Getenv("OBS_SECRET_KEY"), + } + return c +} From aaa702b25fb163f5d33c6def0a7adf26267e8d44 Mon Sep 17 00:00:00 2001 From: setoru Date: Wed, 26 Apr 2023 15:15:49 +0800 Subject: [PATCH 02/10] fix obs code style Signed-off-by: setoru --- providers/obs/obs.go | 150 +++++++++++++------------------------------ 1 file changed, 45 insertions(+), 105 deletions(-) diff --git a/providers/obs/obs.go b/providers/obs/obs.go index c0bc057d..8aa635cd 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -128,6 +128,7 @@ func (b *Bucket) Delete(ctx context.Context, name string) error { // Upload the contents of the reader as an object into the bucket. 
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { size, err := objstore.TryToGetSize(r) + if err != nil { return errors.Wrapf(err, "failed to get size apriori to upload %s", name) } @@ -145,50 +146,31 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { if err != nil { return err } - uploadId := initOutput.UploadId - partSum := int(math.Floor(float64(size) / float64(PartSize))) - lastPart := size % PartSize - parts := make([]obs.Part, 0, partSum) - for i := 0; i < partSum; i++ { - inputPart := &obs.UploadPartInput{ - Bucket: b.name, - Key: name, - UploadId: uploadId, - Body: r, - PartNumber: i + 1, - PartSize: PartSize, - Offset: int64(i) * PartSize, - } - output, err := b.client.UploadPart(inputPart) - if err != nil { - return errors.Wrap(err, "fail to multipart upload") - } - parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) - } - if lastPart != 0 { - inputPart := &obs.UploadPartInput{ - Bucket: b.name, - Key: name, - UploadId: uploadId, - Body: r, - PartNumber: partSum + 1, - PartSize: lastPart, - Offset: int64(partSum) * PartSize, - } - output, err := b.client.UploadPart(inputPart) + uploadId := initOutput.UploadId + defer func() { if err != nil { - return errors.Wrap(err, "fail to upload lastPart") + if _, err := b.client.AbortMultipartUpload(&obs.AbortMultipartUploadInput{ + UploadId: uploadId, + Bucket: b.name, + Key: name, + }); err != nil { + err = errors.Wrap(err, "failed to abort multipart upload") + return + } } - parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) + }() + parts, err := b.multipartUpload(size, name, uploadId, r) + if err != nil { + return err } - inputComplete := &obs.CompleteMultipartUploadInput{ + + _, err = b.client.CompleteMultipartUpload(&obs.CompleteMultipartUploadInput{ Bucket: b.name, Key: name, UploadId: uploadId, Parts: parts, - } - _, err = b.client.CompleteMultipartUpload(inputComplete) + }) if err != nil { return errors.Wrap(err, "fail to complete multipart upload") } @@ -213,66 +195,30 @@ func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultip return initOutput, errors.Wrap(err, "fail to init multipart upload job") } -func (b *Bucket) multipartUpload(numThreads int, size int64, key, uploadId string, body io.Reader, parts *[]obs.Part) error { +func (b *Bucket) multipartUpload(size int64, key, uploadId string, body io.Reader) ([]obs.Part, error) { partSum := int(math.Ceil(float64(size) / float64(PartSize))) lastPart := size % PartSize - - uploadPartCh := make(chan obs.Part) - uploadCh := make(chan int) - ctx, cancel := context.WithCancel(context.Background()) - var gerr error - - go func() { - defer close(uploadCh) - for partNum := 0; partNum < partSum; partNum++ { - uploadCh <- partNum + parts := make([]obs.Part, 0, partSum) + for i := 1; i <= partSum; i++ { + partSize := PartSize + if i == partSum { + partSize = lastPart } - }() - for i := 0; i < numThreads; i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case partNum, ok := <-uploadCh: - if !ok { - return - } - offset := int64(partNum) * PartSize - partSize := PartSize - if partNum == partSum { - offset = int64(partSum) * PartSize - partSize = lastPart - } - inputPart := &obs.UploadPartInput{ - Bucket: b.name, - Key: key, - UploadId: uploadId, - Body: body, - PartNumber: partNum + 1, - PartSize: partSize, - Offset: offset, - } - output, err := b.client.UploadPart(inputPart) - if err != nil { - cancel() - gerr = err - return - } - 
uploadPartCh <- obs.Part{PartNumber: output.PartNumber, ETag: output.ETag} - } - } - }() - } - for i := 0; i < partSum; i++ { - select { - case <-ctx.Done(): - return errors.Wrap(gerr, "fail to multipart upload") - case part := <-uploadPartCh: - *parts = append(*parts, part) + output, err := b.client.UploadPart(&obs.UploadPartInput{ + Bucket: b.name, + Key: key, + UploadId: uploadId, + Body: body, + PartNumber: i, + PartSize: partSize, + Offset: int64(i-1) * PartSize, + }) + if err != nil { + return nil, errors.Wrap(err, "fail to multipart upload") } + parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) } - return nil + return parts, nil } func (b *Bucket) Close() error { return nil } @@ -350,11 +296,10 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( // Exists checks if the given object exists in the bucket. func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { - input := &obs.GetObjectMetadataInput{ + _, err := b.client.GetObjectMetadata(&obs.GetObjectMetadataInput{ Bucket: b.name, Key: name, - } - _, err := b.client.GetObjectMetadata(input) + }) if err != nil { if b.IsObjNotFoundErr(err) { return false, nil @@ -371,27 +316,23 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { if oriErr.Status == "404 Not Found" { return true } - default: - return false } return false } // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { - input := &obs.GetObjectMetadataInput{ + output, err := b.client.GetObjectMetadata(&obs.GetObjectMetadataInput{ Bucket: b.name, Key: name, - } - output, err := b.client.GetObjectMetadata(input) + }) if err != nil { return objstore.ObjectAttributes{}, errors.Wrap(err, "fail to get object metadata") } - attr := objstore.ObjectAttributes{ + return objstore.ObjectAttributes{ Size: output.ContentLength, LastModified: output.LastModified, - } - return attr, nil + }, nil } // NewTestBucket creates test bkt client that before returning creates temporary bucket. 
@@ -437,11 +378,10 @@ func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool, location bktToCreate = objstore.CreateTemporaryTestBucketName(t) } - input := &obs.CreateBucketInput{ + _, err = b.client.CreateBucket(&obs.CreateBucketInput{ Bucket: bktToCreate, BucketLocation: obs.BucketLocation{Location: location}, - } - _, err = b.client.CreateBucket(input) + }) if err != nil { return nil, nil, err } From 0bd2405b21bf7f4e073452bcdb00bc8210383159 Mon Sep 17 00:00:00 2001 From: setoru Date: Wed, 26 Apr 2023 15:29:59 +0800 Subject: [PATCH 03/10] add factory config for obs Signed-off-by: setoru --- client/factory.go | 4 ++++ objtesting/foreach.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/client/factory.go b/client/factory.go index bfe4370f..000fdcaa 100644 --- a/client/factory.go +++ b/client/factory.go @@ -6,6 +6,7 @@ package client import ( "context" "fmt" + "github.com/thanos-io/objstore/providers/obs" "strings" "github.com/go-kit/log" @@ -38,6 +39,7 @@ const ( ALIYUNOSS ObjProvider = "ALIYUNOSS" BOS ObjProvider = "BOS" OCI ObjProvider = "OCI" + OBS ObjProvider = "OBS" ) type BucketConfig struct { @@ -80,6 +82,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe bucket, err = bos.NewBucket(logger, config, component) case string(OCI): bucket, err = oci.NewBucket(logger, config) + case string(OBS): + bucket, err = obs.NewBucket(logger, config) default: return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type) } diff --git a/objtesting/foreach.go b/objtesting/foreach.go index 07c86738..e514e7a1 100644 --- a/objtesting/foreach.go +++ b/objtesting/foreach.go @@ -4,6 +4,7 @@ package objtesting import ( + "github.com/thanos-io/objstore/providers/obs" "os" "strings" "testing" @@ -183,4 +184,18 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) testFn(t, bkt) }) } + + // Optional OBS. + if !IsObjStoreSkipped(t, client.OBS) { + t.Run("obs", func(t *testing.T) { + bkt, closeFn, err := obs.NewTestBucket(t, "cn-south-1") + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() + + testFn(t, bkt) + testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix")) + }) + } } From e26c94babcc4a900e78759b3758cccc53a53dcb5 Mon Sep 17 00:00:00 2001 From: setoru Date: Wed, 26 Apr 2023 15:57:46 +0800 Subject: [PATCH 04/10] add obs config to bucketcfggen Signed-off-by: setoru --- scripts/cfggen/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 57afbe1f..a5591991 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -5,6 +5,7 @@ package main import ( "fmt" + "github.com/thanos-io/objstore/providers/obs" "io" "os" "path/filepath" @@ -44,6 +45,7 @@ var ( client.FILESYSTEM: filesystem.Config{}, client.BOS: bos.Config{}, client.OCI: oci.Config{}, + client.OBS: obs.DefaultConfig, } ) From 803e50b1adedf4ed46a58b9467b8a4651a7f1a6b Mon Sep 17 00:00:00 2001 From: setoru Date: Thu, 27 Apr 2023 15:34:47 +0800 Subject: [PATCH 05/10] add changelog entry Signed-off-by: setoru --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47541e3d..05aafa7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#41](https://github.com/thanos-io/objstore/pull/41) S3: Support S3 session token. 
- [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled - [#44](https://github.com/thanos-io/objstore/pull/44) Add new metric to count total number of fetched bytes from bucket +- [#50](https://github.com/thanos-io/objstore/pull/50) Add Huawei Cloud OBS Object Storage Support ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. From 69c1212f52bb0d358526ca045de2f50ee5ce92a8 Mon Sep 17 00:00:00 2001 From: setoru Date: Fri, 28 Apr 2023 15:53:30 +0800 Subject: [PATCH 06/10] fix imports Signed-off-by: setoru --- client/factory.go | 14 +++++++------- objtesting/foreach.go | 6 +++--- providers/obs/obs.go | 16 +++++++++------- scripts/cfggen/main.go | 14 +++++++------- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/client/factory.go b/client/factory.go index 000fdcaa..12d61892 100644 --- a/client/factory.go +++ b/client/factory.go @@ -6,25 +6,25 @@ package client import ( "context" "fmt" - "github.com/thanos-io/objstore/providers/obs" "strings" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "gopkg.in/yaml.v2" - "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/azure" "github.com/thanos-io/objstore/providers/bos" "github.com/thanos-io/objstore/providers/cos" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/objstore/providers/gcs" + "github.com/thanos-io/objstore/providers/obs" "github.com/thanos-io/objstore/providers/oci" "github.com/thanos-io/objstore/providers/oss" "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore/providers/swift" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" ) type ObjProvider string diff --git a/objtesting/foreach.go b/objtesting/foreach.go index e514e7a1..87d9ed1b 100644 --- a/objtesting/foreach.go +++ b/objtesting/foreach.go @@ -4,13 +4,10 @@ package objtesting import ( - "github.com/thanos-io/objstore/providers/obs" "os" "strings" "testing" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/azure" @@ -18,10 +15,13 @@ import ( "github.com/thanos-io/objstore/providers/cos" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/objstore/providers/gcs" + "github.com/thanos-io/objstore/providers/obs" "github.com/thanos-io/objstore/providers/oci" "github.com/thanos-io/objstore/providers/oss" "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore/providers/swift" + + "github.com/efficientgo/core/testutil" ) // IsObjStoreSkipped returns true if given provider ID is found in THANOS_TEST_OBJSTORE_SKIP array delimited by comma e.g: diff --git a/providers/obs/obs.go b/providers/obs/obs.go index 8aa635cd..2df638d9 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -5,19 +5,21 @@ package obs import ( "context" - "github.com/go-kit/log" - "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" - "github.com/pkg/errors" - "github.com/prometheus/common/model" - "github.com/thanos-io/objstore" - "github.com/thanos-io/objstore/exthttp" - "gopkg.in/yaml.v2" "io" "math" "os" "strings" "testing" "time" + + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/exthttp" + + "github.com/go-kit/log" + 
"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "gopkg.in/yaml.v2" ) const DirDelim = "/" diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index a5591991..424bf9b0 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -5,28 +5,28 @@ package main import ( "fmt" - "github.com/thanos-io/objstore/providers/obs" "io" "os" "path/filepath" "reflect" "strings" - "github.com/thanos-io/objstore/providers/oci" - - "github.com/fatih/structtag" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/azure" "github.com/thanos-io/objstore/providers/bos" "github.com/thanos-io/objstore/providers/cos" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/objstore/providers/gcs" + "github.com/thanos-io/objstore/providers/obs" + "github.com/thanos-io/objstore/providers/oci" "github.com/thanos-io/objstore/providers/oss" "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore/providers/swift" + + "github.com/fatih/structtag" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" "gopkg.in/alecthomas/kingpin.v2" "gopkg.in/yaml.v2" ) From 368a778eadf7bccc858fc152834dffcd0f96b7c6 Mon Sep 17 00:00:00 2001 From: setoru Date: Fri, 28 Apr 2023 16:03:03 +0800 Subject: [PATCH 07/10] fix return readable error Signed-off-by: setoru --- providers/obs/obs.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/providers/obs/obs.go b/providers/obs/obs.go index 2df638d9..af14c354 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -120,7 +120,7 @@ func (b *Bucket) Name() string { return b.name } -// Delete removes the object with the given name +// Delete removes the object with the given name. 
func (b *Bucket) Delete(ctx context.Context, name string) error { input := &obs.DeleteObjectInput{Bucket: b.name, Key: name} _, err := b.client.DeleteObject(input) @@ -139,12 +139,13 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { return errors.New("object size must be provided") } if size <= MinMultipartUploadSize { - err := b.putObjectSingle(name, r) + err = b.putObjectSingle(name, r) if err != nil { return err } } else { - initOutput, err := b.initiateMultipartUpload(name) + var initOutput *obs.InitiateMultipartUploadOutput + initOutput, err = b.initiateMultipartUpload(name) if err != nil { return err } @@ -152,7 +153,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { uploadId := initOutput.UploadId defer func() { if err != nil { - if _, err := b.client.AbortMultipartUpload(&obs.AbortMultipartUploadInput{ + if _, err = b.client.AbortMultipartUpload(&obs.AbortMultipartUploadInput{ UploadId: uploadId, Bucket: b.name, Key: name, @@ -186,7 +187,10 @@ func (b *Bucket) putObjectSingle(key string, body io.Reader) error { input.Key = key input.Body = body _, err := b.client.PutObject(input) - return errors.Wrap(err, "fail to upload object") + if err != nil { + return errors.Wrap(err, "fail to upload object") + } + return nil } func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultipartUploadOutput, err error) { @@ -194,7 +198,10 @@ func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultip initInput.Bucket = b.name initInput.Key = key initOutput, err := b.client.InitiateMultipartUpload(initInput) - return initOutput, errors.Wrap(err, "fail to init multipart upload job") + if err != nil { + return nil, errors.Wrap(err, "fail to init multipart upload job") + } + return initOutput, nil } func (b *Bucket) multipartUpload(size int64, key, uploadId string, body io.Reader) ([]obs.Part, error) { From f2475dcd5f38db40b46caa479df7f0fbd18fdb1b Mon Sep 17 00:00:00 2001 From: setoru Date: Thu, 4 May 2023 15:15:15 +0800 Subject: [PATCH 08/10] fix makefile and readme Signed-off-by: setoru --- .github/workflows/test.yaml | 2 +- Makefile | 2 +- README.md | 40 +++++++++++++++++++++++++++++++++++-- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 59a43eba..8b83c6c2 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -69,7 +69,7 @@ jobs: - name: Run unit tests env: # THANOS_TEST_OBJSTORE_SKIP: AZURE,COS,ALIYUNOSS,BOS - THANOS_TEST_OBJSTORE_SKIP: GCS,S3,SWIFT,AZURE,COS,ALIYUNOSS,BOS,OCI + THANOS_TEST_OBJSTORE_SKIP: GCS,S3,SWIFT,AZURE,COS,ALIYUNOSS,BOS,OCI,OBS # Variables for Swift testing. 
OS_AUTH_URL: http://127.0.0.1:5000/v2.0 OS_PASSWORD: s3cr3t diff --git a/Makefile b/Makefile index 72d88f6b..09fd0d3e 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ MDOX_VALIDATE_CONFIG ?= .mdox.validate.yaml .PHONY: test-local test-local: - THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI $(MAKE) test + THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI,OBS $(MAKE) test .PHONY: test test: diff --git a/README.md b/README.md index 3ee3c375..6f1fe103 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,7 @@ Current object storage client implementations: | [Baidu BOS](#baidu-bos) | Beta | Production Usage | no | @yahaa | | [Local Filesystem](#filesystem) | Stable | Testing and Demo only | yes | @bwplotka | | [Oracle Cloud Infrastructure Object Storage](#oracle-cloud-infrastructure-object-storage) | Beta | Production Usage | yes | @aarontams,@gaurav-05,@ericrrath | +| [HuaweiCloud OBS](#huaweicloud-obs) | Beta | Production Usage | no | @setoru | **Missing support to some object storage?** Check out [how to add your client section](#how-to-add-a-new-client-to-thanos) @@ -289,7 +290,7 @@ Example working AWS IAM policy for user: To test the policy, set env vars for S3 access for *empty, not used* bucket as well as: ``` -THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI +THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI,OBS THANOS_ALLOW_EXISTING_BUCKET_USE=true ``` @@ -323,7 +324,7 @@ We need access to CreateBucket and DeleteBucket and access to all buckets: } ``` -With this policy you should be able to run set `THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI` and unset `S3_BUCKET` and run all tests using `make test`. +With this policy you should be able to run set `THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI,OBS` and unset `S3_BUCKET` and run all tests using `make test`. Details about AWS policies: https://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html @@ -640,6 +641,41 @@ config: You can also include any of the optional configuration just like the example in `Default Provider`. +##### HuaweiCloud OBS + +To use HuaweiCloud OBS as storage store, you should apply a HuaweiCloud Account to create an object storage bucket at first. +Note that detailed from [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) + +To configure HuaweiCloud Account to use OBS as storage store you need to set these parameters in yaml format stored in a file: + +```yaml mdox-exec="go run scripts/cfggen/main.go --name=cos.Config" +type: OBS +config: + bucket: "" + endpoint: "" + access_key: "" + secret_key: "" + http_config: + idle_conn_timeout: 1m30s + response_header_timeout: 2m + insecure_skip_verify: false + tls_handshake_timeout: 10s + expect_continue_timeout: 1s + max_idle_conns: 100 + max_idle_conns_per_host: 100 + max_conns_per_host: 0 + tls_config: + ca_file: "" + cert_file: "" + key_file: "" + server_name: "" + insecure_skip_verify: false + disable_compression: false +prefix: "" +``` + +The `access_key` and `secret_key` field is required. The `http_config` field is optional for optimize HTTP transport settings. + #### How to add a new client to Thanos? 
Following checklist allows adding new Go code client to supported providers: From e5dd1f82993f44d8e3f5eb4f11c243926b8d1872 Mon Sep 17 00:00:00 2001 From: setoru Date: Fri, 5 May 2023 11:07:16 +0800 Subject: [PATCH 09/10] fix readability problem and comment Signed-off-by: setoru --- README.md | 6 +++--- providers/obs/obs.go | 26 ++++++++++++-------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 6f1fe103..5cf090e3 100644 --- a/README.md +++ b/README.md @@ -643,10 +643,10 @@ You can also include any of the optional configuration just like the example in ##### HuaweiCloud OBS -To use HuaweiCloud OBS as storage store, you should apply a HuaweiCloud Account to create an object storage bucket at first. -Note that detailed from [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) +To use HuaweiCloud OBS as an object store, you should apply for a HuaweiCloud Account to create an object storage bucket at first. +More details: [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) -To configure HuaweiCloud Account to use OBS as storage store you need to set these parameters in yaml format stored in a file: +To configure HuaweiCloud Account to use OBS as storage store you need to set these parameters in YAML format stored in a file: ```yaml mdox-exec="go run scripts/cfggen/main.go --name=cos.Config" type: OBS diff --git a/providers/obs/obs.go b/providers/obs/obs.go index af14c354..b3585530 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -175,7 +175,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { Parts: parts, }) if err != nil { - return errors.Wrap(err, "fail to complete multipart upload") + return errors.Wrap(err, "failed to complete multipart upload") } } return nil @@ -188,7 +188,7 @@ func (b *Bucket) putObjectSingle(key string, body io.Reader) error { input.Body = body _, err := b.client.PutObject(input) if err != nil { - return errors.Wrap(err, "fail to upload object") + return errors.Wrap(err, "failed to upload object") } return nil } @@ -199,7 +199,7 @@ func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultip initInput.Key = key initOutput, err := b.client.InitiateMultipartUpload(initInput) if err != nil { - return nil, errors.Wrap(err, "fail to init multipart upload job") + return nil, errors.Wrap(err, "failed to init multipart upload job") } return initOutput, nil } @@ -223,7 +223,7 @@ func (b *Bucket) multipartUpload(size int64, key, uploadId string, body io.Reade Offset: int64(i-1) * PartSize, }) if err != nil { - return nil, errors.Wrap(err, "fail to multipart upload") + return nil, errors.Wrap(err, "failed to multipart upload") } parts = append(parts, obs.Part{PartNumber: output.PartNumber, ETag: output.ETag}) } @@ -248,16 +248,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt for { output, err := b.client.ListObjects(input) if err != nil { - return errors.Wrap(err, "fail to list object") + return errors.Wrap(err, "failed to list object") } for _, content := range output.Contents { if err := f(content.Key); err != nil { - return errors.Wrap(err, "fail to call f for object") + return errors.Wrap(err, "failed to call iter function for object X") } } for _, topDir := range output.CommonPrefixes { if err := f(topDir); err != nil { - return errors.Wrap(err, "fail to call f for top dir object") + return errors.Wrap(err, "failed to call f for top dir object") } } @@ -291,14 +291,13 @@ func (b *Bucket) 
getRange(ctx context.Context, name string, off, length int64) ( return nil, errors.New("incorrect offset") } input.RangeStart = off + input.RangeEnd = math.MaxInt64 if length != -1 { input.RangeEnd = off + length - 1 - } else { - input.RangeEnd = math.MaxInt64 } output, err := b.client.GetObject(input) if err != nil { - return nil, errors.Wrap(err, "fail to get object") + return nil, errors.Wrap(err, "failed to get object") } return output.Body, nil } @@ -313,15 +312,14 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { if b.IsObjNotFoundErr(err) { return false, nil } - return false, errors.Wrap(err, "fail to get object metadata") + return false, errors.Wrap(err, "failed to get object metadata") } return true, nil } // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. func (b *Bucket) IsObjNotFoundErr(err error) bool { - switch oriErr := errors.Cause(err).(type) { - case obs.ObsError: + if oriErr, ok := errors.Cause(err).(obs.ObsError); ok { if oriErr.Status == "404 Not Found" { return true } @@ -336,7 +334,7 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt Key: name, }) if err != nil { - return objstore.ObjectAttributes{}, errors.Wrap(err, "fail to get object metadata") + return objstore.ObjectAttributes{}, errors.Wrap(err, "failed to get object metadata") } return objstore.ObjectAttributes{ Size: output.ContentLength, From 809c245beb7a8dee6b21a1bf6670a8b369dc5672 Mon Sep 17 00:00:00 2001 From: setoru Date: Fri, 5 May 2023 16:43:18 +0800 Subject: [PATCH 10/10] fix comment Signed-off-by: setoru --- providers/obs/obs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/obs/obs.go b/providers/obs/obs.go index b3585530..f0433708 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -252,12 +252,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt } for _, content := range output.Contents { if err := f(content.Key); err != nil { - return errors.Wrap(err, "failed to call iter function for object X") + return errors.Wrapf(err, "failed to call iter function for object %s", content.Key) } } for _, topDir := range output.CommonPrefixes { if err := f(topDir); err != nil { - return errors.Wrap(err, "failed to call f for top dir object") + return errors.Wrapf(err, "failed to call iter function for top dir object %s", topDir) } }
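
For orientation, the following is a minimal sketch (not part of the patch series above) showing how the new provider could be used directly through `obs.NewBucket` and the `Config` fields introduced in PATCH 01. The endpoint, bucket name, credentials and object key are placeholders, and error handling is kept deliberately simple.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/go-kit/log"

	"github.com/thanos-io/objstore/providers/obs"
)

func main() {
	// Placeholder endpoint, bucket and credentials; replace with real values before running.
	conf := []byte(`
bucket: "example-bucket"
endpoint: "obs.cn-south-1.myhuaweicloud.com"
access_key: "YOUR_ACCESS_KEY"
secret_key: "YOUR_SECRET_KEY"
`)

	bkt, err := obs.NewBucket(log.NewNopLogger(), conf)
	if err != nil {
		panic(err)
	}
	defer bkt.Close()

	ctx := context.Background()

	// Small objects take the single PutObject path; anything above 100 MiB
	// would go through the multipart-upload branch added in this series.
	if err := bkt.Upload(ctx, "demo/hello.txt", strings.NewReader("hello obs")); err != nil {
		panic(err)
	}

	rc, err := bkt.Get(ctx, "demo/hello.txt")
	if err != nil {
		panic(err)
	}
	defer rc.Close()

	body, err := io.ReadAll(rc)
	if err != nil {
		panic(err)
	}
	fmt.Printf("fetched %q\n", string(body))

	// Iter lists keys directly under the given directory (non-recursive by default).
	if err := bkt.Iter(ctx, "demo/", func(name string) error {
		fmt.Println("object:", name)
		return nil
	}); err != nil {
		panic(err)
	}
}
```

In an application that consumes this library, the same configuration would normally be passed to `client.NewBucket` with `type: OBS`, which PATCH 03 wires into the provider factory.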