store: fix decompression after recent changes #47

Open · wants to merge 1 commit into base: v0.31.0+vinted
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
@@ -562,7 +562,7 @@ func runQuery(
 			selectorLset,
 			storeResponseTimeout,
 			store.RetrievalStrategy(grpcProxyStrategy),
-			maxConcurrentDecompressWorkers > 0,
+			maxConcurrentDecompressWorkers,
 		)
 		rulesProxy   = rules.NewProxy(logger, endpoints.GetRulesClients)
 		targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
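For orientation: `NewProxyStore` in this fork now takes an integer worker cap instead of the old boolean toggle, so the flag value is passed straight through. Below is a minimal sketch of a call site under that signature, modeled on the test setup further down in this diff; the logger, registry, client slice, timeout, and the value 4 are illustrative assumptions, not part of the change.

```go
package example

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/thanos-io/thanos/pkg/component"
	"github.com/thanos-io/thanos/pkg/store"
)

func newProxyExample(clients []store.Client) *store.ProxyStore {
	// The last argument is the decompression worker cap: 0 keeps plain,
	// uncompressed retrieval; any positive value enables compressed
	// retrieval with at most that many decompression workers.
	return store.NewProxyStore(
		log.NewNopLogger(),
		prometheus.NewRegistry(),
		func() []store.Client { return clients },
		component.Query,
		labels.Labels{},      // selector labels
		1*time.Minute,        // response timeout
		store.EagerRetrieval, // retrieval strategy
		4,                    // maxDecompressWorkers (illustrative)
	)
}
```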
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
@@ -313,7 +313,7 @@ func runReceive(
 		labels.Labels{},
 		0,
 		store.LazyRetrieval,
-		false,
+		0,
 	)
 	mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
 	rw := store.ReadWriteTSDBStore{
2 changes: 1 addition & 1 deletion go.mod
@@ -193,7 +193,6 @@ require (
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/julienschmidt/httprouter v1.3.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.1.0 // indirect
-	github.com/kr/pretty v0.3.0 // indirect
 	github.com/kylelemons/godebug v1.1.0 // indirect
 	github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a8dfcb80d3a7 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
@@ -218,6 +217,7 @@ require (
 	github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
 	github.com/shirou/gopsutil/v3 v3.22.9 // indirect
 	github.com/sirupsen/logrus v1.9.0 // indirect
+	github.com/sourcegraph/conc v0.3.0
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/stretchr/objx v0.5.0 // indirect
 	github.com/tencentyun/cos-go-sdk-v5 v0.7.40 // indirect
6 changes: 3 additions & 3 deletions go.sum
@@ -624,7 +624,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
-github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -828,8 +827,7 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
 github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
-github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
-github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
 github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
 github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/rueian/rueidis v0.0.93 h1:cG905akj2+QyHx0x9y4mN0K8vLi6M94QiyoLulXS3l0=
@@ -863,6 +861,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
 github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
 github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
 github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
2 changes: 1 addition & 1 deletion pkg/api/query/grpc_test.go
@@ -25,7 +25,7 @@ import (
 func TestGRPCQueryAPIErrorHandling(t *testing.T) {
 	logger := log.NewNopLogger()
 	reg := prometheus.NewRegistry()
-	proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, false)
+	proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, 0)
 	queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, 0)
 	lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
 	tests := []struct {
2 changes: 1 addition & 1 deletion pkg/api/query/v1_test.go
@@ -658,7 +658,7 @@ func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
 		nil,
 		0,
 		store.EagerRetrieval,
-		false,
+		0,
 	)
 }
 
119 changes: 0 additions & 119 deletions pkg/query/querier.go
@@ -5,7 +5,6 @@ package query
 
 import (
 	"context"
-	"fmt"
 	"strings"
 	"sync"
 	"time"
@@ -15,7 +14,6 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
@@ -25,7 +23,6 @@ import (
 	"github.com/thanos-io/thanos/pkg/gate"
 	"github.com/thanos-io/thanos/pkg/store"
 	"github.com/thanos-io/thanos/pkg/store/hintspb"
-	"github.com/thanos-io/thanos/pkg/store/labelpb"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/thanos-io/thanos/pkg/tracing"
 )
@@ -221,118 +218,6 @@ type seriesServer struct {
 	compressedSeriesSet []storepb.CompressedSeries
 }
 
-func (s *seriesServer) decompressSeriesIndex(i int) (*storepb.Series, error) {
-	newSeries := &storepb.Series{
-		Chunks: s.compressedSeriesSet[i].Chunks,
-	}
-
-	lbls := make(labels.Labels, 0, len(s.compressedSeriesSet[i].Labels))
-
-	for _, cLabel := range s.compressedSeriesSet[i].Labels {
-		var name, val string
-		for _, symTable := range s.symbolTables {
-			if foundName, ok := symTable[uint64(cLabel.NameRef)]; ok {
-				name = foundName
-			}
-
-			if foundValue, ok := symTable[uint64(cLabel.ValueRef)]; ok {
-				val = foundValue
-			}
-
-			if name != "" && val != "" {
-				break
-			}
-		}
-		if name == "" || val == "" {
-			return nil, fmt.Errorf("series %+v references do not exist", cLabel)
-		}
-
-		lbls = append(lbls, labels.Label{
-			Name:  name,
-			Value: val,
-		})
-	}
-
-	newSeries.Labels = labelpb.ZLabelsFromPromLabels(lbls)
-	return newSeries, nil
-}
-
-func (s *seriesServer) DecompressSeries(maxWorkers int) error {
-	if len(s.compressedSeriesSet) == 0 {
-		return nil
-	}
-
-	workerInput := make(chan int)
-	workerOutput := make(chan *storepb.Series)
-
-	var elements uint64
-	for _, css := range s.compressedSeriesSet {
-		elements += uint64(len(css.Labels)) * 2
-	}
-
-	newSeriesSet := make([]storepb.Series, 0, len(s.seriesSet)+len(s.compressedSeriesSet))
-	newSeriesSet = append(newSeriesSet, s.seriesSet...)
-
-	// NOTE(GiedriusS): Ballpark estimate. With more workers I got slower results.
-	workerCount := 1 + (elements / 2000000)
-	if workerCount == 1 || maxWorkers < 1 {
-		for i := range s.compressedSeriesSet {
-			decompressedSeries, err := s.decompressSeriesIndex(i)
-			if err != nil {
-				return fmt.Errorf("decompressing element %d: %w", i, err)
-			}
-
-			newSeriesSet = append(newSeriesSet, *decompressedSeries)
-		}
-		s.seriesSet = newSeriesSet
-		return nil
-	}
-
-	if maxWorkers > 0 && workerCount > uint64(maxWorkers) {
-		workerCount = uint64(maxWorkers)
-	}
-
-	wg := &sync.WaitGroup{}
-	errLock := sync.Mutex{}
-	errs := tsdb_errors.NewMulti()
-
-	for i := uint64(0); i < workerCount; i++ {
-		wg.Add(1)
-
-		go func() {
-			defer wg.Done()
-			for ind := range workerInput {
-				decompressedSeries, err := s.decompressSeriesIndex(ind)
-				if err != nil {
-					errLock.Lock()
-					errs.Add(fmt.Errorf("decompressing element %d: %w", ind, err))
-					errLock.Unlock()
-					continue
-				}
-
-				workerOutput <- decompressedSeries
-			}
-		}()
-	}
-
-	go func() {
-		for i := range s.compressedSeriesSet {
-			workerInput <- i
-		}
-
-		close(workerInput)
-		wg.Wait()
-		close(workerOutput)
-	}()
-
-	for wo := range workerOutput {
-		newSeriesSet = append(newSeriesSet, *wo)
-	}
-	s.seriesSet = newSeriesSet
-
-	return errs.Err()
-}
-
 func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
 	if r.GetWarning() != "" {
 		s.warnings = append(s.warnings, r.GetWarning())
@@ -514,10 +399,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
 		warns = append(warns, errors.New(w))
 	}
 
-	if err := resp.DecompressSeries(q.maxConcurrentDecompressWorkers); err != nil {
-		return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "decompressing series")
-	}
-
 	// Delete the metric's name from the result because that's what the
 	// PromQL does either way and we want our iterator to work with data
 	// that was either pushed down or not.
2 changes: 1 addition & 1 deletion pkg/query/querier_test.go
@@ -855,7 +855,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
 		nil,
 		0,
 		store.EagerRetrieval,
-		false,
+		0,
 	)
 }
 
2 changes: 1 addition & 1 deletion pkg/query/query_test.go
@@ -37,7 +37,7 @@ func TestQuerier_Proxy(t *testing.T) {
 		logger,
 		nil,
 		store.NewProxyStore(logger, nil, func() []store.Client { return clients },
-			component.Debug, nil, 5*time.Minute, store.EagerRetrieval, false),
+			component.Debug, nil, 5*time.Minute, store.EagerRetrieval, 0),
 		1000000,
 		5*time.Minute,
 		0,
28 changes: 17 additions & 11 deletions pkg/store/proxy.go
@@ -73,10 +73,10 @@ type ProxyStore struct {
 	selectorLabels labels.Labels
 	buffers        sync.Pool
 
-	responseTimeout           time.Duration
-	metrics                   *proxyStoreMetrics
-	retrievalStrategy         RetrievalStrategy
-	enableCompressedRetrieval bool
+	responseTimeout      time.Duration
+	metrics              *proxyStoreMetrics
+	retrievalStrategy    RetrievalStrategy
+	maxDecompressWorkers int
 }
 
 type proxyStoreMetrics struct {
@@ -110,7 +110,7 @@ func NewProxyStore(
 	selectorLabels labels.Labels,
 	responseTimeout time.Duration,
 	retrievalStrategy RetrievalStrategy,
-	enableCompressedRetrieval bool,
+	maxDecompressWorkers int,
 ) *ProxyStore {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -126,10 +126,10 @@
 			b := make([]byte, 0, initialBufSize)
 			return &b
 		}},
-		responseTimeout:           responseTimeout,
-		metrics:                   metrics,
-		retrievalStrategy:         retrievalStrategy,
-		enableCompressedRetrieval: enableCompressedRetrieval,
+		responseTimeout:      responseTimeout,
+		metrics:              metrics,
+		retrievalStrategy:    retrievalStrategy,
+		maxDecompressWorkers: maxDecompressWorkers,
 	}
 	return s
 }
@@ -289,8 +289,14 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 	}
 
 	// Zero maximum slots indicated that we want uncompressed data.
-	if s.enableCompressedRetrieval {
+	if s.maxDecompressWorkers > 0 {
 		r.MaximumStringSlots = maxStringsPerStore(uint64(len(stores)))
+		// NOTE(GiedriusS): we force eager retrieval here
+		// because with symbol tables either way we have to wait
+		// till the end to build the initial structure of the heap,
+		// so switch to eager retrieval because there won't be any
+		// gain from laziness.
+		s.retrievalStrategy = EagerRetrieval
 	}
 	adjusterFactory := newReferenceAdjusterFactory(uint64(len(stores)))
 
@@ -302,7 +308,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 		storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
 
 		adjuster := adjusterFactory(uint64(storeIndex))
-		respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, adjuster)
+		respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, adjuster, s.maxDecompressWorkers)
 		if err != nil {
 			level.Error(reqLogger).Log("err", err)
 
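Condensed, the new gate in `Series` ties both behaviours to the one integer. The helper below is an illustrative sketch (not part of the change) that only restates the control flow from the hunk above, using the field and helper names it introduces.

```go
// applyDecompressionSettings shows how a single integer now drives both
// knobs that the removed boolean used to control. Illustrative only.
func applyDecompressionSettings(s *ProxyStore, r *storepb.SeriesRequest, storeCount int) {
	if s.maxDecompressWorkers <= 0 {
		// Zero disables compressed retrieval: MaximumStringSlots stays zero
		// and the retrieval strategy configured at construction time is used.
		return
	}

	// A positive cap turns on compressed retrieval for the request...
	r.MaximumStringSlots = maxStringsPerStore(uint64(storeCount))

	// ...and forces eager retrieval, because the symbol tables are only
	// complete once the whole stream has arrived, so laziness gains nothing.
	s.retrievalStrategy = EagerRetrieval
}
```

The same value is then handed to `newAsyncRespSet` in the second hunk, which appears to be where the decompression work removed from `pkg/query/querier.go` now takes place.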