diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bf05e9ac02..86b36ce56f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -562,7 +562,7 @@ func runQuery( selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), - maxConcurrentDecompressWorkers > 0, + maxConcurrentDecompressWorkers, ) rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c6dfb31abd..f2c7b97479 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -313,7 +313,7 @@ func runReceive( labels.Labels{}, 0, store.LazyRetrieval, - false, + 0, ) mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) rw := store.ReadWriteTSDBStore{ diff --git a/go.mod b/go.mod index d9aec5df4d..6cfe929bbc 100644 --- a/go.mod +++ b/go.mod @@ -193,7 +193,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect - github.com/kr/pretty v0.3.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a8dfcb80d3a7 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -218,6 +217,7 @@ require ( github.com/sercand/kuberesolver v2.4.0+incompatible // indirect github.com/shirou/gopsutil/v3 v3.22.9 // indirect github.com/sirupsen/logrus v1.9.0 // indirect + github.com/sourcegraph/conc v0.3.0 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/tencentyun/cos-go-sdk-v5 v0.7.40 // indirect diff --git a/go.sum b/go.sum index 57f6b2943a..4bb4deedc2 100644 --- a/go.sum +++ b/go.sum @@ -624,7 +624,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -828,8 +827,7 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rueian/rueidis v0.0.93 h1:cG905akj2+QyHx0x9y4mN0K8vLi6M94QiyoLulXS3l0= @@ -863,6 +861,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k 
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/pkg/api/query/grpc_test.go b/pkg/api/query/grpc_test.go index 8e6aafe49d..356864a20c 100644 --- a/pkg/api/query/grpc_test.go +++ b/pkg/api/query/grpc_test.go @@ -25,7 +25,7 @@ import ( func TestGRPCQueryAPIErrorHandling(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() - proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, false) + proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, 0) queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, 0) lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute } tests := []struct { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 1d1264d382..354c71f04d 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -658,7 +658,7 @@ func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore { nil, 0, store.EagerRetrieval, - false, + 0, ) } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index b2311e29cc..0dc50badac 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -5,7 +5,6 @@ package query import ( "context" - "fmt" "strings" "sync" "time" @@ -15,7 +14,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -25,7 +23,6 @@ import ( "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/hintspb" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -221,118 +218,6 @@ type seriesServer struct { compressedSeriesSet []storepb.CompressedSeries } -func (s *seriesServer) decompressSeriesIndex(i int) (*storepb.Series, error) { - newSeries := &storepb.Series{ - Chunks: s.compressedSeriesSet[i].Chunks, - } - - lbls := make(labels.Labels, 0, len(s.compressedSeriesSet[i].Labels)) - - for _, cLabel := range s.compressedSeriesSet[i].Labels { - var name, val string - for _, symTable := range s.symbolTables { - if foundName, ok := symTable[uint64(cLabel.NameRef)]; ok { - name = foundName - } - - if foundValue, ok := symTable[uint64(cLabel.ValueRef)]; ok { - val = foundValue - } - - if name != "" && val != "" { - break - } - } - if name == "" || val == "" { - return nil, fmt.Errorf("series %+v references do not exist", cLabel) - } - - lbls = append(lbls, labels.Label{ - Name: name, - Value: val, - }) - } - - newSeries.Labels = labelpb.ZLabelsFromPromLabels(lbls) - return 
newSeries, nil -} - -func (s *seriesServer) DecompressSeries(maxWorkers int) error { - if len(s.compressedSeriesSet) == 0 { - return nil - } - - workerInput := make(chan int) - workerOutput := make(chan *storepb.Series) - - var elements uint64 - for _, css := range s.compressedSeriesSet { - elements += uint64(len(css.Labels)) * 2 - } - - newSeriesSet := make([]storepb.Series, 0, len(s.seriesSet)+len(s.compressedSeriesSet)) - newSeriesSet = append(newSeriesSet, s.seriesSet...) - - // NOTE(GiedriusS): Ballpark estimate. With more workers I got slower results. - workerCount := 1 + (elements / 2000000) - if workerCount == 1 || maxWorkers < 1 { - for i := range s.compressedSeriesSet { - decompressedSeries, err := s.decompressSeriesIndex(i) - if err != nil { - return fmt.Errorf("decompressing element %d: %w", i, err) - } - - newSeriesSet = append(newSeriesSet, *decompressedSeries) - } - s.seriesSet = newSeriesSet - return nil - } - - if maxWorkers > 0 && workerCount > uint64(maxWorkers) { - workerCount = uint64(maxWorkers) - } - - wg := &sync.WaitGroup{} - errLock := sync.Mutex{} - errs := tsdb_errors.NewMulti() - - for i := uint64(0); i < workerCount; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - for ind := range workerInput { - decompressedSeries, err := s.decompressSeriesIndex(ind) - if err != nil { - errLock.Lock() - errs.Add(fmt.Errorf("decompressing element %d: %w", ind, err)) - errLock.Unlock() - continue - } - - workerOutput <- decompressedSeries - } - }() - } - - go func() { - for i := range s.compressedSeriesSet { - workerInput <- i - } - - close(workerInput) - wg.Wait() - close(workerOutput) - }() - - for wo := range workerOutput { - newSeriesSet = append(newSeriesSet, *wo) - } - s.seriesSet = newSeriesSet - - return errs.Err() -} - func (s *seriesServer) Send(r *storepb.SeriesResponse) error { if r.GetWarning() != "" { s.warnings = append(s.warnings, r.GetWarning()) @@ -514,10 +399,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . warns = append(warns, errors.New(w)) } - if err := resp.DecompressSeries(q.maxConcurrentDecompressWorkers); err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "decompressing series") - } - // Delete the metric's name from the result because that's what the // PromQL does either way and we want our iterator to work with data // that was either pushed down or not. 
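Illustrative sketch, not part of the patch: the hand-rolled decompression worker pool removed from pkg/query/querier.go above is superseded by the sourcegraph/conc dependency added in go.mod, whose iter.Mapper runs a callback over a slice with a bounded number of goroutines (used further down in pkg/store/proxy_heap.go). The inputs and conversion callback below are made up for illustration only.

package main

import (
	"fmt"
	"strconv"

	"github.com/sourcegraph/conc/iter"
)

func main() {
	inputs := []string{"1", "2", "3", "oops"}

	// Cap on the number of concurrently running goroutines.
	mapper := iter.Mapper[string, int]{MaxGoroutines: 2}

	// MapErr returns the mapped slice together with a combined error.
	// Slots whose callback failed hold whatever value the callback
	// returned (nil in proxy_heap.go's case), which is why the patch
	// filters out nil responses afterwards.
	outputs, err := mapper.MapErr(inputs, func(s *string) (int, error) {
		return strconv.Atoi(*s)
	})
	fmt.Println(outputs, err)
}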
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 6c5c033baa..9451a4054c 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -855,7 +855,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
 		nil,
 		0,
 		store.EagerRetrieval,
-		false,
+		0,
 	)
 }
 
diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go
index e68e1f90a3..1fcbdd798c 100644
--- a/pkg/query/query_test.go
+++ b/pkg/query/query_test.go
@@ -37,7 +37,7 @@ func TestQuerier_Proxy(t *testing.T) {
 		logger,
 		nil,
 		store.NewProxyStore(logger, nil, func() []store.Client { return clients },
-			component.Debug, nil, 5*time.Minute, store.EagerRetrieval, false),
+			component.Debug, nil, 5*time.Minute, store.EagerRetrieval, 0),
 		1000000,
 		5*time.Minute,
 		0,
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index b681dc7803..f96c8bcdff 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -73,10 +73,10 @@ type ProxyStore struct {
 	selectorLabels labels.Labels
 	buffers        sync.Pool
 
-	responseTimeout           time.Duration
-	metrics                   *proxyStoreMetrics
-	retrievalStrategy         RetrievalStrategy
-	enableCompressedRetrieval bool
+	responseTimeout      time.Duration
+	metrics              *proxyStoreMetrics
+	retrievalStrategy    RetrievalStrategy
+	maxDecompressWorkers int
 }
 
 type proxyStoreMetrics struct {
@@ -110,7 +110,7 @@ func NewProxyStore(
 	selectorLabels labels.Labels,
 	responseTimeout time.Duration,
 	retrievalStrategy RetrievalStrategy,
-	enableCompressedRetrieval bool,
+	maxDecompressWorkers int,
 ) *ProxyStore {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -126,10 +126,10 @@ func NewProxyStore(
 			b := make([]byte, 0, initialBufSize)
 			return &b
 		}},
-		responseTimeout:           responseTimeout,
-		metrics:                   metrics,
-		retrievalStrategy:         retrievalStrategy,
-		enableCompressedRetrieval: enableCompressedRetrieval,
+		responseTimeout:      responseTimeout,
+		metrics:              metrics,
+		retrievalStrategy:    retrievalStrategy,
+		maxDecompressWorkers: maxDecompressWorkers,
 	}
 	return s
 }
@@ -289,8 +289,14 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 	}
 
 	// Zero maximum slots indicated that we want uncompressed data.
-	if s.enableCompressedRetrieval {
+	if s.maxDecompressWorkers > 0 {
 		r.MaximumStringSlots = maxStringsPerStore(uint64(len(stores)))
+		// NOTE(GiedriusS): we force eager retrieval here
+		// because with symbol tables we have to wait until
+		// the end to build the initial structure of the heap
+		// either way, so there is no gain from lazy
+		// retrieval.
+		s.retrievalStrategy = EagerRetrieval
 	}
 
 	adjusterFactory := newReferenceAdjusterFactory(uint64(len(stores)))
@@ -302,7 +308,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) adjuster := adjusterFactory(uint64(storeIndex)) - respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, adjuster) + respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, adjuster, s.maxDecompressWorkers) if err != nil { level.Error(reqLogger).Log("err", err) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ee663e7e0..2f2bf59568 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -13,6 +13,7 @@ import ( "time" "github.com/gogo/protobuf/types" + "github.com/sourcegraph/conc/iter" "github.com/cespare/xxhash/v2" "github.com/go-kit/log" @@ -547,6 +548,7 @@ func newAsyncRespSet( logger log.Logger, emptyStreamResponses prometheus.Counter, adjuster adjusterFn, + maxDecompressWorkers int, ) (respSet, error) { var span opentracing.Span @@ -629,6 +631,7 @@ func newAsyncRespSet( labelsToRemove, req.MaximumStringSlots, adjuster, + maxDecompressWorkers, ), nil default: panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy)) @@ -664,6 +667,7 @@ type eagerRespSet struct { // Internal bookkeeping. bufferedResponses []*storepb.SeriesResponse + symbolTables []map[uint64]string wg *sync.WaitGroup i int } @@ -681,6 +685,7 @@ func newEagerRespSet( removeLabels map[string]struct{}, maximumStrings uint64, adjuster adjusterFn, + maxDecompressWorkers int, ) respSet { ret := &eagerRespSet{ span: span, @@ -690,6 +695,7 @@ func newEagerRespSet( frameTimeout: frameTimeout, ctx: ctx, bufferedResponses: []*storepb.SeriesResponse{}, + symbolTables: []map[uint64]string{}, wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, removeLabels: removeLabels, @@ -807,6 +813,7 @@ func newEagerRespSet( } resp = storepb.NewHintsSeriesResponse(anyHints) + l.symbolTables = append(l.symbolTables, adjustedTable) } l.bufferedResponses = append(l.bufferedResponses, resp) @@ -831,6 +838,53 @@ func newEagerRespSet( sortWithoutLabels(l.bufferedResponses, l.removeLabels) } + if len(l.symbolTables) > 0 { + // These can be interleaved with non-compressed series + // and we care about order so we have to step through all series. + workerCount := 1 + (len(l.bufferedResponses) / 2000000) + if workerCount == 1 || maxDecompressWorkers < 1 { + for i := range l.bufferedResponses { + decodedSeries, err := tryDecodeCompressedSeriesResponse(l.bufferedResponses[i], l.symbolTables) + if err != nil { + err = errors.Wrapf(err, "failed to decompress %d series from %s", i, st.String()) + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) + l.span.SetTag("err", err.Error()) + return + } + l.bufferedResponses[i] = decodedSeries + } + } + + if maxDecompressWorkers > 0 && workerCount > maxDecompressWorkers { + workerCount = maxDecompressWorkers + } + + mapper := iter.Mapper[*storepb.SeriesResponse, *storepb.SeriesResponse]{ + MaxGoroutines: workerCount, + } + + responses, err := mapper.MapErr(l.bufferedResponses, func(r **storepb.SeriesResponse) (*storepb.SeriesResponse, error) { + decodedSeries, err := tryDecodeCompressedSeriesResponse(*r, l.symbolTables) + if err != nil { + return nil, errors.Wrap(err, "failed to decompress series") + } + return decodedSeries, nil + }) + l.bufferedResponses = responses + if err == nil { + return + } + // Clear out failures. 
This is quite bad for performance. Hope that there won't be too many of them. + for ri := 0; ri < len(l.bufferedResponses); ri++ { + if l.bufferedResponses[ri] != nil { + continue + } + l.bufferedResponses = append(l.bufferedResponses[:ri], l.bufferedResponses[ri+1:]...) + ri-- + } + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) + l.span.SetTag("err", err.Error()) + } }(st, ret) return ret @@ -918,3 +972,45 @@ type respSet interface { Labelset() string Empty() bool } + +func tryDecodeCompressedSeriesResponse(r *storepb.SeriesResponse, symbolTables []map[uint64]string) (*storepb.SeriesResponse, error) { + if r.GetCompressedSeries() == nil { + return r, nil + } + + cs := r.GetCompressedSeries() + + newSeries := &storepb.Series{ + Chunks: cs.Chunks, + } + + lbls := make(labels.Labels, 0, len(cs.Labels)) + + for _, cLabel := range cs.Labels { + var name, val string + for _, symTable := range symbolTables { + if foundName, ok := symTable[uint64(cLabel.NameRef)]; ok { + name = foundName + } + + if foundValue, ok := symTable[uint64(cLabel.ValueRef)]; ok { + val = foundValue + } + + if name != "" && val != "" { + break + } + } + if name == "" || val == "" { + return nil, fmt.Errorf("series %+v references do not exist", cLabel) + } + + lbls = append(lbls, labels.Label{ + Name: name, + Value: val, + }) + } + + newSeries.Labels = labelpb.ZLabelsFromPromLabels(lbls) + return storepb.NewSeriesResponse(newSeries), nil +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 413f9952c5..04e265a263 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -66,7 +66,7 @@ func TestProxyStore_Info(t *testing.T) { func() []Client { return nil }, component.Query, nil, 0*time.Second, RetrievalStrategy(EagerRetrieval), - false, + 0, ) resp, err := q.Info(ctx, &storepb.InfoRequest{}) @@ -608,7 +608,7 @@ func TestProxyStore_Series(t *testing.T) { func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, - 5*time.Second, strategy, false, + 5*time.Second, strategy, 0, ) ctx := context.Background() @@ -1143,7 +1143,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { component.Query, tc.selectorLabels, 4*time.Second, strategy, - false, + 0, ) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -1202,7 +1202,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { component.Query, nil, 1*time.Second, EagerRetrieval, - false, + 0, ) ctx := context.Background() @@ -1265,7 +1265,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { component.Query, labels.FromStrings("fed", "a"), 5*time.Second, EagerRetrieval, - false, + 0, ) ctx := context.Background() @@ -1313,7 +1313,7 @@ func TestProxyStore_LabelValues(t *testing.T) { component.Query, nil, 0*time.Second, EagerRetrieval, - false, + 0, ) ctx := context.Background() @@ -1514,7 +1514,7 @@ func TestProxyStore_LabelNames(t *testing.T) { component.Query, nil, 5*time.Second, EagerRetrieval, - false, + 0, ) ctx := context.Background()
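Illustrative usage, not part of the patch: a minimal sketch of the updated store.NewProxyStore signature, mirroring the call sites changed above. A value of 0 keeps the previous uncompressed retrieval, while any value above 0 requests compressed series (MaximumStringSlots is set) and caps the number of decompression goroutines; it also forces eager retrieval. The logger, registry, empty clients function, and the worker count of 4 are placeholders, not recommendations.

package main

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/component"
	"github.com/thanos-io/thanos/pkg/store"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()

	proxy := store.NewProxyStore(
		logger,
		reg,
		func() []store.Client { return nil }, // placeholder; real callers return endpoint clients
		component.Query,
		nil,           // selector labels
		1*time.Minute, // per-store response timeout
		store.EagerRetrieval,
		4, // maxDecompressWorkers; 0 falls back to uncompressed retrieval
	)
	_ = proxy
}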