From a32f1ff378120424dde1f8d0b6edf5f5a0beaebb Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 26 Feb 2021 11:14:24 +0100 Subject: [PATCH] Replaces errutil with tools/pkg/merrors; Fixes very ugly misuse of multi-error. Previous multi-error implementation could cause very ugly bug of returnig empty multi-error that should be treated as success not error by API, but if .Err() is not invoked it will be used as non nil error. There were 9 places where we had this bug in handler due to MultiError lib allowing to do so. Signed-off-by: Bartlomiej Plotka # Conflicts: # pkg/block/fetcher.go # pkg/compact/compact.go # pkg/compact/compact_test.go # pkg/errutil/multierror.go # pkg/receive/handler.go # pkg/receive/handler_test.go --- Makefile | 1 + cmd/thanos/rule.go | 7 +- cmd/thanos/tools.go | 6 +- go.mod | 1 + go.sum | 2 + pkg/block/fetcher.go | 18 ++- pkg/block/writer.go | 20 ++- pkg/compact/compact.go | 41 +++--- pkg/compact/compact_test.go | 12 +- pkg/compact/downsample/downsample.go | 7 +- .../downsample/streamed_block_writer.go | 20 ++- pkg/compactv2/compactor.go | 8 +- pkg/discovery/dns/provider.go | 4 +- pkg/errutil/multierror.go | 53 ------- pkg/receive/handler.go | 19 +-- pkg/receive/handler_test.go | 137 ++++++++---------- pkg/receive/multitsdb.go | 26 ++-- pkg/receive/writer.go | 5 +- pkg/rules/manager.go | 16 +- pkg/runutil/runutil.go | 20 +-- pkg/store/multitsdb.go | 8 +- 21 files changed, 187 insertions(+), 244 deletions(-) delete mode 100644 pkg/errutil/multierror.go diff --git a/Makefile b/Makefile index 68fa8afbd1..61a9dd5bd4 100644 --- a/Makefile +++ b/Makefile @@ -299,6 +299,7 @@ go-lint: check-git deps $(GOLANGCI_LINT) $(FAILLINT) @# TODO(bwplotka): Add, Printf, DefaultRegisterer, NewGaugeFunc and MustRegister once exception are accepted. Add fmt.{Errorf}=github.com/pkg/errors.{Errorf} once https://github.com/fatih/faillint/issues/10 is addressed. @$(FAILLINT) -paths "errors=github.com/pkg/errors,\ github.com/prometheus/tsdb=github.com/prometheus/prometheus/tsdb,\ +github.com/prometheus/prometheus/tsdb/errors=github.com/efficientgo/tools/core/pkg/merrors,\ github.com/prometheus/prometheus/pkg/testutils=github.com/thanos-io/thanos/pkg/testutil,\ github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUntypedFunc,UntypedFunc},\ github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\ diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 1078c49bd4..f35b03c45f 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -29,7 +30,6 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/strutil" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/alert" @@ -811,10 +811,11 @@ func reloadRules(logger log.Logger, ruleFiles []string, ruleMgr *thanosrules.Manager, evalInterval time.Duration, - metrics *RuleMetrics) error { + metrics *RuleMetrics, +) error { level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ",")) var ( - errs errutil.MultiError + errs = merrors.New() files []string seenFiles = make(map[string]struct{}) ) diff --git a/cmd/thanos/tools.go b/cmd/thanos/tools.go index c0c9b04ed8..aec498e1e1 100644 --- a/cmd/thanos/tools.go +++ b/cmd/thanos/tools.go @@ -6,12 +6,12 @@ package main import ( "os" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/rules" ) @@ -35,7 +35,7 @@ func registerCheckRules(app extkingpin.AppClause) { } func checkRulesFiles(logger log.Logger, files *[]string) error { - var failed errutil.MultiError + failed := merrors.New() for _, fn := range *files { level.Info(logger).Log("msg", "checking", "filename", fn) @@ -51,7 +51,7 @@ func checkRulesFiles(logger log.Logger, files *[]string) error { n, errs := rules.ValidateAndCount(f) if errs.Err() != nil { level.Error(logger).Log("result", "FAILED") - for _, e := range errs { + for _, e := range errs.Err().Errors() { level.Error(logger).Log("error", e.Error()) failed.Add(e) } diff --git a/go.mod b/go.mod index e3ce8199d3..c1e93ac48c 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/chromedp/chromedp v0.5.3 github.com/cortexproject/cortex v1.7.1-0.20210224085859-66d6fb5b0d42 github.com/davecgh/go-spew v1.1.1 + github.com/efficientgo/tools/core v0.0.0-20210201224146-3d78f4d30648 github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.1.0 github.com/felixge/fgprof v0.9.1 diff --git a/go.sum b/go.sum index f530ac84c6..e0b5b58085 100644 --- a/go.sum +++ b/go.sum @@ -304,6 +304,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/efficientgo/tools/core v0.0.0-20210201224146-3d78f4d30648 h1:zY9fs6qlXtS/YlrijZ+7vTqduJRybPYwJ8Mjo4zWrS8= +github.com/efficientgo/tools/core v0.0.0-20210201224146-3d78f4d30648/go.mod h1:cFZoHUhKg31xkPnPjhPKFtevnx0Xcg67ptBRxbpaxtk= github.com/elastic/go-sysinfo v1.1.1 h1:ZVlaLDyhVkDfjwPGU55CQRCRolNpc7P0BbyhhQZQmMI= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index c0e04dbe8e..8a82e9d4b9 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/golang/groupcache/singleflight" @@ -28,7 +29,6 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -285,7 +285,7 @@ type response struct { metas map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error // If metaErr > 0 it means incomplete view, so some metas, failed to be loaded. - metaErrs errutil.MultiError + metaErrs merrors.NilOrMultiError noMetas float64 corruptedMetas float64 @@ -362,7 +362,8 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { return nil, errors.Wrap(err, "BaseFetcher: iter bucket") } - if len(resp.metaErrs) > 0 { + if resp.metaErrs.Err() != nil { + // Incomplete view is fine, errors are handled by metric, but we stil return success. return resp, nil } @@ -433,7 +434,12 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter metas[id] = m } - metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs))) + if errs := resp.metaErrs.Err(); errs != nil { + metrics.synced.WithLabelValues(failedMeta).Set(float64(len(errs.Errors()))) + } else { + metrics.synced.WithLabelValues(failedMeta).Set(0) + } + metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas) metrics.synced.WithLabelValues(corruptedMeta).Set(resp.corruptedMetas) @@ -454,8 +460,8 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) metrics.submit() - if len(resp.metaErrs) > 0 { - return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view") + if errs := resp.metaErrs.Err(); errs != nil { + return metas, resp.partial, errors.Wrap(errs, "incomplete view") } level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) diff --git a/pkg/block/writer.go b/pkg/block/writer.go index 995d8f72ae..4ec46b5986 100644 --- a/pkg/block/writer.go +++ b/pkg/block/writer.go @@ -9,13 +9,13 @@ import ( "os" "path/filepath" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" ) @@ -56,6 +56,14 @@ type DiskWriter struct { const tmpForCreationBlockDirSuffix = ".tmp-for-creation" +func closeAll(closers []io.Closer) error { + errs := merrors.New() + for _, c := range closers { + errs.Add(c.Close()) + } + return errs.Err() +} + // NewDiskWriter allows to write single TSDB block to disk and returns statistics. // Destination block directory has to exists. func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *DiskWriter, err error) { @@ -68,7 +76,7 @@ func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *Disk } defer func() { if err != nil { - err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + err = merrors.New(err, closeAll(d.closers)).Err() if err := os.RemoveAll(bTmp); err != nil { level.Error(logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) } @@ -102,7 +110,7 @@ func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *Disk func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) { defer func() { if err != nil { - err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + err = merrors.New(err, closeAll(d.closers)).Err() if err := os.RemoveAll(d.bTmp); err != nil { level.Error(d.logger).Log("msg", "removed tmp folder failed after block(s) write", "err", err.Error()) } @@ -114,7 +122,7 @@ func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) { } defer func() { if df != nil { - err = tsdb_errors.NewMulti(err, df.Close()).Err() + err = merrors.New(err, df.Close()).Err() } }() @@ -128,7 +136,7 @@ func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) { } df = nil - if err := tsdb_errors.CloseAll(d.closers); err != nil { + if err := closeAll(d.closers); err != nil { d.closers = nil return tsdb.BlockStats{}, err } @@ -180,5 +188,5 @@ func (s *statsGatheringSeriesWriter) WriteChunks(chks ...chunks.Meta) error { } func (s statsGatheringSeriesWriter) Close() error { - return tsdb_errors.NewMulti(s.iw.Close(), s.cw.Close()).Err() + return merrors.New(s.iw.Close(), s.cw.Close()).Err() } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index a4345319fa..b433467416 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -5,6 +5,7 @@ package compact import ( "context" + stderrors "errors" //lint:ignore faillint explicitly using stderrors.As "fmt" "io/ioutil" "math" @@ -14,6 +15,7 @@ import ( "sync" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" @@ -27,7 +29,6 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -543,17 +544,7 @@ func (e HaltError) Error() string { // IsHaltError returns true if the base error is a HaltError. // If a multierror is passed, any halt error will return true. func IsHaltError(err error) bool { - if multiErr, ok := errors.Cause(err).(errutil.NonNilMultiError); ok { - for _, err := range multiErr { - if _, ok := errors.Cause(err).(HaltError); ok { - return true - } - } - return false - } - - _, ok := errors.Cause(err).(HaltError) - return ok + return stderrors.As(err, &HaltError{}) } // RetryError is a type wrapper for errors that should trigger warning log and retry whole compaction loop, but aborting @@ -576,17 +567,19 @@ func (e RetryError) Error() string { // IsRetryError returns true if the base error is a RetryError. // If a multierror is passed, all errors must be retriable. func IsRetryError(err error) bool { - if multiErr, ok := errors.Cause(err).(errutil.NonNilMultiError); ok { - for _, err := range multiErr { - if _, ok := errors.Cause(err).(RetryError); !ok { + if !stderrors.As(err, &RetryError{}) { + return false + } + // Check if it's not multi-error with some non-retry errors. + errs, ok := merrors.AsMulti(err) + if ok { + for _, e := range errs.Errors() { + if !IsRetryError(e) { return false } } - return true } - - _, ok := errors.Cause(err).(RetryError) - return ok + return true } func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metadata.Meta) error { @@ -953,12 +946,12 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { level.Info(c.logger).Log("msg", "start of compactions") // Send all groups found during this pass to the compaction workers. - var groupErrs errutil.MultiError + errs := merrors.New() groupLoop: for _, g := range groups { select { case groupErr := <-errChan: - groupErrs.Add(groupErr) + errs.Add(groupErr) break groupLoop case groupChan <- g: } @@ -970,12 +963,12 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { // while we were waiting for the last batch of groups to run the compaction. close(errChan) for groupErr := range errChan { - groupErrs.Add(groupErr) + errs.Add(groupErr) } workCtxCancel() - if len(groupErrs) > 0 { - return groupErrs.Err() + if err := errs.Err(); err != nil { + return err } if finishedAllGroups { diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 4f8140e7ce..cb798ddc38 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -6,11 +6,10 @@ package compact import ( "testing" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -32,7 +31,7 @@ func TestHaltMultiError(t *testing.T) { haltErr := halt(errors.New("halt error")) nonHaltErr := errors.New("not a halt error") - errs := errutil.MultiError{nonHaltErr} + errs := merrors.New(nonHaltErr) testutil.Assert(t, !IsHaltError(errs.Err()), "should not be a halt error") errs.Add(haltErr) @@ -45,15 +44,14 @@ func TestRetryMultiError(t *testing.T) { retryErr := retry(errors.New("retry error")) nonRetryErr := errors.New("not a retry error") - errs := errutil.MultiError{nonRetryErr} + errs := merrors.New(nonRetryErr) testutil.Assert(t, !IsRetryError(errs.Err()), "should not be a retry error") - errs = errutil.MultiError{retryErr} + errs = merrors.New(retryErr) testutil.Assert(t, IsRetryError(errs.Err()), "if all errors are retriable this should return true") - testutil.Assert(t, IsRetryError(errors.Wrap(errs.Err(), "wrap")), "retry error with wrap") - errs = errutil.MultiError{nonRetryErr, retryErr} + errs = merrors.New(nonRetryErr, retryErr) testutil.Assert(t, !IsRetryError(errs.Err()), "mixed errors should return false") } diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 8d271b3ee6..c4f79c2943 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -10,6 +10,7 @@ import ( "path/filepath" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -20,7 +21,6 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -73,10 +73,7 @@ func Downsample( // Remove blockDir in case of errors. defer func() { if err != nil { - var merr errutil.MultiError - merr.Add(err) - merr.Add(os.RemoveAll(blockDir)) - err = merr.Err() + err = merrors.New(err, os.RemoveAll(blockDir)).Err() } }() diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 1872091bfd..161ebfc9c1 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" @@ -19,7 +20,6 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -61,12 +61,11 @@ func NewStreamedBlockWriter( // We should close any opened Closer up to an error. defer func() { if err != nil { - var merr errutil.MultiError - merr.Add(err) + errs := merrors.New(err) for _, cl := range closers { - merr.Add(cl.Close()) + errs.Add(cl.Close()) } - err = merr.Err() + err = errs.Err() } }() @@ -143,20 +142,19 @@ func (w *streamedBlockWriter) Close() error { } w.finalized = true - merr := errutil.MultiError{} - + errs := merrors.New() if w.ignoreFinalize { // Close open file descriptors anyway. for _, cl := range w.closers { - merr.Add(cl.Close()) + errs.Add(cl.Close()) } - return merr.Err() + return errs.Err() } // Finalize saves prepared index and metadata to corresponding files. for _, cl := range w.closers { - merr.Add(cl.Close()) + errs.Add(cl.Close()) } if err := w.writeMetaFile(); err != nil { @@ -167,7 +165,7 @@ func (w *streamedBlockWriter) Close() error { return errors.Wrap(err, "sync blockDir") } - if err := merr.Err(); err != nil { + if err := errs.Err(); err != nil { return errors.Wrap(err, "finalize") } diff --git a/pkg/compactv2/compactor.go b/pkg/compactv2/compactor.go index 3da860a998..19f5c73605 100644 --- a/pkg/compactv2/compactor.go +++ b/pkg/compactv2/compactor.go @@ -9,13 +9,13 @@ import ( "io" "strings" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" ) @@ -83,9 +83,9 @@ func (w *Compactor) WriteSeries(ctx context.Context, readers []block.Reader, sWr closers []io.Closer ) defer func() { - errs := tsdb_errors.NewMulti(err) - if cerr := tsdb_errors.CloseAll(closers); cerr != nil { - errs.Add(errors.Wrap(cerr, "close")) + errs := merrors.New() + for _, c := range closers { + errs.Add(c.Close()) } err = errs.Err() }() diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 060d54c57d..923df61a6b 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -16,7 +17,6 @@ import ( "github.com/thanos-io/thanos/pkg/discovery/dns/godns" "github.com/thanos-io/thanos/pkg/discovery/dns/miekgdns" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extprom" ) @@ -112,7 +112,7 @@ func GetQTypeName(addr string) (qtype string, name string) { // For non-SRV records, it will return an error if a port is not supplied. func (p *Provider) Resolve(ctx context.Context, addrs []string) error { resolvedAddrs := map[string][]string{} - errs := errutil.MultiError{} + errs := merrors.New() for _, addr := range addrs { var resolved []string diff --git a/pkg/errutil/multierror.go b/pkg/errutil/multierror.go deleted file mode 100644 index aa53706dde..0000000000 --- a/pkg/errutil/multierror.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package errutil - -import ( - "bytes" - "fmt" -) - -// The MultiError type implements the error interface, and contains the -// Errors used to construct it. -type MultiError []error - -// Add adds the error to the error list if it is not nil. -func (es *MultiError) Add(err error) { - if err == nil { - return - } - if merr, ok := err.(NonNilMultiError); ok { - *es = append(*es, merr...) - } else { - *es = append(*es, err) - } -} - -// Err returns the error list as an error or nil if it is empty. -func (es MultiError) Err() error { - if len(es) == 0 { - return nil - } - return NonNilMultiError(es) -} - -type NonNilMultiError MultiError - -// Returns a concatenated string of the contained errors. -func (es NonNilMultiError) Error() string { - var buf bytes.Buffer - - if len(es) > 1 { - fmt.Fprintf(&buf, "%d errors: ", len(es)) - } - - for i, err := range es { - if i != 0 { - buf.WriteString("; ") - } - buf.WriteString(err.Error()) - } - - return buf.String() -} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 0337fe495f..425f87ef54 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" @@ -33,7 +34,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/errutil" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" @@ -333,7 +333,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { case errBadReplica: http.Error(w, err.Error(), http.StatusBadRequest) default: - level.Error(h.logger).Log("err", err, "msg", "internal server error") + level.Error(h.logger).Log("err", err, "msg", "internal server error while handling receive request") http.Error(w, err.Error(), http.StatusInternalServerError) } } @@ -394,7 +394,7 @@ func (h *Handler) writeQuorum() int { // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest, successThreshold int) error { - var errs errutil.MultiError + errs := merrors.New() fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { @@ -691,11 +691,12 @@ func determineWriteErrorCause(err error, threshold int) error { return nil } - unwrappedErr := errors.Cause(err) - errs, ok := unwrappedErr.(errutil.NonNilMultiError) - if !ok { - errs = []error{unwrappedErr} + errs := []error{err} + merrs, ok := merrors.AsMulti(err) + if ok { + errs = merrs.Errors() } + if len(errs) == 0 { return nil } @@ -717,12 +718,12 @@ func determineWriteErrorCause(err error, threshold int) error { } } } - // Determine which error occurred most. + + // Determine which error occurred the most. sort.Sort(sort.Reverse(expErrs)) if exp := expErrs[0]; exp.count >= threshold { return exp.err } - return err } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 70637d49fd..b3170f6348 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -10,10 +10,13 @@ import ( "math/rand" "net/http" "net/http/httptest" + "os" "sync" "testing" "time" + "github.com/efficientgo/tools/core/pkg/merrors" + "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -25,7 +28,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -42,10 +44,6 @@ func TestDetermineWriteErrorCause(t *testing.T) { { name: "nil", }, - { - name: "nil multierror", - err: errutil.NonNilMultiError([]error{}), - }, { name: "matching simple", err: errConflict, @@ -54,81 +52,81 @@ func TestDetermineWriteErrorCause(t *testing.T) { }, { name: "non-matching multierror", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( errors.New("foo"), errors.New("bar"), - }), + ).Err(), exp: errors.New("2 errors: foo; bar"), }, { name: "nested non-matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ + err: errors.Wrap(merrors.New( errors.New("foo"), errors.New("bar"), - }), "baz"), + ).Err(), "baz"), threshold: 1, exp: errors.New("baz: 2 errors: foo; bar"), }, { name: "deep nested non-matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ + err: errors.Wrap(merrors.New( errors.New("foo"), - errutil.NonNilMultiError([]error{ + merrors.New( errors.New("bar"), errors.New("qux"), - }), - }), "baz"), + ).Err(), + ).Err(), "baz"), threshold: 1, - exp: errors.New("baz: 2 errors: foo; 2 errors: bar; qux"), + exp: errors.New("baz: 3 errors: foo; bar; qux"), }, { name: "matching multierror", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( storage.ErrOutOfOrderSample, errors.New("foo"), errors.New("bar"), - }), + ).Err(), threshold: 1, exp: errConflict, }, { name: "matching but below threshold multierror", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( storage.ErrOutOfOrderSample, errors.New("foo"), errors.New("bar"), - }), + ).Err(), threshold: 2, exp: errors.New("3 errors: out of order sample; foo; bar"), }, { name: "matching multierror many", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( storage.ErrOutOfOrderSample, errConflict, status.Error(codes.AlreadyExists, "conflict"), errors.New("foo"), errors.New("bar"), - }), + ).Err(), threshold: 1, exp: errConflict, }, { name: "matching multierror many, one above threshold", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( storage.ErrOutOfOrderSample, errConflict, tsdb.ErrNotReady, tsdb.ErrNotReady, tsdb.ErrNotReady, errors.New("foo"), - }), + ).Err(), threshold: 2, exp: errNotReady, }, { name: "matching multierror many, both above threshold, conflict have precedence", - err: errutil.NonNilMultiError([]error{ + err: merrors.New( storage.ErrOutOfOrderSample, errConflict, tsdb.ErrNotReady, @@ -136,73 +134,74 @@ func TestDetermineWriteErrorCause(t *testing.T) { tsdb.ErrNotReady, status.Error(codes.AlreadyExists, "conflict"), errors.New("foo"), - }), + ).Err(), threshold: 2, exp: errConflict, }, { name: "nested matching multierror", - err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{ + err: errors.Wrap(errors.Wrap(merrors.New( storage.ErrOutOfOrderSample, errors.New("foo"), errors.New("bar"), - }), "baz"), "qux"), + ).Err(), "baz"), "qux"), threshold: 1, exp: errConflict, }, { name: "deep nested matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errutil.NonNilMultiError([]error{ + err: errors.Wrap(merrors.New( + merrors.New( errors.New("qux"), status.Error(codes.AlreadyExists, "conflict"), status.Error(codes.AlreadyExists, "conflict"), - }), + ).Err(), errors.New("foo"), errors.New("bar"), - }), "baz"), + ).Err(), "baz"), threshold: 1, - exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), + exp: errConflict, }, } { - err := determineWriteErrorCause(tc.err, tc.threshold) - if tc.exp != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tc.exp.Error(), err.Error()) - continue - } - testutil.Ok(t, err) + t.Run(tc.name, func(t *testing.T) { + err := determineWriteErrorCause(tc.err, tc.threshold) + if tc.exp != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.exp.Error(), err.Error()) + return + } + testutil.Ok(t, err) + }) } } func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { - cfg := []HashringConfig{ - { - Hashring: "test", - }, - } - var handlers []*Handler - // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers - // This removes the network from the tests and creates a more consistent testing harness. - peers := &peerGroup{ - dialOpts: nil, - m: sync.RWMutex{}, - cache: map[string]storepb.WriteableStoreClient{}, - dialer: func(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error) { - // dialer should never be called since we are creating fake clients with fake addresses - // this protects against some leaking test that may attempt to dial random IP addresses - // which may pose a security risk. - return nil, errors.New("unexpected dial called in testing") - }, - } + var ( + cfg = []HashringConfig{{Hashring: "test"}} + handlers []*Handler + // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers + // This removes the network from the tests and creates a more consistent testing harness. + peers = &peerGroup{ + dialOpts: nil, + m: sync.RWMutex{}, + cache: map[string]storepb.WriteableStoreClient{}, + dialer: func(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error) { + // dialer should never be called since we are creating fake clients with fake addresses + // this protects against some leaking test that may attempt to dial random IP addresses + // which may pose a security risk. + return nil, errors.New("unexpected dial called in testing") + }, + } + ) + logger := log.NewLogfmtLogger(os.Stdout) for i := range appendables { - h := NewHandler(nil, &Options{ + h := NewHandler(logger, &Options{ TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, ForwardTimeout: 5 * time.Second, - Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), + Writer: NewWriter(logger, newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) h.peers = peers @@ -226,24 +225,12 @@ func TestReceiveQuorum(t *testing.T) { Timeseries: []prompb.TimeSeries{ { Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, + {Name: "foo", Value: "bar"}, }, Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, + {Value: 1, Timestamp: 1}, + {Value: 2, Timestamp: 2}, + {Value: 3, Timestamp: 3}, }, }, }, diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index ed297db7c8..70cea73f62 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -11,6 +11,7 @@ import ( "path/filepath" "sync" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" @@ -20,7 +21,6 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" @@ -144,7 +144,7 @@ func (t *MultiTSDB) Flush() error { defer t.mtx.RUnlock() errmtx := &sync.Mutex{} - merr := errutil.MultiError{} + errs := merrors.New() wg := &sync.WaitGroup{} for id, tenant := range t.tenants { db := tenant.readyStorage().Get() @@ -158,7 +158,7 @@ func (t *MultiTSDB) Flush() error { head := db.Head() if err := db.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime()-1)); err != nil { errmtx.Lock() - merr.Add(err) + errs.Add(err) errmtx.Unlock() } wg.Done() @@ -166,14 +166,14 @@ func (t *MultiTSDB) Flush() error { } wg.Wait() - return merr.Err() + return errs.Err() } func (t *MultiTSDB) Close() error { t.mtx.Lock() defer t.mtx.Unlock() - merr := errutil.MultiError{} + errs := merrors.New() for id, tenant := range t.tenants { db := tenant.readyStorage().Get() if db == nil { @@ -181,9 +181,9 @@ func (t *MultiTSDB) Close() error { continue } level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id) - merr.Add(db.Close()) + errs.Add(db.Close()) } - return merr.Err() + return errs.Err() } func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { @@ -196,7 +196,7 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { var ( errmtx = &sync.Mutex{} - merr = errutil.MultiError{} + errs = merrors.New() wg = &sync.WaitGroup{} uploaded atomic.Int64 ) @@ -212,7 +212,7 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { up, err := s.Sync(ctx) if err != nil { errmtx.Lock() - merr.Add(errors.Wrap(err, "upload")) + errs.Add(errors.Wrap(err, "upload")) errmtx.Unlock() } uploaded.Add(int64(up)) @@ -220,7 +220,7 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { }() } wg.Wait() - return int(uploaded.Load()), merr.Err() + return int(uploaded.Load()), errs.Err() } func (t *MultiTSDB) RemoveLockFilesIfAny() error { @@ -232,7 +232,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return err } - merr := errutil.MultiError{} + errs := merrors.New() for _, fi := range fis { if !fi.IsDir() { continue @@ -241,12 +241,12 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { if os.IsNotExist(err) { continue } - merr.Add(err) + errs.Add(err) continue } level.Info(t.logger).Log("msg", "a leftover lockfile found and removed", "tenant", fi.Name()) } - return merr.Err() + return errs.Err() } func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer { diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 8fe1cfe68c..82081e88ae 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -7,14 +7,13 @@ import ( "context" "sync" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -59,7 +58,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR return errors.Wrap(err, "get appender") } - var errs errutil.MultiError + errs := merrors.New() for _, t := range wreq.Timeseries { lset := make(labels.Labels, len(t.Labels)) for j := range t.Labels { diff --git a/pkg/rules/manager.go b/pkg/rules/manager.go index 322c785011..34372f2c4b 100644 --- a/pkg/rules/manager.go +++ b/pkg/rules/manager.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,7 +24,6 @@ import ( "github.com/prometheus/prometheus/rules" "gopkg.in/yaml.v3" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -277,9 +277,13 @@ func (g configRuleAdapter) validate() (errs []error) { // ValidateAndCount validates all rules in the rule groups and return overal number of rules in all groups. // TODO(bwplotka): Replace this with upstream implementation after https://github.com/prometheus/prometheus/issues/7128 is fixed. -func ValidateAndCount(group io.Reader) (numRules int, errs errutil.MultiError) { - var rgs configGroups - d := yaml.NewDecoder(group) +func ValidateAndCount(group io.Reader) (numRules int, _ *merrors.NilOrMultiError) { + var ( + errs = merrors.New() + rgs configGroups + d = yaml.NewDecoder(group) + ) + d.KnownFields(true) if err := d.Decode(&rgs); err != nil { errs.Add(err) @@ -309,7 +313,7 @@ type configGroups struct { // special field in configGroups.configRuleAdapter struct. func (m *Manager) Update(evalInterval time.Duration, files []string) error { var ( - errs errutil.MultiError + errs = merrors.New() filesByStrategy = map[storepb.PartialResponseStrategy][]string{} ruleFiles = map[string]string{} ) @@ -349,7 +353,7 @@ func (m *Manager) Update(evalInterval time.Duration, files []string) error { for s, rg := range groupsByStrategy { b, err := yaml.Marshal(configGroups{Groups: rg}) if err != nil { - errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) + errs.Add(errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) continue } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 3f817a62ee..32505972b8 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -56,13 +56,13 @@ import ( "os" "time" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - - "github.com/thanos-io/thanos/pkg/errutil" ) +// TODO(bwplotka): Replace with github.com/efficientgo/tools/core // Repeat executes f every interval seconds until stopc is closed or f returns an error. // It executes f once right after being called. func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error { @@ -137,12 +137,12 @@ func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, // CloseWithErrCapture runs function and on error return error by argument including the given error (usually // from caller function). func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) { - merr := errutil.MultiError{} + errs := merrors.New() - merr.Add(*err) - merr.Add(errors.Wrapf(closer.Close(), format, a...)) + errs.Add(*err) + errs.Add(errors.Wrapf(closer.Close(), format, a...)) - *err = merr.Err() + *err = errs.Err() } // ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. @@ -152,9 +152,9 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a .. CloseWithErrCapture(err, r, format, a...) // Prepend the io.Copy error. - merr := errutil.MultiError{} - merr.Add(copyErr) - merr.Add(*err) + errs := merrors.New() + errs.Add(copyErr) + errs.Add(*err) - *err = merr.Err() + *err = errs.Err() } diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 902611e658..4c8ae2c164 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -9,6 +9,7 @@ import ( "io" "sync" + "github.com/efficientgo/tools/core/pkg/merrors" "github.com/go-kit/kit/log" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/opentracing/opentracing-go" @@ -21,7 +22,6 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -168,11 +168,11 @@ func (s *tenantSeriesSetServer) Delegate(closer io.Closer) { } func (s *tenantSeriesSetServer) Close() error { - var merr errutil.MultiError + errs := merrors.New() for _, c := range s.closers { - merr.Add(c.Close()) + errs.Add(c.Close()) } - return merr.Err() + return errs.Err() } func (s *tenantSeriesSetServer) Next() (ok bool) {