diff --git a/CHANGELOG.md b/CHANGELOG.md index a529047..0d0df77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased ### Fixed +- [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents. - [#115](https://github.com/thanos-io/objstore/pull/115) GCS: Fix creation of bucket with GRPC connections. Also update storage client to `v1.40.0`. - [#102](https://github.com/thanos-io/objstore/pull/102) Azure: bump azblob sdk to get concurrency fixes. - [#33](https://github.com/thanos-io/objstore/pull/33) Tracing: Add `ContextWithTracer()` to inject the tracer into the context. diff --git a/objstore.go b/objstore.go index c913068..31c167e 100644 --- a/objstore.go +++ b/objstore.go @@ -756,7 +756,7 @@ func (r *timingReader) Read(b []byte) (n int, err error) { r.readBytes += int64(n) // Report metric just once. if !r.alreadyGotErr && err != nil && err != io.EOF { - if !r.isFailureExpected(err) { + if !r.isFailureExpected(err) && !errors.Is(err, context.Canceled) { r.failed.WithLabelValues(r.op).Inc() } r.alreadyGotErr = true diff --git a/objstore_test.go b/objstore_test.go index ababe62..7d56dfb 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -412,7 +412,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) { func TestTimingReader(t *testing.T) { m := WrapWithMetrics(NewInMemBucket(), nil, "") r := bytes.NewReader([]byte("hello world")) - tr := newTimingReader(r, true, "", m.opsDuration, m.opsFailures, func(err error) bool { + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) @@ -438,6 +438,59 @@ func TestTimingReader(t *testing.T) { _, isReaderAt := tr.(io.ReaderAt) testutil.Assert(t, isReaderAt) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +func TestTimingReader_ExpectedError(t *testing.T) { + readerErr := errors.New("something went wrong") + + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{readerErr} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, readerErr, err) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +func TestTimingReader_UnexpectedError(t *testing.T) { + readerErr := errors.New("something went wrong") + + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{readerErr} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, readerErr, err) + + testutil.Equals(t, float64(1), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +func TestTimingReader_ContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + m := WrapWithMetrics(NewInMemBucket(), nil, "") + r := dummyReader{ctx.Err()} + tr := newTimingReader(r, true, OpGet, m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes, m.opsTransferredBytes) + + buf := make([]byte, 1) + _, err := io.ReadFull(tr, buf) + testutil.Equals(t, ctx.Err(), err) + + testutil.Equals(t, float64(0), promtest.ToFloat64(m.opsFailures.WithLabelValues(OpGet))) +} + +type dummyReader struct { + err error +} + +func (r dummyReader) Read(_ []byte) (int, error) { + return 0, r.err } func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) {