Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objectstore/cos: support multi-part upload (#5137) #5139

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4908](https://github.com/thanos-io/thanos/pull/4908) UI: Show 'minus' icon and add tooltip when store min / max time is not available.
- [#4883](https://github.com/thanos-io/thanos/pull/4883) Mixin: adhere to RFC 1123 compatible component naming.
- [#5114](https://github.com/thanos-io/thanos/pull/5114) Tools `thanos bucket inspect` fix time formatting.
- [#5139](https://github.com/thanos-io/thanos/pull/5139) COS: Support multi-part upload, fix upload issue when index size more than 5GB.

## [v0.24.0](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.22

Expand Down
81 changes: 79 additions & 2 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -194,10 +195,86 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
}, nil
}

var (
_ cos.FixedLengthReader = (*fixedLengthReader)(nil)
)

type fixedLengthReader struct {
io.Reader
size int64
}

func newFixedLengthReader(r io.Reader, size int64) io.Reader {
return fixedLengthReader{
Reader: io.LimitReader(r, size),
size: size,
}
}

// Size implement cos.FixedLengthReader interface.
func (r fixedLengthReader) Size() int64 {
return r.size
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrap(err, "upload cos object")
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}
// partSize 128MB.
const partSize = 1024 * 1024 * 128
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrapf(err, "Put object: %s", name)
}
return nil
}
// 1. init.
result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
if err != nil {
return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
}
uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) {
r := newFixedLengthReader(r, partSize)
resp, err := b.client.Object.UploadPart(ctx, name, uploadID, part, r, &cos.ObjectUploadPartOptions{
ContentLength: partSize,
})
if err != nil {
if _, err := b.client.Object.AbortMultipartUpload(ctx, name, uploadID); err != nil {
return "", err
}
return "", err
}
etag := resp.Header.Get("ETag")
return etag, nil
}
optcom := &cos.CompleteMultipartUploadOptions{}
// 2. upload parts.
for part := 1; part <= partNums; part++ {
etag, err := uploadEveryPart(partSize, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 3. upload last part.
if lastSlice != 0 {
part := partNums + 1
etag, err := uploadEveryPart(lastSlice, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 4. complete.
if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil {
return errors.Wrapf(err, "CompleteMultipartUpload %s", name)
}
return nil
}
Expand Down