From f7b22c7b3d2c0d3b7607c672c97d6cba1f38c128 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Fri, 18 Oct 2019 14:13:45 +0800 Subject: [PATCH 1/9] add oss support Signed-off-by: wujinhu --- CHANGELOG.md | 1 + CONTRIBUTING.md | 1 + Makefile | 4 +- docs/storage.md | 18 ++ go.mod | 2 + go.sum | 4 + pkg/objstore/client/factory.go | 14 +- pkg/objstore/objtesting/foreach.go | 18 ++ pkg/objstore/oss/oss.go | 380 +++++++++++++++++++++++++++++ scripts/cfggen/main.go | 12 +- 10 files changed, 442 insertions(+), 12 deletions(-) create mode 100644 pkg/objstore/oss/oss.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eb96112711..4e07272add 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up. +- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss-configuration) for further information. ### Fixed diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e589fb3d50..b6b6962505 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -57,6 +57,7 @@ $ git push origin - THANOS_SKIP_AZURE_TESTS to skip Azure tests. - THANOS_SKIP_SWIFT_TESTS to skip SWIFT tests. - THANOS_SKIP_TENCENT_COS_TESTS to skip Tencent COS tests. +- THANOS_SKIP_ALIYUN_OSS_TESTS to skip Aliyun OSS tests. If you skip all of these, the store specific tests will be run against memory object storage only. CI runs GCS and inmem tests only for now. Not having these variables will produce auth errors against GCS, AWS, Azure or COS tests. diff --git a/Makefile b/Makefile index 0f81720236..3f5aebc976 100644 --- a/Makefile +++ b/Makefile @@ -160,8 +160,8 @@ docs: $(EMBEDMD) build .PHONY: check-docs check-docs: $(EMBEDMD) $(LICHE) build @EMBEDMD_BIN="$(EMBEDMD)" scripts/genflagdocs.sh check - @$(LICHE) --recursive docs --exclude "cloud.tencent.com" --document-root . - @$(LICHE) --exclude "cloud.tencent.com|goreportcard.com" --document-root . *.md + @$(LICHE) --recursive docs --exclude "(cloud.tencent.com|alibabacloud.com)" --document-root . + @$(LICHE) --exclude "(cloud.tencent.com|alibabacloud.com)" --document-root . *.md # checks Go code comments if they have trailing period (excludes protobuffers and vendor files). # Comments with more than 3 spaces at beginning are omitted from the check, example: '// - foo'. diff --git a/docs/storage.md b/docs/storage.md index 20d815a439..c97ca29bbc 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -60,6 +60,7 @@ Current object storage client implementations: | [Azure Storage Account](./storage.md#azure) | Stable (production usage) | yes | @vglafirov | | [OpenStack Swift](./storage.md#openstack-swift) | Beta (working PoCs, testing usage) | no | @sudhi-vm | | [Tencent COS](./storage.md#tencent-cos) | Beta (testing usage) | no | @jojohappy | +| [AliYun OSS](./storage.md#aliyun-oss) | Beta (testing usage) | no | @shaulboozhiao,@wujinhu | NOTE: Currently Thanos requires strong consistency (write-read) for object store implementation. @@ -336,3 +337,20 @@ config: ``` Set the flags `--objstore.config-file` to reference to the configuration file. + +## AliYun OSS Configuration +In order to use AliYun OSS object storage, you should first create a bucket with proper Storage Class , ACLs and get the access key on the AliYun cloud. Go to [https://www.alibabacloud.com/product/oss](https://www.alibabacloud.com/product/oss) for more detail. + +To use AliYun OSS object storage, please specify following yaml configuration file in `objstore.config*` flag. + +[embedmd]:# (flags/config_aliyunoss.txt $) +```$ +type: ALIYUNOSS +config: + endpoint: "" + bucket: "" + access_key_id: "" + access_key_secret: "" +``` + +Use --objstore.config-file to reference to this configuration file. \ No newline at end of file diff --git a/go.mod b/go.mod index a9cd7f1f63..7409165622 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ require ( cloud.google.com/go v0.44.1 github.com/Azure/azure-storage-blob-go v0.7.0 github.com/NYTimes/gziphandler v1.1.1 + github.com/aliyun/aliyun-oss-go-sdk v2.0.1+incompatible github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 + github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/cespare/xxhash v1.1.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/fatih/structtag v1.0.0 diff --git a/go.sum b/go.sum index 655af0f96f..312e68f3cf 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/aliyun/aliyun-oss-go-sdk v2.0.1+incompatible h1:/MzpJOMHn/uBtd1dkS7Q9PF2ZjT6xTQMXSvv1e6ydXc= +github.com/aliyun/aliyun-oss-go-sdk v2.0.1+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -47,6 +49,8 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.23.12 h1:2UnxgNO6Y5J1OrkXS8XNp0UatDxD1bWHiDT62RDPggI= github.com/aws/aws-sdk-go v1.23.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= +github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go index 2a4edc86b0..50caf4d8e7 100644 --- a/pkg/objstore/client/factory.go +++ b/pkg/objstore/client/factory.go @@ -13,6 +13,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/gcs" + "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" yaml "gopkg.in/yaml.v2" @@ -21,11 +22,12 @@ import ( type ObjProvider string const ( - GCS ObjProvider = "GCS" - S3 ObjProvider = "S3" - AZURE ObjProvider = "AZURE" - SWIFT ObjProvider = "SWIFT" - COS ObjProvider = "COS" + GCS ObjProvider = "GCS" + S3 ObjProvider = "S3" + AZURE ObjProvider = "AZURE" + SWIFT ObjProvider = "SWIFT" + COS ObjProvider = "COS" + ALIYUNOSS ObjProvider = "ALIYUNOSS" ) type BucketConfig struct { @@ -59,6 +61,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe bucket, err = swift.NewContainer(logger, config) case string(COS): bucket, err = cos.NewBucket(logger, config, component) + case string(ALIYUNOSS): + bucket, err = oss.NewBucket(logger, config, component) default: return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type) } diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go index 0b5dffcb03..0eddd59ac9 100644 --- a/pkg/objstore/objtesting/foreach.go +++ b/pkg/objstore/objtesting/foreach.go @@ -11,6 +11,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/gcs" "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" "github.com/thanos-io/thanos/pkg/testutil" @@ -116,4 +117,21 @@ func ForeachStore(t *testing.T, testFn func(t testing.TB, bkt objstore.Bucket)) } else { t.Log("THANOS_SKIP_TENCENT_COS_TESTS envvar present. Skipping test against Tencent COS.") } + + // Optional OSS. + if _, ok := os.LookupEnv("THANOS_SKIP_ALIYUN_OSS_TESTS"); !ok { + bkt, closeFn, err := oss.NewTestBucket(t) + testutil.Ok(t, err) + + ok := t.Run("AliYun oss", func(t *testing.T) { + testFn(t, bkt) + }) + + closeFn() + if !ok { + return + } + } else { + t.Log("THANOS_SKIP_ALIYUN_OSS_TESTS envvar present. Skipping test against AliYun OSS.") + } } diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go new file mode 100644 index 0000000000..eeb8c45258 --- /dev/null +++ b/pkg/objstore/oss/oss.go @@ -0,0 +1,380 @@ +package oss + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "math" + "math/rand" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + "gopkg.in/yaml.v2" +) + +type Config struct { + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + AccessKeyID string `yaml:"access_key_id"` + AccessKeySecret string `yaml:"access_key_secret"` +} + +type Bucket struct { + name string + logger log.Logger + client *alioss.Client + config Config + bucket *alioss.Bucket +} + +func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { + c := configFromEnv() + err := func(conf Config) error { + if conf.Endpoint == "" { + return errors.New("no aliyun oss endpoint in config file") + } + if conf.AccessKeyID == "" { + return errors.New("access_key_id is not present in config file") + } + if conf.AccessKeySecret == "" { + return errors.New("access_key_secret is not present in config file") + } + return nil + }(c) + if err != nil { + return nil, nil, err + } + if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "true" { + t.Log("ALIYUNOSS_BUCKET is defined. Normally this tests will create temporary bucket " + + "and delete it after test. Unset ALIYUNOSS_BUCKET env variable to use default logic. If you really want to run " + + "tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true.") + return NewTestBucketFromConfig(t, c, true) + } + return NewTestBucketFromConfig(t, c, false) +} + +func calculateChunks(name string, r io.Reader) (int, int64, error) { + switch r.(type) { + case *os.File: + f, _ := r.(*os.File) + if fileInfo, err := f.Stat(); err == nil { + s := fileInfo.Size() + return int(math.Floor(float64(s) / alioss.MaxPartSize)), s % alioss.MaxPartSize, nil + } + case *strings.Reader: + f, _ := r.(*strings.Reader) + return int(math.Floor(float64(f.Size()) / alioss.MaxPartSize)), f.Size() % alioss.MaxPartSize, nil + } + return -1, 0, errors.New("unsupported implement of io.Reader") +} + +// Upload the contents of the reader as an object into the bucket. +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { + chunksnum, lastslice, err := calculateChunks(name, r) + if err != nil { + return err + } + + ncloser := ioutil.NopCloser(r) + switch chunksnum { + case 0: + if err := b.bucket.PutObject(name, ncloser); err != nil { + return errors.Wrap(err, "failed to upload oss object") + } + default: + { + init, err := b.bucket.InitiateMultipartUpload(name) + if err != nil { + return errors.Wrap(err, "failed to initiate multi-part upload") + } + chunk := 0 + uploadEveryPart := func(everypartsize int64, cnk int) (alioss.UploadPart, error) { + prt, err := b.bucket.UploadPart(init, ncloser, everypartsize, cnk) + if err != nil { + if err := b.bucket.AbortMultipartUpload(init); err != nil { + return prt, errors.Wrap(err, "failed to upload multi-part chunk") + } + } + return prt, nil + } + var parts []alioss.UploadPart + for ; chunk < chunksnum; chunk++ { + part, err := uploadEveryPart(alioss.MaxPartSize, chunk+1) + if err != nil { + return err + } + parts = append(parts, part) + } + if lastslice != 0 { + part, err := uploadEveryPart(lastslice, chunksnum+1) + if err != nil { + return errors.Wrap(err, "failed to upload the last chunk") + } + parts = append(parts, part) + } + _, err = b.bucket.CompleteMultipartUpload(init, parts) + if err != nil { + return errors.Wrap(err, "failed to set multi-part upload completive") + } + } + } + return nil +} + +// Delete removes the object with the given name. +func (b *Bucket) Delete(ctx context.Context, name string) error { + if err := b.bucket.DeleteObject(name); err != nil { + return errors.Wrap(err, "delete oss object") + } + return nil +} + +func configFromEnv() Config { + c := Config{ + Endpoint: os.Getenv("ALIYUNOSS_ENDPOINT"), + Bucket: os.Getenv("ALIYUNOSS_BUCKET"), + AccessKeyID: os.Getenv("ALIYUNOSS_ACCESS_KEY_ID"), + AccessKeySecret: os.Getenv("ALIYUNOSS_ACCESS_KEY_SECRET"), + } + return c +} + +func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { + config, err := func(conf []byte) (Config, error) { + var config Config + if err := yaml.Unmarshal(conf, &config); err != nil { + return Config{}, err + } + + return config, nil + }(conf) + + if err != nil { + return nil, errors.Wrap(err, "parse aliyun oss config file failed") + } + if err := validate(config); err != nil { + return nil, errors.Wrap(err, "validate aliyun oss config file failed") + } + + client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret) + if err != nil { + return nil, errors.Wrap(err, "create aliyun oss client failed") + } + bk, err := client.Bucket(config.Bucket) + if err != nil { + return nil, errors.Wrapf(err, "use aliyun oss bucket %s failed", config.Bucket) + } + + bkt := &Bucket{ + logger: logger, + client: client, + name: config.Bucket, + config: config, + bucket: bk, + } + return bkt, nil +} + +// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { + if dir != "" { + dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim + } + + marker := alioss.Marker("") + for { + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "context closed while iterating bucket") + } + objects, err := b.bucket.ListObjects(alioss.Prefix(dir), alioss.Delimiter(objstore.DirDelim), marker) + if err != nil { + return errors.Wrap(err, "listing aliyun oss bucket failed") + } + marker = alioss.Marker(objects.NextMarker) + + for _, object := range objects.Objects { + if err := f(object.Key); err != nil { + return errors.Wrapf(err, "callback func invoke for object %s failed ", object.Key) + } + } + + for _, object := range objects.CommonPrefixes { + if err := f(object); err != nil { + return errors.Wrapf(err, "callback func invoke for directory %s failed", object) + } + } + if !objects.IsTruncated { + break + } + } + + return nil +} + +func (b *Bucket) Name() string { + return b.name +} + +// validate checks to see the config options are set. +func validate(conf Config) error { + if conf.Endpoint == "" { + return errors.New("no aliyun oss endpoint in config file") + } + if conf.Bucket == "" { + return errors.New("no aliyun oss bucket in config file") + } + + if conf.AccessKeyID == "" { + return errors.New("access_key_id is not present in config file") + } + if conf.AccessKeySecret == "" { + return errors.New("access_key_secret is not present in config file") + } + return nil +} + +func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore.Bucket, func(), error) { + if c.Bucket == "" { + src := rand.NewSource(time.Now().UnixNano()) + + bktToCreate := strings.Replace(fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), "_", "-", -1) + if len(bktToCreate) >= 63 { + bktToCreate = bktToCreate[:63] + } + testclient, err := alioss.New(c.Endpoint, c.AccessKeyID, c.AccessKeySecret) + if err != nil { + return nil, nil, errors.Wrap(err, "create aliyun oss client failed") + } + + if err := testclient.CreateBucket(bktToCreate); err != nil { + return nil, nil, errors.Wrapf(err, "create aliyun oss bucket %s failed", bktToCreate) + } + c.Bucket = bktToCreate + } + + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-aliyun-oss-test") + if err != nil { + return nil, nil, err + } + + if reuseBucket { + if err := b.Iter(context.Background(), "", func(f string) error { + return errors.Errorf("bucket %s is not empty", c.Bucket) + }); err != nil { + return nil, nil, errors.Wrapf(err, "oss check bucket %s", c.Bucket) + } + + t.Log("WARNING. Reusing", c.Bucket, "Aliyun OSS bucket for OSS tests. Manual cleanup afterwards is required") + return b, func() {}, nil + } + + return b, func() { + objstore.EmptyBucket(t, context.Background(), b) + if err := b.client.DeleteBucket(c.Bucket); err != nil { + t.Logf("deleting bucket %s failed: %s", c.Bucket, err) + } + }, nil +} + +func (b *Bucket) Close() error { return nil } + +func (b *Bucket) setRange(start, end int64, name string) (alioss.Option, error) { + var opt alioss.Option + if 0 <= start && start <= end { + header, err := b.bucket.GetObjectMeta(name) + if err != nil { + return nil, err + } + + size, err := strconv.ParseInt(header["Content-Length"][0], 10, 0) + if err != nil { + return nil, err + } + + if end > size { + end = size - 1 + } + + opt = alioss.Range(start, end) + } else { + return nil, errors.Errorf("Invalid range specified: start=%d end=%d", start, end) + } + return opt, nil +} + +func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if len(name) == 0 { + return nil, errors.New("given object name should not empty") + } + + var opts []alioss.Option + if length != -1 { + opt, err := b.setRange(off, off+length-1, name) + if err != nil { + return nil, err + } + opts = append(opts, opt) + } + + resp, err := b.bucket.GetObject(name, opts...) + if err != nil { + return nil, err + } + + if _, err := resp.Read(nil); err != nil { + runutil.CloseWithLogOnErr(b.logger, resp, "oss get range obj close") + return nil, err + } + + return resp, nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getRange(ctx, name, 0, -1) +} + +func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.getRange(ctx, name, off, length) +} + +// Exists checks if the given object exists in the bucket. +func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { + exists, err := b.bucket.IsObjectExist(name) + if err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrap(err, "cloud not check if object exists") + } + + return exists, nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + switch aliErr := err.(type) { + case alioss.ServiceError: + if aliErr.StatusCode == http.StatusNotFound { + return true + } + } + return false +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 4bf93a69c8..7b79819415 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -16,6 +16,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/gcs" + "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" trclient "github.com/thanos-io/thanos/pkg/tracing/client" @@ -28,11 +29,12 @@ import ( var ( bucketConfigs = map[client.ObjProvider]interface{}{ - client.AZURE: azure.Config{}, - client.GCS: gcs.Config{}, - client.S3: s3.DefaultConfig, - client.SWIFT: swift.SwiftConfig{}, - client.COS: cos.Config{}, + client.AZURE: azure.Config{}, + client.GCS: gcs.Config{}, + client.S3: s3.DefaultConfig, + client.SWIFT: swift.SwiftConfig{}, + client.COS: cos.Config{}, + client.ALIYUNOSS: oss.Config{}, } tracingConfigs = map[trclient.TracingProvider]interface{}{ trclient.JAEGER: jaeger.Config{}, From e7a5d5d699b7a55bb92443172ab4bd2405a8496b Mon Sep 17 00:00:00 2001 From: wujinhu Date: Fri, 18 Oct 2019 16:23:12 +0800 Subject: [PATCH 2/9] fix docs Signed-off-by: wujinhu --- docs/storage.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/storage.md b/docs/storage.md index c97ca29bbc..2abd2b5ea1 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -343,7 +343,7 @@ In order to use AliYun OSS object storage, you should first create a bucket with To use AliYun OSS object storage, please specify following yaml configuration file in `objstore.config*` flag. -[embedmd]:# (flags/config_aliyunoss.txt $) +[embedmd]:# (flags/config_bucket_aliyunoss.txt $) ```$ type: ALIYUNOSS config: @@ -353,4 +353,4 @@ config: access_key_secret: "" ``` -Use --objstore.config-file to reference to this configuration file. \ No newline at end of file +Use --objstore.config-file to reference to this configuration file. From 79b43df6260ab1bf61a75ed7a93a8b2273ffea8d Mon Sep 17 00:00:00 2001 From: wujinhu Date: Fri, 18 Oct 2019 16:34:20 +0800 Subject: [PATCH 3/9] fix Makefile Signed-off-by: wujinhu --- Makefile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3f5aebc976..284cc40b2c 100644 --- a/Makefile +++ b/Makefile @@ -203,17 +203,19 @@ test: check-git install-deps @go install github.com/thanos-io/thanos/cmd/thanos # Be careful on GOCACHE. Those tests are sometimes using built Thanos/Prometheus binaries directly. Don't cache those. @rm -rf ${GOCACHE} - @echo ">> running all tests. Do export THANOS_SKIP_GCS_TESTS='true' or/and THANOS_SKIP_S3_AWS_TESTS='true' or/and THANOS_SKIP_AZURE_TESTS='true' and/or THANOS_SKIP_SWIFT_TESTS='true' and/or THANOS_SKIP_TENCENT_COS_TESTS='true' if you want to skip e2e tests against real store buckets" + @echo ">> running all tests. Do export THANOS_SKIP_GCS_TESTS='true' or/and THANOS_SKIP_S3_AWS_TESTS='true' or/and THANOS_SKIP_AZURE_TESTS='true' and/or THANOS_SKIP_SWIFT_TESTS='true' and/or THANOS_SKIP_ALIYUN_OSS_TESTS='true' and/or THANOS_SKIP_TENCENT_COS_TESTS='true' if you want to skip e2e tests against real store buckets" @go test $(shell go list ./... | grep -v /vendor/); .PHONY: test-ci test-ci: export THANOS_SKIP_AZURE_TESTS = true test-ci: export THANOS_SKIP_SWIFT_TESTS = true test-ci: export THANOS_SKIP_TENCENT_COS_TESTS = true +test-ci: export THANOS_SKIP_ALIYUN_OSS_TESTS = true test-ci: @echo ">> Skipping AZURE tests" @echo ">> Skipping SWIFT tests" @echo ">> Skipping TENCENT tests" + @echo ">> Skipping ALIYUN tests" $(MAKE) test .PHONY: test-local From d3f7eae2b72e3b030ecbe0648d42352a6882ed9a Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 21 Oct 2019 10:49:33 +0800 Subject: [PATCH 4/9] review comments Signed-off-by: wujinhu --- pkg/objstore/oss/oss.go | 87 +++++++++++++---------------------------- 1 file changed, 28 insertions(+), 59 deletions(-) diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index eeb8c45258..96bc8b0af1 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -23,6 +23,10 @@ import ( "gopkg.in/yaml.v2" ) +// Part size for multi part upload +const PartSize = 1024 * 1024 * 128 + +// Config stores the configuration for oss bucket. type Config struct { Endpoint string `yaml:"endpoint"` Bucket string `yaml:"bucket"` @@ -30,6 +34,7 @@ type Config struct { AccessKeySecret string `yaml:"access_key_secret"` } +// Bucket implements the store.Bucket interface. type Bucket struct { name string logger log.Logger @@ -39,21 +44,16 @@ type Bucket struct { } func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { - c := configFromEnv() - err := func(conf Config) error { - if conf.Endpoint == "" { - return errors.New("no aliyun oss endpoint in config file") - } - if conf.AccessKeyID == "" { - return errors.New("access_key_id is not present in config file") - } - if conf.AccessKeySecret == "" { - return errors.New("access_key_secret is not present in config file") - } - return nil - }(c) - if err != nil { - return nil, nil, err + c := Config{ + Endpoint: os.Getenv("ALIYUNOSS_ENDPOINT"), + Bucket: os.Getenv("ALIYUNOSS_BUCKET"), + AccessKeyID: os.Getenv("ALIYUNOSS_ACCESS_KEY_ID"), + AccessKeySecret: os.Getenv("ALIYUNOSS_ACCESS_KEY_SECRET"), + } + + if c.Endpoint == "" || c.AccessKeyID == "" || c.AccessKeySecret == "" { + return nil, nil, errors.New("aliyun oss endpoint or access_key_id or access_key_secret " + + "is not present in config file") } if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "true" { t.Log("ALIYUNOSS_BUCKET is defined. Normally this tests will create temporary bucket " + @@ -70,11 +70,11 @@ func calculateChunks(name string, r io.Reader) (int, int64, error) { f, _ := r.(*os.File) if fileInfo, err := f.Stat(); err == nil { s := fileInfo.Size() - return int(math.Floor(float64(s) / alioss.MaxPartSize)), s % alioss.MaxPartSize, nil + return int(math.Floor(float64(s) / PartSize)), s % PartSize, nil } case *strings.Reader: f, _ := r.(*strings.Reader) - return int(math.Floor(float64(f.Size()) / alioss.MaxPartSize)), f.Size() % alioss.MaxPartSize, nil + return int(math.Floor(float64(f.Size()) / PartSize)), f.Size() % PartSize, nil } return -1, 0, errors.New("unsupported implement of io.Reader") } @@ -103,8 +103,10 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { prt, err := b.bucket.UploadPart(init, ncloser, everypartsize, cnk) if err != nil { if err := b.bucket.AbortMultipartUpload(init); err != nil { - return prt, errors.Wrap(err, "failed to upload multi-part chunk") + return prt, errors.Wrap(err, "failed to abort multi-part upload") } + + return prt, errors.Wrap(err, "failed to upload multi-part chunk") } return prt, nil } @@ -112,7 +114,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { for ; chunk < chunksnum; chunk++ { part, err := uploadEveryPart(alioss.MaxPartSize, chunk+1) if err != nil { - return err + return errors.Wrap(err,"failed to upload every part") } parts = append(parts, part) } @@ -140,31 +142,16 @@ func (b *Bucket) Delete(ctx context.Context, name string) error { return nil } -func configFromEnv() Config { - c := Config{ - Endpoint: os.Getenv("ALIYUNOSS_ENDPOINT"), - Bucket: os.Getenv("ALIYUNOSS_BUCKET"), - AccessKeyID: os.Getenv("ALIYUNOSS_ACCESS_KEY_ID"), - AccessKeySecret: os.Getenv("ALIYUNOSS_ACCESS_KEY_SECRET"), - } - return c -} - +// NewBucket returns a new Bucket using the provided oss config values. func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { - config, err := func(conf []byte) (Config, error) { - var config Config - if err := yaml.Unmarshal(conf, &config); err != nil { - return Config{}, err - } - - return config, nil - }(conf) - - if err != nil { + var config Config + if err := yaml.Unmarshal(conf, &config); err != nil { return nil, errors.Wrap(err, "parse aliyun oss config file failed") } - if err := validate(config); err != nil { - return nil, errors.Wrap(err, "validate aliyun oss config file failed") + + if config.Endpoint == "" || config.Bucket == "" || config.AccessKeyID == "" || config.AccessKeySecret == "" { + return nil, errors.New("aliyun oss endpoint or bucket or access_key_id or access_key_secret " + + "is not present in config file") } client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret) @@ -227,24 +214,6 @@ func (b *Bucket) Name() string { return b.name } -// validate checks to see the config options are set. -func validate(conf Config) error { - if conf.Endpoint == "" { - return errors.New("no aliyun oss endpoint in config file") - } - if conf.Bucket == "" { - return errors.New("no aliyun oss bucket in config file") - } - - if conf.AccessKeyID == "" { - return errors.New("access_key_id is not present in config file") - } - if conf.AccessKeySecret == "" { - return errors.New("access_key_secret is not present in config file") - } - return nil -} - func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore.Bucket, func(), error) { if c.Bucket == "" { src := rand.NewSource(time.Now().UnixNano()) From 1cb9cb1c7d25eae995b9e53ae8c5c158eb14a319 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 21 Oct 2019 10:59:08 +0800 Subject: [PATCH 5/9] fix style Signed-off-by: wujinhu --- pkg/objstore/oss/oss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 96bc8b0af1..3c8df5e498 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -114,7 +114,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { for ; chunk < chunksnum; chunk++ { part, err := uploadEveryPart(alioss.MaxPartSize, chunk+1) if err != nil { - return errors.Wrap(err,"failed to upload every part") + return errors.Wrap(err, "failed to upload every part") } parts = append(parts, part) } From 8f818df167bcd5b9417438ce9b387f2068e79ed8 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 21 Oct 2019 11:40:28 +0800 Subject: [PATCH 6/9] review comments Signed-off-by: wujinhu --- pkg/objstore/oss/oss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 3c8df5e498..80f8a0d443 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -112,7 +112,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { } var parts []alioss.UploadPart for ; chunk < chunksnum; chunk++ { - part, err := uploadEveryPart(alioss.MaxPartSize, chunk+1) + part, err := uploadEveryPart(PartSize, chunk+1) if err != nil { return errors.Wrap(err, "failed to upload every part") } From 21f758fe6ee457754f983e84fdb54cf1927f0bce Mon Sep 17 00:00:00 2001 From: wujinhu Date: Wed, 23 Oct 2019 10:16:27 +0800 Subject: [PATCH 7/9] review comments Signed-off-by: wujinhu --- Makefile | 2 +- pkg/objstore/oss/oss.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 284cc40b2c..b9de15fc2b 100644 --- a/Makefile +++ b/Makefile @@ -161,7 +161,7 @@ docs: $(EMBEDMD) build check-docs: $(EMBEDMD) $(LICHE) build @EMBEDMD_BIN="$(EMBEDMD)" scripts/genflagdocs.sh check @$(LICHE) --recursive docs --exclude "(cloud.tencent.com|alibabacloud.com)" --document-root . - @$(LICHE) --exclude "(cloud.tencent.com|alibabacloud.com)" --document-root . *.md + @$(LICHE) --exclude "(cloud.tencent.com|goreportcard.com|alibabacloud.com)" --document-root . *.md # checks Go code comments if they have trailing period (excludes protobuffers and vendor files). # Comments with more than 3 spaces at beginning are omitted from the check, example: '// - foo'. diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 80f8a0d443..7fc413ccdc 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -15,7 +15,6 @@ import ( "time" alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" - "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" @@ -23,7 +22,7 @@ import ( "gopkg.in/yaml.v2" ) -// Part size for multi part upload +// Part size for multi part upload. const PartSize = 1024 * 1024 * 128 // Config stores the configuration for oss bucket. @@ -125,8 +124,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { } parts = append(parts, part) } - _, err = b.bucket.CompleteMultipartUpload(init, parts) - if err != nil { + if _, err := b.bucket.CompleteMultipartUpload(init, parts); err != nil { return errors.Wrap(err, "failed to set multi-part upload completive") } } @@ -304,7 +302,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( resp, err := b.bucket.GetObject(name, opts...) if err != nil { - return nil, err + return nil, errors.Wrap(err, "get object failed") } if _, err := resp.Read(nil); err != nil { From a0cdef13036d9e8cebf83c2c863406b2c24e5909 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Wed, 23 Oct 2019 10:32:06 +0800 Subject: [PATCH 8/9] review comments Signed-off-by: wujinhu --- pkg/objstore/oss/oss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 7fc413ccdc..f61906d5d7 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -302,7 +302,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( resp, err := b.bucket.GetObject(name, opts...) if err != nil { - return nil, errors.Wrap(err, "get object failed") + return nil, err } if _, err := resp.Read(nil); err != nil { From 96620b995d85a3b9dcadc902f0c247703e3f2894 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Wed, 23 Oct 2019 17:07:41 +0800 Subject: [PATCH 9/9] review comments Signed-off-by: wujinhu --- pkg/objstore/oss/oss.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index f61906d5d7..bd913f6c96 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -18,7 +18,6 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/runutil" "gopkg.in/yaml.v2" ) @@ -305,11 +304,6 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } - if _, err := resp.Read(nil); err != nil { - runutil.CloseWithLogOnErr(b.logger, resp, "oss get range obj close") - return nil, err - } - return resp, nil }