From 65aa0d7282029d5832d8a985d453886b26b1f0fa Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 09:00:01 +0200 Subject: [PATCH 1/6] Added Attributes() to BucketReader Signed-off-by: Marco Pracucci --- pkg/objstore/azure/azure.go | 19 ++++++ pkg/objstore/clientutil/parse.go | 49 +++++++++++++++ pkg/objstore/clientutil/parse_test.go | 88 +++++++++++++++++++++++++++ pkg/objstore/cos/cos.go | 41 +++++++++---- pkg/objstore/filesystem/filesystem.go | 14 +++++ pkg/objstore/gcs/gcs.go | 13 ++++ pkg/objstore/inmem.go | 23 ++++++- pkg/objstore/objstore.go | 43 ++++++++++--- pkg/objstore/objstore_test.go | 18 +++--- pkg/objstore/oss/oss.go | 40 +++++++++--- pkg/objstore/s3/s3.go | 13 ++++ pkg/objstore/swift/swift.go | 14 +++++ pkg/objstore/testing.go | 4 ++ 13 files changed, 341 insertions(+), 38 deletions(-) create mode 100644 pkg/objstore/clientutil/parse.go create mode 100644 pkg/objstore/clientutil/parse_test.go diff --git a/pkg/objstore/azure/azure.go b/pkg/objstore/azure/azure.go index 0c132daf61..4abf9e85f0 100644 --- a/pkg/objstore/azure/azure.go +++ b/pkg/objstore/azure/azure.go @@ -245,6 +245,25 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { return uint64(props.ContentLength()), nil } +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return objstore.ObjectAttributes{}, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name) + } + + var props *blob.BlobGetPropertiesResponse + props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: props.ContentLength(), + LastModified: props.LastModified(), + }, nil +} + // Exists checks if the given object exists. func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name) diff --git a/pkg/objstore/clientutil/parse.go b/pkg/objstore/clientutil/parse.go new file mode 100644 index 0000000000..c7dda67c70 --- /dev/null +++ b/pkg/objstore/clientutil/parse.go @@ -0,0 +1,49 @@ +package clientutil + +import ( + "net/http" + "strconv" + "time" + + "github.com/pkg/errors" +) + +func ParseContentLength(m http.Header) (int64, error) { + name := "Content-Length" + + v, ok := m[name] + if !ok { + return 0, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return 0, errors.Errorf("%s header has no values", name) + } + + ret, err := strconv.ParseInt(v[0], 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "convert %s", name) + } + + return ret, nil +} + +func ParseLastModified(m http.Header) (time.Time, error) { + name := "Last-Modified" + + v, ok := m[name] + if !ok { + return time.Time{}, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return time.Time{}, errors.Errorf("%s header has no values", name) + } + + mod, err := time.Parse(time.RFC3339, v[0]) + if err != nil { + return time.Time{}, errors.Wrapf(err, "parse %s", name) + } + + return mod, nil +} diff --git a/pkg/objstore/clientutil/parse_test.go b/pkg/objstore/clientutil/parse_test.go new file mode 100644 index 0000000000..ca047da0cd --- /dev/null +++ b/pkg/objstore/clientutil/parse_test.go @@ -0,0 +1,88 @@ +package clientutil + +import ( + "net/http" + "testing" + "time" + + alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestParseLastModified(t *testing.T) { + tests := map[string]struct { + headerValue string + expectedVal time.Time + expectedErr string + }{ + "no header": { + expectedErr: "Last-Modified header not found", + }, + "invalid header value": { + headerValue: "invalid", + expectedErr: `parse Last-Modified: parsing time "invalid" as "2006-01-02T15:04:05Z07:00": cannot parse "invalid" as "2006"`, + }, + "valid header value": { + headerValue: "2015-11-06T10:07:11.000Z", + expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + meta := http.Header{} + if testData.headerValue != "" { + meta.Add(alioss.HTTPHeaderLastModified, testData.headerValue) + } + + actual, err := ParseLastModified(meta) + + if testData.expectedErr != "" { + testutil.NotOk(t, err) + testutil.Equals(t, testData.expectedErr, err.Error()) + } else { + testutil.Ok(t, err) + testutil.Equals(t, testData.expectedVal, actual) + } + }) + } +} + +func TestParseContentLength(t *testing.T) { + tests := map[string]struct { + headerValue string + expectedVal int64 + expectedErr string + }{ + "no header": { + expectedErr: "Content-Length header not found", + }, + "invalid header value": { + headerValue: "invalid", + expectedErr: `convert Content-Length: strconv.ParseInt: parsing "invalid": invalid syntax`, + }, + "valid header value": { + headerValue: "12345", + expectedVal: 12345, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + meta := http.Header{} + if testData.headerValue != "" { + meta.Add(alioss.HTTPHeaderContentLength, testData.headerValue) + } + + actual, err := ParseContentLength(meta) + + if testData.expectedErr != "" { + testutil.NotOk(t, err) + testutil.Equals(t, testData.expectedErr, err.Error()) + } else { + testutil.Ok(t, err) + testutil.Equals(t, testData.expectedVal, actual) + } + }) + } +} diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index 6db898f6ca..ec85d41129 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "os" - "strconv" "strings" "testing" @@ -17,6 +16,7 @@ import ( "github.com/mozillazg/go-cos" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/clientutil" "github.com/thanos-io/thanos/pkg/runutil" "gopkg.in/yaml.v2" ) @@ -99,17 +99,36 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { if err != nil { return 0, err } - if v, ok := resp.Header["Content-Length"]; ok { - if len(v) == 0 { - return 0, errors.New("content-length header has no values") - } - ret, err := strconv.ParseUint(v[0], 10, 64) - if err != nil { - return 0, errors.Wrap(err, "convert content-length") - } - return ret, nil + + ret, err := clientutil.ParseContentLength(resp.Header) + if err != nil { + return 0, err + } + + return uint64(ret), err +} + +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + resp, err := b.client.Object.Head(ctx, name, nil) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + size, err := clientutil.ParseContentLength(resp.Header) + if err != nil { + return objstore.ObjectAttributes{}, err } - return 0, errors.New("content-length header not found") + + mod, err := clientutil.ParseLastModified(resp.Header) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: size, + LastModified: mod, + }, nil } // Upload the contents of the reader as an object into the bucket. diff --git a/pkg/objstore/filesystem/filesystem.go b/pkg/objstore/filesystem/filesystem.go index 0a13994d93..4bb8d33048 100644 --- a/pkg/objstore/filesystem/filesystem.go +++ b/pkg/objstore/filesystem/filesystem.go @@ -117,6 +117,20 @@ func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { return uint64(st.Size()), nil } +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + file := filepath.Join(b.rootDir, name) + stat, err := os.Stat(file) + if err != nil { + return objstore.ObjectAttributes{}, errors.Wrapf(err, "stat %s", file) + } + + return objstore.ObjectAttributes{ + Size: stat.Size(), + LastModified: stat.ModTime(), + }, nil +} + // GetRange returns a new range reader for the given object name and range. func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) { if name == "" { diff --git a/pkg/objstore/gcs/gcs.go b/pkg/objstore/gcs/gcs.go index ae2cf3202b..2b77df2ec7 100644 --- a/pkg/objstore/gcs/gcs.go +++ b/pkg/objstore/gcs/gcs.go @@ -134,6 +134,19 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { return uint64(obj.Size), nil } +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + attrs, err := b.bkt.Object(name).Attrs(ctx) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: attrs.Size, + LastModified: attrs.Updated, + }, nil +} + // Handle returns the underlying GCS bucket handle. // Used for testing purposes (we return handle, so it is not instrumented). func (b *Bucket) Handle() *storage.BucketHandle { diff --git a/pkg/objstore/inmem.go b/pkg/objstore/inmem.go index 0233f795f0..0b2114212e 100644 --- a/pkg/objstore/inmem.go +++ b/pkg/objstore/inmem.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/pkg/errors" ) @@ -22,12 +23,16 @@ var errNotFound = errors.New("inmem: object not found") type InMemBucket struct { mtx sync.RWMutex objects map[string][]byte + attrs map[string]ObjectAttributes } // NewInMemBucket returns a new in memory Bucket. // NOTE: Returned bucket is just a naive in memory bucket implementation. For test use cases only. func NewInMemBucket() *InMemBucket { - return &InMemBucket{objects: map[string][]byte{}} + return &InMemBucket{ + objects: map[string][]byte{}, + attrs: map[string]ObjectAttributes{}, + } } // Objects returns internally stored objects. @@ -155,6 +160,17 @@ func (b *InMemBucket) ObjectSize(_ context.Context, name string) (uint64, error) return uint64(len(file)), nil } +// Attributes returns information about the specified object. +func (b *InMemBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { + b.mtx.RLock() + attrs, ok := b.attrs[name] + b.mtx.RUnlock() + if !ok { + return ObjectAttributes{}, errNotFound + } + return attrs, nil +} + // Upload writes the file specified in src to into the memory. func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error { b.mtx.Lock() @@ -164,6 +180,10 @@ func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error return err } b.objects[name] = body + b.attrs[name] = ObjectAttributes{ + Size: int64(len(body)), + LastModified: time.Now(), + } return nil } @@ -175,6 +195,7 @@ func (b *InMemBucket) Delete(_ context.Context, name string) error { return errNotFound } delete(b.objects, name) + delete(b.attrs, name) return nil } diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index c1bb44d079..b91b90da29 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -72,6 +72,9 @@ type BucketReader interface { // ObjectSize returns the size of the specified object. ObjectSize(ctx context.Context, name string) (uint64, error) + + // Attributes returns information about the specified object. + Attributes(ctx context.Context, name string) (ObjectAttributes, error) } // InstrumentedBucket is a BucketReader with optional instrumentation control. @@ -83,6 +86,14 @@ type InstrumentedBucketReader interface { ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader } +type ObjectAttributes struct { + // Size is the object size in bytes. + Size int64 + + // LastModified is the timestamp the object was last modified. + LastModified time.Time +} + // TryToGetSize tries to get upfront size from reader. // TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this. func TryToGetSize(r io.Reader) (int64, error) { @@ -211,13 +222,14 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, } const ( - iterOp = "iter" - sizeOp = "objectsize" - getOp = "get" - getRangeOp = "get_range" - existsOp = "exists" - uploadOp = "upload" - deleteOp = "delete" + iterOp = "iter" + sizeOp = "objectsize" + getOp = "get" + getRangeOp = "get_range" + existsOp = "exists" + uploadOp = "upload" + deleteOp = "delete" + attributesOp = "attributes" ) // IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment thanos_objstore_bucket_operation_failures_total metric. @@ -262,6 +274,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric existsOp, uploadOp, deleteOp, + attributesOp, } { bkt.ops.WithLabelValues(op) bkt.opsFailures.WithLabelValues(op) @@ -324,6 +337,22 @@ func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, err return rc, nil } +func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { + const op = attributesOp + b.ops.WithLabelValues(op).Inc() + + start := time.Now() + attrs, err := b.bkt.Attributes(ctx, name) + if err != nil { + if !b.isOpFailureExpected(err) { + b.opsFailures.WithLabelValues(op).Inc() + } + return attrs, err + } + b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + return attrs, nil +} + func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { const op = getOp b.ops.WithLabelValues(op).Inc() diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index f78d8550d9..dea5e6aa09 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -13,9 +13,9 @@ import ( func TestMetricBucket_Close(t *testing.T) { bkt := BucketWithMetrics("abc", NewInMemBucket(), nil) // Expected initialized metrics. - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr)) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp))) @@ -25,7 +25,7 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp))) testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp))) @@ -33,8 +33,8 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime) testutil.Assert(t, lastUpload > 0, "last upload not greater than 0, val: %f", lastUpload) @@ -48,7 +48,7 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) // Not expected not found error here. testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp))) @@ -58,7 +58,7 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) } diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 5c081d7a5a..2d0d4d7853 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/clientutil" "gopkg.in/yaml.v2" ) @@ -137,17 +138,36 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { if err != nil { return 0, err } - if v, ok := m["Content-Length"]; ok { - if len(v) == 0 { - return 0, errors.New("content-length header has no values") - } - ret, err := strconv.ParseUint(v[0], 10, 64) - if err != nil { - return 0, errors.Wrap(err, "convert content-length") - } - return ret, nil + + ret, err := clientutil.ParseContentLength(m) + if err != nil { + return 0, err + } + + return uint64(ret), err +} + +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + m, err := b.bucket.GetObjectMeta(name) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + size, err := clientutil.ParseContentLength(m) + if err != nil { + return objstore.ObjectAttributes{}, err } - return 0, errors.New("content-length header not found") + + mod, err := clientutil.ParseLastModified(m) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: size, + LastModified: mod, + }, nil } // NewBucket returns a new Bucket using the provided oss config values. diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index 285bebaf7d..017915fcad 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -341,6 +341,19 @@ func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { return uint64(objInfo.Size), nil } +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { + objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{}) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: objInfo.Size, + LastModified: objInfo.LastModified, + }, nil +} + // Delete removes the object with the given name. func (b *Bucket) Delete(_ context.Context, name string) error { return b.client.RemoveObject(b.name, name) diff --git a/pkg/objstore/swift/swift.go b/pkg/objstore/swift/swift.go index 676c3f7d59..3ab441d7d2 100644 --- a/pkg/objstore/swift/swift.go +++ b/pkg/objstore/swift/swift.go @@ -138,6 +138,20 @@ func (c *Container) ObjectSize(ctx context.Context, name string) (uint64, error) return uint64(headers.ContentLength), nil } +// Attributes returns information about the specified object. +func (c *Container) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + response := objects.Get(c.client, c.name, name, nil) + headers, err := response.Extract() + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: headers.ContentLength, + LastModified: headers.LastModified, + }, nil +} + // Exists checks if the given object exists. func (c *Container) Exists(ctx context.Context, name string) (bool, error) { err := objects.Get(c.client, c.name, name, nil).Err diff --git a/pkg/objstore/testing.go b/pkg/objstore/testing.go index a33475db09..2069131112 100644 --- a/pkg/objstore/testing.go +++ b/pkg/objstore/testing.go @@ -98,6 +98,10 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.NotOk(t, err) testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err) + _, err = bkt.Attributes(ctx, "id1/obj_1.some") + testutil.NotOk(t, err) + testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err) + // Upload first object. testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@"))) From 784efddd96dc6c569747775e6d6711ca1c47e903 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 09:41:06 +0200 Subject: [PATCH 2/6] Removed ObjectSize() from BucketReader Signed-off-by: Marco Pracucci --- docs/components/store.md | 4 ++-- pkg/block/indexheader/binary_reader.go | 6 +++--- pkg/objstore/azure/azure.go | 14 -------------- pkg/objstore/cos/cos.go | 15 --------------- pkg/objstore/filesystem/filesystem.go | 10 ---------- pkg/objstore/gcs/gcs.go | 9 --------- pkg/objstore/inmem.go | 13 +------------ pkg/objstore/objstore.go | 25 ++----------------------- pkg/objstore/objstore_test.go | 26 +++++++++++++------------- pkg/objstore/oss/oss.go | 16 ---------------- pkg/objstore/s3/s3.go | 9 --------- pkg/objstore/swift/swift.go | 10 ---------- pkg/objstore/testing.go | 8 ++------ 13 files changed, 23 insertions(+), 142 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 20d6a0b83a..5912d11101 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -292,7 +292,7 @@ config: chunk_subrange_size: 16000 max_chunks_get_range_requests: 3 -chunk_object_size_ttl: 24h +chunk_object_attrs_ttl: 24h chunk_subrange_ttl: 24h blocks_iter_ttl: 5m metafile_exists_ttl: 2h @@ -307,7 +307,7 @@ Additional options to configure various aspects of chunks cache are available: - `chunk_subrange_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. - `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing subranges. -- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache. +- `chunk_object_attrs_ttl`: how long to keep information about chunk file attributes (ie. size) in the cache. - `chunk_subrange_ttl`: how long to keep individual subranges in the cache. Following options are used for metadata caching (meta.json files, deletion mark files, iteration result): diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index a070d9275b..0687e74d5d 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -131,9 +131,9 @@ type chunkedIndexReader struct { func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID) (*chunkedIndexReader, int, error) { indexFilepath := filepath.Join(id.String(), block.IndexFilename) - size, err := bkt.ObjectSize(ctx, indexFilepath) + attrs, err := bkt.Attributes(ctx, indexFilepath) if err != nil { - return nil, 0, errors.Wrapf(err, "get object size of %s", indexFilepath) + return nil, 0, errors.Wrapf(err, "get object attributes of %s", indexFilepath) } rc, err := bkt.GetRange(ctx, indexFilepath, 0, index.HeaderLen) @@ -164,7 +164,7 @@ func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ul ir := &chunkedIndexReader{ ctx: ctx, path: indexFilepath, - size: size, + size: uint64(attrs.Size), bkt: bkt, } diff --git a/pkg/objstore/azure/azure.go b/pkg/objstore/azure/azure.go index 4abf9e85f0..d40260ed18 100644 --- a/pkg/objstore/azure/azure.go +++ b/pkg/objstore/azure/azure.go @@ -231,20 +231,6 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) ( return b.getBlobReader(ctx, name, off, length) } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - blobURL, err := getBlobURL(ctx, *b.config, name) - if err != nil { - return 0, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name) - } - var props *blob.BlobGetPropertiesResponse - props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}) - if err != nil { - return 0, err - } - return uint64(props.ContentLength()), nil -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { blobURL, err := getBlobURL(ctx, *b.config, name) diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index ec85d41129..5514d8b1c6 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -93,21 +93,6 @@ func (b *Bucket) Name() string { return b.name } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - resp, err := b.client.Object.Head(ctx, name, nil) - if err != nil { - return 0, err - } - - ret, err := clientutil.ParseContentLength(resp.Header) - if err != nil { - return 0, err - } - - return uint64(ret), err -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { resp, err := b.client.Object.Head(ctx, name, nil) diff --git a/pkg/objstore/filesystem/filesystem.go b/pkg/objstore/filesystem/filesystem.go index 4bb8d33048..3a42d0ccd3 100644 --- a/pkg/objstore/filesystem/filesystem.go +++ b/pkg/objstore/filesystem/filesystem.go @@ -107,16 +107,6 @@ func (r *rangeReaderCloser) Close() error { return r.f.Close() } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { - file := filepath.Join(b.rootDir, name) - st, err := os.Stat(file) - if err != nil { - return 0, errors.Wrapf(err, "stat %s", file) - } - return uint64(st.Size()), nil -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { file := filepath.Join(b.rootDir, name) diff --git a/pkg/objstore/gcs/gcs.go b/pkg/objstore/gcs/gcs.go index 2b77df2ec7..81522b85b9 100644 --- a/pkg/objstore/gcs/gcs.go +++ b/pkg/objstore/gcs/gcs.go @@ -125,15 +125,6 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) ( return b.bkt.Object(name).NewRangeReader(ctx, off, length) } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - obj, err := b.bkt.Object(name).Attrs(ctx) - if err != nil { - return 0, err - } - return uint64(obj.Size), nil -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { attrs, err := b.bkt.Object(name).Attrs(ctx) diff --git a/pkg/objstore/inmem.go b/pkg/objstore/inmem.go index 0b2114212e..5e7cb232cd 100644 --- a/pkg/objstore/inmem.go +++ b/pkg/objstore/inmem.go @@ -149,19 +149,8 @@ func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error) { return ok, nil } -// ObjectSize returns the size of the specified object. -func (b *InMemBucket) ObjectSize(_ context.Context, name string) (uint64, error) { - b.mtx.RLock() - file, ok := b.objects[name] - b.mtx.RUnlock() - if !ok { - return 0, errNotFound - } - return uint64(len(file)), nil -} - // Attributes returns information about the specified object. -func (b *InMemBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { +func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttributes, error) { b.mtx.RLock() attrs, ok := b.attrs[name] b.mtx.RUnlock() diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index b91b90da29..f5a436cc2c 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -70,9 +70,6 @@ type BucketReader interface { // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. IsObjNotFoundErr(err error) bool - // ObjectSize returns the size of the specified object. - ObjectSize(ctx context.Context, name string) (uint64, error) - // Attributes returns information about the specified object. Attributes(ctx context.Context, name string) (ObjectAttributes, error) } @@ -88,10 +85,10 @@ type InstrumentedBucketReader interface { type ObjectAttributes struct { // Size is the object size in bytes. - Size int64 + Size int64 `json:"size"` // LastModified is the timestamp the object was last modified. - LastModified time.Time + LastModified time.Time `json:"last_modified"` } // TryToGetSize tries to get upfront size from reader. @@ -223,7 +220,6 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, const ( iterOp = "iter" - sizeOp = "objectsize" getOp = "get" getRangeOp = "get_range" existsOp = "exists" @@ -268,7 +264,6 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric } for _, op := range []string{ iterOp, - sizeOp, getOp, getRangeOp, existsOp, @@ -321,22 +316,6 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) return err } -func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - const op = sizeOp - b.ops.WithLabelValues(op).Inc() - - start := time.Now() - rc, err := b.bkt.ObjectSize(ctx, name) - if err != nil { - if !b.isOpFailureExpected(err) { - b.opsFailures.WithLabelValues(op).Inc() - } - return 0, err - } - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) - return rc, nil -} - func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { const op = attributesOp b.ops.WithLabelValues(op).Inc() diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index dea5e6aa09..c007cccc74 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -13,28 +13,28 @@ import ( func TestMetricBucket_Close(t *testing.T) { bkt := BucketWithMetrics("abc", NewInMemBucket(), nil) // Expected initialized metrics. - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr)) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp))) - testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(sizeOp))) + testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp))) testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp))) testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp))) testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp))) testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime) testutil.Assert(t, lastUpload > 0, "last upload not greater than 0, val: %f", lastUpload) @@ -42,23 +42,23 @@ func TestMetricBucket_Close(t *testing.T) { bkt.bkt = NewInMemBucket() AcceptanceTest(t, bkt) testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp))) - testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(sizeOp))) + testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp))) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp))) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp))) testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.ops)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) // Not expected not found error here. - testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp))) + testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp))) // Not expected not found errors, this should increment failure metric on get for not found as well, so +2. testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsFailures)) - testutil.Equals(t, 8, promtest.CollectAndCount(bkt.opsDuration)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) + testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) } diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go index 2d0d4d7853..2ef0cee99e 100644 --- a/pkg/objstore/oss/oss.go +++ b/pkg/objstore/oss/oss.go @@ -131,22 +131,6 @@ func (b *Bucket) Delete(ctx context.Context, name string) error { return nil } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - // refer to https://github.com/aliyun/aliyun-oss-go-sdk/blob/cee409f5b4d75d7ad077cacb7e6f4590a7f2e172/oss/bucket.go#L668. - m, err := b.bucket.GetObjectMeta(name) - if err != nil { - return 0, err - } - - ret, err := clientutil.ParseContentLength(m) - if err != nil { - return 0, err - } - - return uint64(ret), err -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { m, err := b.bucket.GetObjectMeta(name) diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index 017915fcad..84a6ed6644 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -332,15 +332,6 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { return nil } -// ObjectSize returns the size of the specified object. -func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { - objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{}) - if err != nil { - return 0, err - } - return uint64(objInfo.Size), nil -} - // Attributes returns information about the specified object. func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{}) diff --git a/pkg/objstore/swift/swift.go b/pkg/objstore/swift/swift.go index 3ab441d7d2..c11715ce4f 100644 --- a/pkg/objstore/swift/swift.go +++ b/pkg/objstore/swift/swift.go @@ -128,16 +128,6 @@ func (c *Container) GetRange(ctx context.Context, name string, off, length int64 return response.Body, response.Err } -// ObjectSize returns the size of the specified object. -func (c *Container) ObjectSize(ctx context.Context, name string) (uint64, error) { - response := objects.Get(c.client, c.name, name, nil) - headers, err := response.Extract() - if err != nil { - return 0, err - } - return uint64(headers.ContentLength), nil -} - // Attributes returns information about the specified object. func (c *Container) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { response := objects.Get(c.client, c.name, name, nil) diff --git a/pkg/objstore/testing.go b/pkg/objstore/testing.go index 2069131112..70e17923e7 100644 --- a/pkg/objstore/testing.go +++ b/pkg/objstore/testing.go @@ -94,10 +94,6 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, err) testutil.Assert(t, !ok, "expected not exits") - _, err = bkt.ObjectSize(ctx, "id1/obj_1.some") - testutil.NotOk(t, err) - testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err) - _, err = bkt.Attributes(ctx, "id1/obj_1.some") testutil.NotOk(t, err) testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err) @@ -114,9 +110,9 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Equals(t, "@test-data@", string(content)) // Check if we can get the correct size. - sz, err := bkt.ObjectSize(ctx, "id1/obj_1.some") + attrs, err := bkt.Attributes(ctx, "id1/obj_1.some") testutil.Ok(t, err) - testutil.Assert(t, sz == 11, "expected size to be equal to 11") + testutil.Assert(t, attrs.Size == 11, "expected size to be equal to 11") rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3) testutil.Ok(t, err) From 0be28e23d25a5ad097ae56e7d3c9b251ce058639 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 09:57:49 +0200 Subject: [PATCH 3/6] Added copyright header Signed-off-by: Marco Pracucci --- pkg/objstore/clientutil/parse.go | 3 +++ pkg/objstore/clientutil/parse_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/objstore/clientutil/parse.go b/pkg/objstore/clientutil/parse.go index c7dda67c70..3cc7a4c9bd 100644 --- a/pkg/objstore/clientutil/parse.go +++ b/pkg/objstore/clientutil/parse.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package clientutil import ( diff --git a/pkg/objstore/clientutil/parse_test.go b/pkg/objstore/clientutil/parse_test.go index ca047da0cd..45d51089a4 100644 --- a/pkg/objstore/clientutil/parse_test.go +++ b/pkg/objstore/clientutil/parse_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package clientutil import ( From e8b17f596c9d8b37c05e067fbd62ddb5b4d03424 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 18 May 2020 14:50:40 +0200 Subject: [PATCH 4/6] Fixes after rebase Signed-off-by: Marco Pracucci --- pkg/store/cache/caching_bucket.go | 65 +++++++++++++----------- pkg/store/cache/caching_bucket_config.go | 26 +++++----- pkg/store/cache/caching_bucket_test.go | 10 ++-- 3 files changed, 54 insertions(+), 47 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index c725719a73..13c6928dcd 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -6,7 +6,6 @@ package storecache import ( "bytes" "context" - "encoding/binary" "encoding/json" "fmt" "io" @@ -36,7 +35,7 @@ const ( opGetRange = "getrange" opIter = "iter" opExists = "exists" - opObjectSize = "objectsize" + opAttributes = "attributes" ) var errObjNotFound = errors.Errorf("object not found") @@ -291,50 +290,58 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length return r, err } -func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - cfgName, cfg := cb.cfg.findObjectSizeConfig(name) +func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + cfgName, cfg := cb.cfg.findAttributesConfig(name) if cfg == nil { - return cb.Bucket.ObjectSize(ctx, name) + return cb.Bucket.Attributes(ctx, name) } - return cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.ttl) + return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl) } -func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (uint64, error) { - key := cachingKeyObjectSize(name) +func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) { + key := cachingKeyAttributes(name) - cb.operationRequests.WithLabelValues(opObjectSize, cfgName).Inc() + cb.operationRequests.WithLabelValues(opAttributes, cfgName).Inc() hits := cache.Fetch(ctx, []string{key}) - if s := hits[key]; len(s) == 8 { - cb.operationHits.WithLabelValues(opObjectSize, cfgName).Inc() - return binary.BigEndian.Uint64(s), nil + if raw, ok := hits[key]; ok { + var attrs objstore.ObjectAttributes + err := json.Unmarshal(raw, &attrs) + if err == nil { + cb.operationHits.WithLabelValues(opAttributes, cfgName).Inc() + return attrs, nil + } + + level.Warn(cb.logger).Log("msg", "failed to decode cached Attributes result", "key", key, "err", err) } - size, err := cb.Bucket.ObjectSize(ctx, name) + attrs, err := cb.Bucket.Attributes(ctx, name) if err != nil { - return 0, err + return objstore.ObjectAttributes{}, err } - var buf [8]byte - binary.BigEndian.PutUint64(buf[:], size) - cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl) + if raw, err := json.Marshal(attrs); err == nil { + cache.Store(ctx, map[string][]byte{key: raw}, ttl) + } else { + level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err) + } - return size, nil + return attrs, nil } func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) { cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc() cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length)) - size, err := cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.objectSizeTTL) + attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.attributesTTL) if err != nil { - return nil, errors.Wrapf(err, "failed to get size of object: %s", name) + return nil, errors.Wrapf(err, "failed to get object attributes: %s", name) } // If length goes over object size, adjust length. We use it later to limit number of read bytes. - if uint64(offset+length) > size { - length = int64(size - uint64(offset)) + if offset+length > attrs.Size { + length = attrs.Size - offset } // Start and end range are subrange-aligned offsets into object, that we're going to read. @@ -347,9 +354,9 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset // The very last subrange in the object may have length that is not divisible by subrange size. lastSubrangeOffset := endRange - cfg.subrangeSize lastSubrangeLength := int(cfg.subrangeSize) - if uint64(endRange) > size { - lastSubrangeOffset = (int64(size) / cfg.subrangeSize) * cfg.subrangeSize - lastSubrangeLength = int(int64(size) - lastSubrangeOffset) + if endRange > attrs.Size { + lastSubrangeOffset = (attrs.Size / cfg.subrangeSize) * cfg.subrangeSize + lastSubrangeLength = int(attrs.Size - lastSubrangeOffset) } numSubranges := (endRange - startRange) / cfg.subrangeSize @@ -360,8 +367,8 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset totalRequestedBytes := int64(0) for off := startRange; off < endRange; off += cfg.subrangeSize { end := off + cfg.subrangeSize - if end > int64(size) { - end = int64(size) + if end > attrs.Size { + end = attrs.Size } totalRequestedBytes += (end - off) @@ -489,8 +496,8 @@ func mergeRanges(input []rng, limit int64) []rng { return input[:last+1] } -func cachingKeyObjectSize(name string) string { - return fmt.Sprintf("size:%s", name) +func cachingKeyAttributes(name string) string { + return fmt.Sprintf("attrs:%s", name) } func cachingKeyObjectSubrange(name string, start int64, end int64) string { diff --git a/pkg/store/cache/caching_bucket_config.go b/pkg/store/cache/caching_bucket_config.go index dce0350fdf..fbc19bfaf7 100644 --- a/pkg/store/cache/caching_bucket_config.go +++ b/pkg/store/cache/caching_bucket_config.go @@ -23,7 +23,7 @@ type CachingBucketConfig struct { iter map[string]*iterConfig exists map[string]*existsConfig getRange map[string]*getRangeConfig - objectSize map[string]*objectSizeConfig + attributes map[string]*attributesConfig } func NewCachingBucketConfig() *CachingBucketConfig { @@ -32,7 +32,7 @@ func NewCachingBucketConfig() *CachingBucketConfig { iter: map[string]*iterConfig{}, exists: map[string]*existsConfig{}, getRange: map[string]*getRangeConfig{}, - objectSize: map[string]*objectSizeConfig{}, + attributes: map[string]*attributesConfig{}, } } @@ -65,11 +65,11 @@ type getRangeConfig struct { operationConfig subrangeSize int64 maxSubRequests int - objectSizeTTL time.Duration + attributesTTL time.Duration subrangeTTL time.Duration } -type objectSizeConfig struct { +type attributesConfig struct { operationConfig ttl time.Duration } @@ -124,19 +124,19 @@ func (cfg *CachingBucketConfig) CacheExists(configName string, cache cache.Cache // Single "GetRange" requests can result in multiple smaller GetRange sub-requests issued on the underlying bucket. // MaxSubRequests specifies how many such subrequests may be issued. Values <= 0 mean there is no limit (requests // for adjacent missing subranges are still merged). -func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, objectSizeTTL, subrangeTTL time.Duration, maxSubRequests int) { +func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, attributesTTL, subrangeTTL time.Duration, maxSubRequests int) { cfg.getRange[configName] = &getRangeConfig{ operationConfig: newOperationConfig(cache, matcher), subrangeSize: subrangeSize, - objectSizeTTL: objectSizeTTL, + attributesTTL: attributesTTL, subrangeTTL: subrangeTTL, maxSubRequests: maxSubRequests, } } -// CacheObjectSize configures caching of "ObjectSize" operation for matching files. -func (cfg *CachingBucketConfig) CacheObjectSize(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) { - cfg.objectSize[configName] = &objectSizeConfig{ +// CacheAttributes configures caching of "Attributes" operation for matching files. +func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) { + cfg.attributes[configName] = &attributesConfig{ operationConfig: newOperationConfig(cache, matcher), ttl: ttl, } @@ -156,8 +156,8 @@ func (cfg *CachingBucketConfig) allConfigNames() map[string][]string { for n := range cfg.getRange { result[opGetRange] = append(result[opGetRange], n) } - for n := range cfg.objectSize { - result[opObjectSize] = append(result[opObjectSize], n) + for n := range cfg.attributes { + result[opAttributes] = append(result[opAttributes], n) } return result } @@ -198,8 +198,8 @@ func (cfg *CachingBucketConfig) findGetRangeConfig(name string) (string, *getRan return "", nil } -func (cfg *CachingBucketConfig) findObjectSizeConfig(name string) (string, *objectSizeConfig) { - for n, cfg := range cfg.objectSize { +func (cfg *CachingBucketConfig) findAttributesConfig(name string) (string, *attributesConfig) { + for n, cfg := range cfg.attributes { if cfg.matcher(name) { return n, cfg } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index d392d9066e..92f7516cd0 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -620,7 +620,7 @@ func TestObjectSize(t *testing.T) { cfg := NewCachingBucketConfig() const cfgName = "test" - cfg.CacheObjectSize(cfgName, cache, matchAll, time.Minute) + cfg.CacheAttributes(cfgName, cache, matchAll, time.Minute) cb, err := NewCachingBucket(inmem, cfg, nil, nil) testutil.Ok(t, err) @@ -637,16 +637,16 @@ func TestObjectSize(t *testing.T) { func verifyObjectSize(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) { t.Helper() - hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName))) + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName))) - length, err := cb.ObjectSize(context.Background(), file) + attrs, err := cb.Attributes(context.Background(), file) if expectedLength < 0 { testutil.Assert(t, cb.IsObjNotFoundErr(err)) } else { testutil.Ok(t, err) - testutil.Equals(t, uint64(expectedLength), length) + testutil.Equals(t, int64(expectedLength), attrs.Size) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName))) if cacheUsed { testutil.Equals(t, 1, hitsAfter-hitsBefore) } else { From e7d0d6d06df619cee193195b9900665f8a9e40a2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 18 May 2020 14:54:52 +0200 Subject: [PATCH 5/6] Fixed config Signed-off-by: Marco Pracucci --- pkg/store/cache/caching_bucket_factory.go | 8 ++++---- pkg/store/cache/caching_bucket_test.go | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go index 04c7c1507e..e991ec548b 100644 --- a/pkg/store/cache/caching_bucket_factory.go +++ b/pkg/store/cache/caching_bucket_factory.go @@ -38,8 +38,8 @@ type CachingWithBackendConfig struct { MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` // TTLs for various cache items. - ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` - ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` + ChunkObjectAttrsTTL time.Duration `yaml:"chunk_object_attrs_ttl"` + ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` // How long to cache result of Iter call in root directory. BlocksIterTTL time.Duration `yaml:"blocks_iter_ttl"` @@ -53,7 +53,7 @@ type CachingWithBackendConfig struct { func (cfg *CachingWithBackendConfig) Defaults() { cfg.ChunkSubrangeSize = 16000 // Equal to max chunk size. - cfg.ChunkObjectSizeTTL = 24 * time.Hour + cfg.ChunkObjectAttrsTTL = 24 * time.Hour cfg.ChunkSubrangeTTL = 24 * time.Hour cfg.MaxChunksGetRangeRequests = 3 cfg.BlocksIterTTL = 5 * time.Minute @@ -96,7 +96,7 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger cfg := NewCachingBucketConfig() // Configure cache. - cfg.CacheGetRange("chunks", c, isTSDBChunkFile, config.ChunkSubrangeSize, config.ChunkObjectSizeTTL, config.ChunkSubrangeTTL, config.MaxChunksGetRangeRequests) + cfg.CacheGetRange("chunks", c, isTSDBChunkFile, config.ChunkSubrangeSize, config.ChunkObjectAttrsTTL, config.ChunkSubrangeTTL, config.MaxChunksGetRangeRequests) cfg.CacheExists("meta.jsons", c, isMetaFile, config.MetafileExistsTTL, config.MetafileDoesntExistTTL) cfg.CacheGet("meta.jsons", c, isMetaFile, int(config.MetafileMaxSize), config.MetafileContentTTL, config.MetafileExistsTTL, config.MetafileDoesntExistTTL) diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 92f7516cd0..ce25cadacc 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -612,7 +612,7 @@ func verifyGet(t *testing.T, cb *CachingBucket, file string, expectedData []byte } } -func TestObjectSize(t *testing.T) { +func TestAttributes(t *testing.T) { inmem := objstore.NewInMemBucket() // We reuse cache between tests (!) @@ -625,17 +625,17 @@ func TestObjectSize(t *testing.T) { cb, err := NewCachingBucket(inmem, cfg, nil, nil) testutil.Ok(t, err) - verifyObjectSize(t, cb, testFilename, -1, false, cfgName) - verifyObjectSize(t, cb, testFilename, -1, false, cfgName) // ObjectSize doesn't cache non-existent files. + verifyObjectAttrs(t, cb, testFilename, -1, false, cfgName) + verifyObjectAttrs(t, cb, testFilename, -1, false, cfgName) // Attributes doesn't cache non-existent files. data := []byte("hello world") testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data))) - verifyObjectSize(t, cb, testFilename, len(data), false, cfgName) - verifyObjectSize(t, cb, testFilename, len(data), true, cfgName) + verifyObjectAttrs(t, cb, testFilename, len(data), false, cfgName) + verifyObjectAttrs(t, cb, testFilename, len(data), true, cfgName) } -func verifyObjectSize(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) { +func verifyObjectAttrs(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) { t.Helper() hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName))) From 001bbf7a759bb9c235f46953722f7a355057edde Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 21 May 2020 09:53:20 +0200 Subject: [PATCH 6/6] Addressed review comments Signed-off-by: Marco Pracucci --- docs/components/store.md | 2 +- pkg/objstore/clientutil/parse.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 5912d11101..8aee4429d2 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -307,7 +307,7 @@ Additional options to configure various aspects of chunks cache are available: - `chunk_subrange_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. - `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing subranges. -- `chunk_object_attrs_ttl`: how long to keep information about chunk file attributes (ie. size) in the cache. +- `chunk_object_attrs_ttl`: how long to keep information about chunk file attributes (e.g. size) in the cache. - `chunk_subrange_ttl`: how long to keep individual subranges in the cache. Following options are used for metadata caching (meta.json files, deletion mark files, iteration result): diff --git a/pkg/objstore/clientutil/parse.go b/pkg/objstore/clientutil/parse.go index 3cc7a4c9bd..f6d94c931e 100644 --- a/pkg/objstore/clientutil/parse.go +++ b/pkg/objstore/clientutil/parse.go @@ -11,8 +11,10 @@ import ( "github.com/pkg/errors" ) +// ParseContentLength returns the content length (in bytes) parsed from the Content-Length +// HTTP header in input. func ParseContentLength(m http.Header) (int64, error) { - name := "Content-Length" + const name = "Content-Length" v, ok := m[name] if !ok { @@ -31,8 +33,10 @@ func ParseContentLength(m http.Header) (int64, error) { return ret, nil } +// ParseLastModified returns the timestamp parsed from the Last-Modified +// HTTP header in input (expected to be in the RFC3339 format). func ParseLastModified(m http.Header) (time.Time, error) { - name := "Last-Modified" + const name = "Last-Modified" v, ok := m[name] if !ok {