From e56e6f180a3b4a73fbafac3140db61a0603388de Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 May 2024 11:50:07 +1000 Subject: [PATCH 1/5] Add tests for existing behaviour. Signed-off-by: Charles Korn --- objstore_test.go | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/objstore_test.go b/objstore_test.go index ababe62c..8221932f 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -412,7 +412,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) { func TestTimingReader(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := bytes.NewReader([]byte("hello world")) - tr := newTimingReader(r, true, "", m.opsDuration, m.opsFailures, func(err error) bool { + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) @@ -438,6 +438,44 @@ func TestTimingReader(t *testing.T) { _, isReaderAt := tr.(io.ReaderAt) testutil.Assert(t, isReaderAt) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +func TestTimingReader_ExpectedError(t *testing.T) { + readerErr := errors.New("something went wrong") + + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{readerErr} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, readerErr, err) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +func TestTimingReader_UnexpectedError(t *testing.T) { + readerErr := errors.New("something went wrong") + + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{readerErr} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, readerErr, err) + + testutil.Equals(t, float64(1), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +type dummyReader struct { + err error +} + +func (r dummyReader) Read(_ []byte) (int, error) { + return 0, r.err } func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) { From f1ab9d81b4979d2ab8bc2b2a1824b36c3c550c9d Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 May 2024 11:51:58 +1000 Subject: [PATCH 2/5] Add failing test Signed-off-by: Charles Korn --- objstore_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/objstore_test.go b/objstore_test.go index 8221932f..f83073c4 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -470,6 +470,18 @@ func TestTimingReader_UnexpectedError(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) } +func TestTimingReader_ContextCancellation(t *testing.T) { + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{context.Canceled} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, context.Canceled, err) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + type dummyReader struct { err error } From 88acb325249ee799563b434116e1144cfc7d3686 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 May 2024 11:55:23 +1000 Subject: [PATCH 3/5] Add fix for issue. Signed-off-by: Charles Korn --- objstore.go | 10 ++++++++-- objstore_test.go | 17 ++++++++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/objstore.go b/objstore.go index c9130687..a83f7d2e 100644 --- a/objstore.go +++ b/objstore.go @@ -551,6 +551,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err return nil, err } return newTimingReader( + ctx, rc, true, op, @@ -574,6 +575,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in return nil, err } return newTimingReader( + ctx, rc, true, op, @@ -606,6 +608,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err b.ops.WithLabelValues(op).Inc() trc := newTimingReader( + ctx, r, false, op, @@ -663,6 +666,8 @@ func (b *metricBucket) Name() string { type timingReader struct { io.Reader + ctx context.Context + // closeReader holds whether the wrapper io.Reader should be closed when // Close() is called on the timingReader. closeReader bool @@ -682,7 +687,7 @@ type timingReader struct { transferredBytes *prometheus.HistogramVec } -func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser { +func newTimingReader(ctx context.Context, r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser { // Initialize the metrics with 0. dur.WithLabelValues(op) failed.WithLabelValues(op) @@ -690,6 +695,7 @@ func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.H trc := timingReader{ Reader: r, + ctx: ctx, closeReader: closeReader, objSize: objSize, objSizeErr: objSizeErr, @@ -756,7 +762,7 @@ func (r *timingReader) Read(b []byte) (n int, err error) { r.readBytes += int64(n) // Report metric just once. if !r.alreadyGotErr && err != nil && err != io.EOF { - if !r.isFailureExpected(err) { + if !r.isFailureExpected(err) && r.ctx.Err() != context.Canceled { r.failed.WithLabelValues(r.op).Inc() } r.alreadyGotErr = true diff --git a/objstore_test.go b/objstore_test.go index f83073c4..35a5d88a 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -412,7 +412,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) { func TestTimingReader(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := bytes.NewReader([]byte("hello world")) - tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { + tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) @@ -447,7 +447,7 @@ func TestTimingReader_ExpectedError(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := dummyReader{readerErr} - tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) + tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) @@ -461,7 +461,7 @@ func TestTimingReader_UnexpectedError(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := dummyReader{readerErr} - tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) @@ -471,13 +471,16 @@ func TestTimingReader_UnexpectedError(t *testing.T) { } func TestTimingReader_ContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + m := WrapWithMetrics(NewInMemBucket(), nil, "") - r := dummyReader{context.Canceled} - tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + r := dummyReader{ctx.Err()} + tr := newTimingReader(ctx, r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) - testutil.Equals(t, context.Canceled, err) + testutil.Equals(t, ctx.Err(), err) testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) } @@ -503,7 +506,7 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) { }) m := WrapWithMetrics(NewInMemBucket(), nil, "") - r := newTimingReader(file, true, "", m.opsDuration, m.opsFailures, func(err error) bool { + r := newTimingReader(context.Background(), file, true, "", m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) From 374a7dc6f6a117675f53ed3b3edc8ff72ebe8a21 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 May 2024 12:00:59 +1000 Subject: [PATCH 4/5] Add changelog entry. Signed-off-by: Charles Korn --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5290479..0d0df778 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased ### Fixed +- [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents. - [#115](https://github.com/thanos-io/objstore/pull/115) GCS: Fix creation of bucket with GRPC connections. Also update storage client to `v1.40.0`. - [#102](https://github.com/thanos-io/objstore/pull/102) Azure: bump azblob sdk to get concurrency fixes. - [#33](https://github.com/thanos-io/objstore/pull/33) Tracing: Add `ContextWithTracer()` to inject the tracer into the context. From d44110ffb7e7c1155d3b01940d0a9adf724ff310 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 6 May 2024 17:21:46 +1000 Subject: [PATCH 5/5] Don't pass context to timingReader Signed-off-by: Charles Korn --- objstore.go | 10 ++-------- objstore_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/objstore.go b/objstore.go index a83f7d2e..31c167eb 100644 --- a/objstore.go +++ b/objstore.go @@ -551,7 +551,6 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err return nil, err } return newTimingReader( - ctx, rc, true, op, @@ -575,7 +574,6 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in return nil, err } return newTimingReader( - ctx, rc, true, op, @@ -608,7 +606,6 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err b.ops.WithLabelValues(op).Inc() trc := newTimingReader( - ctx, r, false, op, @@ -666,8 +663,6 @@ func (b *metricBucket) Name() string { type timingReader struct { io.Reader - ctx context.Context - // closeReader holds whether the wrapper io.Reader should be closed when // Close() is called on the timingReader. closeReader bool @@ -687,7 +682,7 @@ type timingReader struct { transferredBytes *prometheus.HistogramVec } -func newTimingReader(ctx context.Context, r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser { +func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser { // Initialize the metrics with 0. dur.WithLabelValues(op) failed.WithLabelValues(op) @@ -695,7 +690,6 @@ func newTimingReader(ctx context.Context, r io.Reader, closeReader bool, op stri trc := timingReader{ Reader: r, - ctx: ctx, closeReader: closeReader, objSize: objSize, objSizeErr: objSizeErr, @@ -762,7 +756,7 @@ func (r *timingReader) Read(b []byte) (n int, err error) { r.readBytes += int64(n) // Report metric just once. if !r.alreadyGotErr && err != nil && err != io.EOF { - if !r.isFailureExpected(err) && r.ctx.Err() != context.Canceled { + if !r.isFailureExpected(err) && !errors.Is(err, context.Canceled) { r.failed.WithLabelValues(r.op).Inc() } r.alreadyGotErr = true diff --git a/objstore_test.go b/objstore_test.go index 35a5d88a..7d56dfb3 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -412,7 +412,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) { func TestTimingReader(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := bytes.NewReader([]byte("hello world")) - tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) @@ -447,7 +447,7 @@ func TestTimingReader_ExpectedError(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := dummyReader{readerErr} - tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) @@ -461,7 +461,7 @@ func TestTimingReader_UnexpectedError(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := dummyReader{readerErr} - tr := newTimingReader(context.Background(), r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) @@ -476,7 +476,7 @@ func TestTimingReader_ContextCancellation(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := dummyReader{ctx.Err()} - tr := newTimingReader(ctx, r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) buf := make([]byte, 1) _, err := io.ReadFull(tr, buf) @@ -506,7 +506,7 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) { }) m := WrapWithMetrics(NewInMemBucket(), nil, "") - r := newTimingReader(context.Background(), file, true, "", m.opsDuration, m.opsFailures, func(err error) bool { + r := newTimingReader(file, true, "", m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes)