Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace BucketReader.ObjectSize() with .Attributes() #2613

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ config:

chunk_subrange_size: 16000
max_chunks_get_range_requests: 3
chunk_object_size_ttl: 24h
chunk_object_attrs_ttl: 24h
chunk_subrange_ttl: 24h
blocks_iter_ttl: 5m
metafile_exists_ttl: 2h
Expand All @@ -307,7 +307,7 @@ Additional options to configure various aspects of chunks cache are available:

- `chunk_subrange_size`: size of the segment of a chunks object that is stored in the cache. This is the smallest unit that the chunks cache works with.
- `max_chunks_get_range_requests`: how many "get range" sub-requests the cache may perform to fetch missing subranges.
- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache.
- `chunk_object_attrs_ttl`: how long to keep information about chunk file attributes (e.g. size) in the cache.
- `chunk_subrange_ttl`: how long to keep individual subranges in the cache.

The following options are used for metadata caching (meta.json files, deletion mark files, iteration result):
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ type chunkedIndexReader struct {

func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID) (*chunkedIndexReader, int, error) {
indexFilepath := filepath.Join(id.String(), block.IndexFilename)
size, err := bkt.ObjectSize(ctx, indexFilepath)
attrs, err := bkt.Attributes(ctx, indexFilepath)
if err != nil {
return nil, 0, errors.Wrapf(err, "get object size of %s", indexFilepath)
return nil, 0, errors.Wrapf(err, "get object attributes of %s", indexFilepath)
}

rc, err := bkt.GetRange(ctx, indexFilepath, 0, index.HeaderLen)
Expand Down Expand Up @@ -164,7 +164,7 @@ func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ul
ir := &chunkedIndexReader{
ctx: ctx,
path: indexFilepath,
size: size,
size: uint64(attrs.Size),
povilasv marked this conversation as resolved.
Show resolved Hide resolved
bkt: bkt,
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/objstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,23 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return b.getBlobReader(ctx, name, off, length)
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return 0, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name)
return objstore.ObjectAttributes{}, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name)
}

var props *blob.BlobGetPropertiesResponse
props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
if err != nil {
return 0, err
return objstore.ObjectAttributes{}, err
}
return uint64(props.ContentLength()), nil

return objstore.ObjectAttributes{
Size: props.ContentLength(),
LastModified: props.LastModified(),
}, nil
}

// Exists checks if the given object exists.
Expand Down
56 changes: 56 additions & 0 deletions pkg/objstore/clientutil/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package clientutil

import (
"net/http"
"strconv"
"time"

"github.com/pkg/errors"
)

// ParseContentLength reads the "Content-Length" entry of m and returns its
// first value parsed as a base-10 int64 (a size in bytes).
//
// The header is looked up by its exact key. An error is returned when the key
// is absent, when it is present but carries no values, or when the first value
// is not a valid 64-bit integer.
func ParseContentLength(m http.Header) (int64, error) {
	const name = "Content-Length"

	values, found := m[name]
	switch {
	case !found:
		return 0, errors.Errorf("%s header not found", name)
	case len(values) == 0:
		return 0, errors.Errorf("%s header has no values", name)
	}

	// Only the first value is considered; any additional values are ignored.
	length, err := strconv.ParseInt(values[0], 10, 64)
	if err != nil {
		return 0, errors.Wrapf(err, "convert %s", name)
	}
	return length, nil
}

// ParseLastModified returns the timestamp parsed from the Last-Modified
// HTTP header in input (expected to be in the RFC3339 format).
func ParseLastModified(m http.Header) (time.Time, error) {
pracucci marked this conversation as resolved.
Show resolved Hide resolved
const name = "Last-Modified"

v, ok := m[name]
if !ok {
return time.Time{}, errors.Errorf("%s header not found", name)
}

if len(v) == 0 {
return time.Time{}, errors.Errorf("%s header has no values", name)
}

mod, err := time.Parse(time.RFC3339, v[0])
if err != nil {
return time.Time{}, errors.Wrapf(err, "parse %s", name)
}

return mod, nil
}
91 changes: 91 additions & 0 deletions pkg/objstore/clientutil/parse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package clientutil

import (
"net/http"
"testing"
"time"

alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestParseLastModified checks ParseLastModified against a missing header,
// an unparseable value and a valid RFC3339 value.
func TestParseLastModified(t *testing.T) {
	type testCase struct {
		headerValue string
		expectedVal time.Time
		expectedErr string
	}

	cases := map[string]testCase{
		"no header": {
			expectedErr: "Last-Modified header not found",
		},
		"invalid header value": {
			headerValue: "invalid",
			expectedErr: `parse Last-Modified: parsing time "invalid" as "2006-01-02T15:04:05Z07:00": cannot parse "invalid" as "2006"`,
		},
		"valid header value": {
			headerValue: "2015-11-06T10:07:11.000Z",
			expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC),
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// An empty headerValue means the header is not set at all.
			header := http.Header{}
			if tc.headerValue != "" {
				header.Add(alioss.HTTPHeaderLastModified, tc.headerValue)
			}

			got, err := ParseLastModified(header)

			if tc.expectedErr == "" {
				testutil.Ok(t, err)
				testutil.Equals(t, tc.expectedVal, got)
				return
			}

			testutil.NotOk(t, err)
			testutil.Equals(t, tc.expectedErr, err.Error())
		})
	}
}

// TestParseContentLength checks ParseContentLength against a missing header,
// a non-numeric value and a valid numeric value.
func TestParseContentLength(t *testing.T) {
	type testCase struct {
		headerValue string
		expectedVal int64
		expectedErr string
	}

	cases := map[string]testCase{
		"no header": {
			expectedErr: "Content-Length header not found",
		},
		"invalid header value": {
			headerValue: "invalid",
			expectedErr: `convert Content-Length: strconv.ParseInt: parsing "invalid": invalid syntax`,
		},
		"valid header value": {
			headerValue: "12345",
			expectedVal: 12345,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// An empty headerValue means the header is not set at all.
			header := http.Header{}
			if tc.headerValue != "" {
				header.Add(alioss.HTTPHeaderContentLength, tc.headerValue)
			}

			got, err := ParseContentLength(header)

			if tc.expectedErr == "" {
				testutil.Ok(t, err)
				testutil.Equals(t, tc.expectedVal, got)
				return
			}

			testutil.NotOk(t, err)
			testutil.Equals(t, tc.expectedErr, err.Error())
		})
	}
}
32 changes: 18 additions & 14 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/mozillazg/go-cos"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/clientutil"
"github.com/thanos-io/thanos/pkg/runutil"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -93,23 +93,27 @@ func (b *Bucket) Name() string {
return b.name
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
resp, err := b.client.Object.Head(ctx, name, nil)
if err != nil {
return 0, err
return objstore.ObjectAttributes{}, err
}
if v, ok := resp.Header["Content-Length"]; ok {
if len(v) == 0 {
return 0, errors.New("content-length header has no values")
}
ret, err := strconv.ParseUint(v[0], 10, 64)
if err != nil {
return 0, errors.Wrap(err, "convert content-length")
}
return ret, nil

size, err := clientutil.ParseContentLength(resp.Header)
if err != nil {
return objstore.ObjectAttributes{}, err
}
return 0, errors.New("content-length header not found")

mod, err := clientutil.ParseLastModified(resp.Header)
if err != nil {
return objstore.ObjectAttributes{}, err
}

return objstore.ObjectAttributes{
Size: size,
LastModified: mod,
}, nil
}

// Upload the contents of the reader as an object into the bucket.
Expand Down
14 changes: 9 additions & 5 deletions pkg/objstore/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,18 @@ func (r *rangeReaderCloser) Close() error {
return r.f.Close()
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
file := filepath.Join(b.rootDir, name)
st, err := os.Stat(file)
stat, err := os.Stat(file)
if err != nil {
return 0, errors.Wrapf(err, "stat %s", file)
return objstore.ObjectAttributes{}, errors.Wrapf(err, "stat %s", file)
}
return uint64(st.Size()), nil

return objstore.ObjectAttributes{
Size: stat.Size(),
LastModified: stat.ModTime(),
}, nil
}

// GetRange returns a new range reader for the given object name and range.
Expand Down
14 changes: 9 additions & 5 deletions pkg/objstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,17 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
obj, err := b.bkt.Object(name).Attrs(ctx)
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
attrs, err := b.bkt.Object(name).Attrs(ctx)
if err != nil {
return 0, err
return objstore.ObjectAttributes{}, err
}
return uint64(obj.Size), nil

return objstore.ObjectAttributes{
Size: attrs.Size,
LastModified: attrs.Updated,
}, nil
}

// Handle returns the underlying GCS bucket handle.
Expand Down
22 changes: 16 additions & 6 deletions pkg/objstore/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/pkg/errors"
)
Expand All @@ -22,12 +23,16 @@ var errNotFound = errors.New("inmem: object not found")
type InMemBucket struct {
mtx sync.RWMutex
objects map[string][]byte
attrs map[string]ObjectAttributes
}

// NewInMemBucket returns a new in memory Bucket.
// NOTE: Returned bucket is just a naive in memory bucket implementation. For test use cases only.
func NewInMemBucket() *InMemBucket {
return &InMemBucket{objects: map[string][]byte{}}
return &InMemBucket{
objects: map[string][]byte{},
attrs: map[string]ObjectAttributes{},
}
}

// Objects returns internally stored objects.
Expand Down Expand Up @@ -144,15 +149,15 @@ func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error) {
return ok, nil
}

// ObjectSize returns the size of the specified object.
func (b *InMemBucket) ObjectSize(_ context.Context, name string) (uint64, error) {
// Attributes returns information about the specified object.
func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttributes, error) {
b.mtx.RLock()
file, ok := b.objects[name]
attrs, ok := b.attrs[name]
b.mtx.RUnlock()
if !ok {
return 0, errNotFound
return ObjectAttributes{}, errNotFound
}
return uint64(len(file)), nil
return attrs, nil
}

// Upload writes the file specified in src to into the memory.
Expand All @@ -164,6 +169,10 @@ func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error
return err
}
b.objects[name] = body
b.attrs[name] = ObjectAttributes{
Size: int64(len(body)),
povilasv marked this conversation as resolved.
Show resolved Hide resolved
LastModified: time.Now(),
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand All @@ -175,6 +184,7 @@ func (b *InMemBucket) Delete(_ context.Context, name string) error {
return errNotFound
}
delete(b.objects, name)
delete(b.attrs, name)
return nil
}

Expand Down
Loading