From ffc6a6bf3ec371421471c982c222da87dde85f57 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Mon, 29 Jan 2024 09:24:05 +0100
Subject: [PATCH 1/2] Fix S3 upload performance regression

Signed-off-by: Marco Pracucci
---
 go.mod                                         |   2 +-
 go.sum                                         |   4 +-
 .../github.com/thanos-io/objstore/objstore.go  | 135 ++++++++++--------
 vendor/modules.txt                             |   2 +-
 4 files changed, 78 insertions(+), 65 deletions(-)

diff --git a/go.mod b/go.mod
index 1264656dde3..b7de94282a6 100644
--- a/go.mod
+++ b/go.mod
@@ -69,7 +69,7 @@ require (
 	github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
 	github.com/prometheus/procfs v0.12.0
-	github.com/thanos-io/objstore v0.0.0-20240116185442-6ecabdddaab1
+	github.com/thanos-io/objstore v0.0.0-20240128223450-bdadaefbfe03
 	github.com/twmb/franz-go v1.15.4
 	github.com/twmb/franz-go/pkg/kadm v1.10.0
 	github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1
diff --git a/go.sum b/go.sum
index 09a22ebf3d2..03a939b5a06 100644
--- a/go.sum
+++ b/go.sum
@@ -942,8 +942,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
 github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
 github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
 github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
-github.com/thanos-io/objstore v0.0.0-20240116185442-6ecabdddaab1 h1:8taDMdkqtKO6uB+JKwXnz2jByLwBLY+GnXp3ShXjUKY=
-github.com/thanos-io/objstore v0.0.0-20240116185442-6ecabdddaab1/go.mod h1:RMvJQnpB4QQiYGg1gF8mnPJg6IkIPY28Buh8f6b+F0c=
+github.com/thanos-io/objstore v0.0.0-20240128223450-bdadaefbfe03 h1:VEu81Atmor2RVr5iGzlLc7E8X9jd1ux0ze4Ie5Qw8n0=
+github.com/thanos-io/objstore v0.0.0-20240128223450-bdadaefbfe03/go.mod h1:RMvJQnpB4QQiYGg1gF8mnPJg6IkIPY28Buh8f6b+F0c=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw=
diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go
index a83dacdf95d..b1d7d09a1aa 100644
--- a/vendor/github.com/thanos-io/objstore/objstore.go
+++ b/vendor/github.com/thanos-io/objstore/objstore.go
@@ -232,19 +232,6 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser {
 	return nopCloserWithObjectSize{r}
 }
 
-type nopSeekerCloserWithObjectSize struct{ io.Reader }
-
-func (nopSeekerCloserWithObjectSize) Close() error { return nil }
-func (n nopSeekerCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }
-
-func (n nopSeekerCloserWithObjectSize) Seek(offset int64, whence int) (int64, error) {
-	return n.Reader.(io.Seeker).Seek(offset, whence)
-}
-
-func nopSeekerCloserWithSize(r io.Reader) io.ReadSeekCloser {
-	return nopSeekerCloserWithObjectSize{r}
-}
-
 // UploadDir uploads all files in srcdir to the bucket with into a top-level directory
 // named dstdir. It is a caller responsibility to clean partial upload in case of failure.
 func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
@@ -555,8 +542,9 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
 		}
 		return nil, err
 	}
-	return newTimingReadCloser(
+	return newTimingReader(
 		rc,
+		true,
 		op,
 		b.opsDuration,
 		b.opsFailures,
@@ -577,8 +565,9 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
 		}
 		return nil, err
 	}
-	return newTimingReadCloser(
+	return newTimingReader(
 		rc,
+		true,
 		op,
 		b.opsDuration,
 		b.opsFailures,
@@ -608,16 +597,9 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
 	const op = OpUpload
 	b.ops.WithLabelValues(op).Inc()
 
-	_, ok := r.(io.Seeker)
-	var nopR io.ReadCloser
-	if ok {
-		nopR = nopSeekerCloserWithSize(r)
-	} else {
-		nopR = NopCloserWithSize(r)
-	}
-
-	trc := newTimingReadCloser(
-		nopR,
+	trc := newTimingReader(
+		r,
+		false,
 		op,
 		b.opsDuration,
 		b.opsFailures,
@@ -670,12 +652,13 @@ func (b *metricBucket) Name() string {
 	return b.bkt.Name()
 }
 
-type timingReadSeekCloser struct {
-	timingReadCloser
-}
+type timingReader struct {
+	io.Reader
+
+	// closeReader holds whether the wrapped io.Reader should be closed when
+	// Close() is called on the timingReader.
+	closeReader bool
 
-type timingReadCloser struct {
-	io.ReadCloser
 	objSize    int64
 	objSizeErr error
 
@@ -691,14 +674,15 @@ type timingReadCloser struct {
 	transferredBytes *prometheus.HistogramVec
 }
 
-func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
+func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
 	// Initialize the metrics with 0.
 	dur.WithLabelValues(op)
 	failed.WithLabelValues(op)
-	objSize, objSizeErr := TryToGetSize(rc)
+	objSize, objSizeErr := TryToGetSize(r)
 
-	trc := timingReadCloser{
-		ReadCloser: rc,
+	trc := timingReader{
+		Reader:      r,
+		closeReader: closeReader,
 		objSize:     objSize,
 		objSizeErr:  objSizeErr,
 		start:       time.Now(),
@@ -711,50 +695,79 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
 		readBytes: 0,
 	}
 
-	_, ok := rc.(io.Seeker)
-	if ok {
-		return &timingReadSeekCloser{
-			timingReadCloser: trc,
-		}
+	_, isSeeker := r.(io.Seeker)
+	_, isReaderAt := r.(io.ReaderAt)
+
+	if isSeeker && isReaderAt {
+		// The assumption is that in most cases when io.ReaderAt() is implemented then
+		// io.Seeker is implemented too (e.g. os.File).
+		return &timingReaderSeekerReaderAt{timingReaderSeeker: timingReaderSeeker{timingReader: trc}}
+	}
+	if isSeeker {
+		return &timingReaderSeeker{timingReader: trc}
 	}
 
 	return &trc
 }
 
-func (t *timingReadCloser) ObjectSize() (int64, error) {
-	return t.objSize, t.objSizeErr
+func (r *timingReader) ObjectSize() (int64, error) {
+	return r.objSize, r.objSizeErr
 }
 
-func (rc *timingReadCloser) Close() error {
-	err := rc.ReadCloser.Close()
-	if !rc.alreadyGotErr && err != nil {
-		rc.failed.WithLabelValues(rc.op).Inc()
+func (r *timingReader) Close() error {
+	var closeErr error
+
+	// Close the wrapped reader if it implements io.Closer, but only if we've been asked to close it.
+	if closer, ok := r.Reader.(io.Closer); r.closeReader && ok {
+		closeErr = closer.Close()
+
+		if !r.alreadyGotErr && closeErr != nil {
+			r.failed.WithLabelValues(r.op).Inc()
+			r.alreadyGotErr = true
+		}
 	}
-	if !rc.alreadyGotErr && err == nil {
-		rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
-		rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
-		rc.alreadyGotErr = true
+
+	// Track duration and transferred bytes only if no error occurred.
+	if !r.alreadyGotErr {
+		r.duration.WithLabelValues(r.op).Observe(time.Since(r.start).Seconds())
+		r.transferredBytes.WithLabelValues(r.op).Observe(float64(r.readBytes))
+
+		// Trick to avoid tracking metrics multiple times in case Close() gets called again.
+		r.alreadyGotErr = true
 	}
-	return err
+
+	return closeErr
 }
 
-func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
-	n, err = rc.ReadCloser.Read(b)
-	if rc.fetchedBytes != nil {
-		rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
+func (r *timingReader) Read(b []byte) (n int, err error) {
+	n, err = r.Reader.Read(b)
+	if r.fetchedBytes != nil {
+		r.fetchedBytes.WithLabelValues(r.op).Add(float64(n))
 	}
-	rc.readBytes += int64(n)
+	r.readBytes += int64(n)
 	// Report metric just once.
-	if !rc.alreadyGotErr && err != nil && err != io.EOF {
-		if !rc.isFailureExpected(err) {
-			rc.failed.WithLabelValues(rc.op).Inc()
+	if !r.alreadyGotErr && err != nil && err != io.EOF {
+		if !r.isFailureExpected(err) {
+			r.failed.WithLabelValues(r.op).Inc()
 		}
-		rc.alreadyGotErr = true
+		r.alreadyGotErr = true
 	}
 	return n, err
 }
 
-func (rsc *timingReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
-	return (rsc.ReadCloser).(io.Seeker).Seek(offset, whence)
+type timingReaderSeeker struct {
+	timingReader
+}
+
+func (rsc *timingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
+	return (rsc.Reader).(io.Seeker).Seek(offset, whence)
+}
+
+type timingReaderSeekerReaderAt struct {
+	timingReaderSeeker
+}
+
+func (rsc *timingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
+	return (rsc.Reader).(io.ReaderAt).ReadAt(p, off)
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5f61572c92b..f11e0505d59 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1011,7 +1011,7 @@ github.com/stretchr/objx
 github.com/stretchr/testify/assert
 github.com/stretchr/testify/mock
 github.com/stretchr/testify/require
-# github.com/thanos-io/objstore v0.0.0-20240116185442-6ecabdddaab1
+# github.com/thanos-io/objstore v0.0.0-20240128223450-bdadaefbfe03
 ## explicit; go 1.20
 github.com/thanos-io/objstore
 github.com/thanos-io/objstore/exthttp

From 89f7538786ff1404ff1a6fa75dca7bb14cb485b5 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Mon, 29 Jan 2024 09:25:46 +0100
Subject: [PATCH 2/2] Add CHANGELOG entry

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c01a14201e..3e59b616e03 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -75,6 +75,7 @@
 * `label_names_and_values`: label names / values query
 * `active_series`: active series query
 * `other`: any other request
+* [BUGFIX] Fix performance regression introduced in Mimir 2.11.0 when uploading blocks to AWS S3. #7240
 
 ### Mixin
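
The substance of the first patch is that the objstore metrics wrapper no longer hides the optional io.Seeker and io.ReaderAt interfaces of the reader it instruments: newTimingReader inspects the wrapped reader and returns a timingReaderSeeker or timingReaderSeekerReaderAt accordingly, so a storage SDK that type-asserts for those capabilities (for example to size and parallelize multipart uploads) can still take its optimized path, which is what restores S3 upload performance. Below is a minimal, self-contained sketch of the same interface-preserving wrapping pattern; the countingReader types and the wrap helper are hypothetical names used only for illustration and are not part of objstore.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// countingReader is a hypothetical wrapper that counts bytes read, similar in
// spirit to objstore's timingReader. Embedding io.Reader keeps the wrapper
// generic, but it also hides any optional interfaces of the wrapped value.
type countingReader struct {
	io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.Reader.Read(p)
	c.n += int64(n)
	return n, err
}

// countingReaderSeeker re-exposes io.Seeker from the wrapped reader.
type countingReaderSeeker struct{ countingReader }

func (c *countingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
	return c.Reader.(io.Seeker).Seek(offset, whence)
}

// countingReaderSeekerReaderAt additionally re-exposes io.ReaderAt.
type countingReaderSeekerReaderAt struct{ countingReaderSeeker }

func (c *countingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
	return c.Reader.(io.ReaderAt).ReadAt(p, off)
}

// wrap returns the richest wrapper the underlying reader supports, mirroring
// the interface detection done in newTimingReader in the patch above.
func wrap(r io.Reader) io.Reader {
	cr := countingReader{Reader: r}
	_, isSeeker := r.(io.Seeker)
	_, isReaderAt := r.(io.ReaderAt)
	switch {
	case isSeeker && isReaderAt:
		return &countingReaderSeekerReaderAt{countingReaderSeeker{cr}}
	case isSeeker:
		return &countingReaderSeeker{cr}
	default:
		return &cr
	}
}

func main() {
	// os.File implements both io.Seeker and io.ReaderAt, so a client that
	// type-asserts for io.ReaderAt (for example to read parts concurrently
	// during a multipart upload) still sees it through the wrapper.
	f, err := os.Open("/etc/hosts") // any readable file works here
	if err == nil {
		defer f.Close()
		_, ok := wrap(f).(io.ReaderAt)
		fmt.Println("file keeps io.ReaderAt through the wrapper:", ok) // true
	}

	// A plain stream offers only io.Reader, and the wrapper does not pretend otherwise.
	_, ok := wrap(bytes.NewBufferString("stream")).(io.ReaderAt)
	fmt.Println("buffer exposes io.ReaderAt through the wrapper:", ok) // false
}

With this pattern the Upload path no longer needs the removed nopSeekerCloserWithSize shim: the reader's native capabilities survive the instrumentation, and the wrapper leaves upload readers open because closeReader is passed as false.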