diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b1fbd898a25..c69f3e3d07d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1024,14 +1024,16 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag hasher := hashPool.Get().(hash.Hash64) defer hashPool.Put(hasher) - if in.Encoding() == chunkenc.EncXOR { + if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram { b, err := save(in.Bytes()) if err != nil { return err } - out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} + storeEnc := storepb.Chunk_Encoding(in.Encoding() - 1) + out.Raw = &storepb.Chunk{Type: storeEnc, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} return nil } + if in.Encoding() != downsample.ChunkEncAggr { return errors.Errorf("unsupported chunk encoding %d", in.Encoding()) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c115be4d6b7..7aab0121986 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -30,7 +30,6 @@ import ( "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" @@ -45,6 +44,7 @@ import ( "github.com/thanos-io/objstore/providers/filesystem" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -1238,14 +1238,21 @@ func benchmarkExpandedPostings( func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1) + }) +} + +func TestBucketHistogramSeries(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1) }) } func TestBucketSkipChunksSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, true, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1) }) } @@ -1253,7 +1260,7 @@ func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } @@ -1261,11 +1268,11 @@ func BenchmarkBucketSkipChunksSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } -func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { +func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { const numOfBlocks = 4 tmpDir := t.TempDir() @@ -1303,17 +1310,24 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer // Timestamp will be counted for each new series and new sample, so each each series will have unique timestamp. // This allows to pick time range that will correspond to number of series picked 1:1. for bi := 0; bi < numOfBlocks; bi++ { - head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{ + head, _ := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{ TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)), SamplesPerSeries: samplesPerSeriesPerBlock, Series: seriesPerBlock, PrependLabels: extLset, Random: random, SkipChunks: t.IsBenchmark() || skipChunk, + SampleType: sampleType, }) id := createBlockFromHead(t, blockDir, head) testutil.Ok(t, head.Close()) - series = append(series, bSeries...) + + // Histogram chunks are represented differently in memory and on disk. In order to + // have a precise comparison, we need to use the on-disk representation as the expected value + // instead of the in-memory one. + diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil) + testutil.Ok(t, err) + series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...) meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil) testutil.Ok(t, err) diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index 64baf181d5e..083eb5d9d51 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -16,6 +16,11 @@ import ( "time" "github.com/cespare/xxhash" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/gogo/protobuf/types" "github.com/prometheus/prometheus/model/labels" @@ -25,6 +30,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -50,6 +56,7 @@ type HeadGenOptions struct { WithWAL bool PrependLabels labels.Labels SkipChunks bool // Skips chunks in returned slice (not in generated head!). + SampleType chunkenc.ValueType Random *rand.Rand } @@ -85,27 +92,26 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, headOpts := tsdb.DefaultHeadOptions() headOpts.ChunkDirRoot = opts.TSDBDir + headOpts.EnableNativeHistograms = *atomic.NewBool(true) h, err := tsdb.NewHead(nil, nil, w, nil, headOpts, nil) testutil.Ok(t, err) app := h.Appender(context.Background()) for i := 0; i < opts.Series; i++ { tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries - ref, err := app.Append( - 0, - labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), - int64(tsLabel)*opts.ScrapeInterval.Milliseconds(), - opts.Random.Float64(), - ) - testutil.Ok(t, err) - - for is := 1; is < opts.SamplesPerSeries; is++ { - _, err := app.Append(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64()) - testutil.Ok(t, err) + switch opts.SampleType { + case chunkenc.ValFloat: + appendFloatSamples(t, app, tsLabel, opts) + case chunkenc.ValHistogram: + appendHistogramSamples(t, app, tsLabel, opts) } } testutil.Ok(t, app.Commit()) + return h, ReadSeriesFromBlock(t, h, opts.PrependLabels, opts.SkipChunks) +} + +func ReadSeriesFromBlock(t testing.TB, h tsdb.BlockReader, extLabels labels.Labels, skipChunks bool) []*storepb.Series { // Use TSDB and get all series for assertion. chks, err := h.Chunks() testutil.Ok(t, err) @@ -118,15 +124,15 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, var ( lset labels.Labels chunkMetas []chunks.Meta - expected = make([]*storepb.Series, 0, opts.Series) + expected = make([]*storepb.Series, 0) ) all := allPostings(t, ir) for all.Next() { testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas)) - expected = append(expected, &storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(append(opts.PrependLabels.Copy(), lset...))}) + expected = append(expected, &storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(append(extLabels.Copy(), lset...))}) - if opts.SkipChunks { + if skipChunks { continue } @@ -139,15 +145,59 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, c.MaxTime = c.MinTime + int64(chEnc.NumSamples()) - 1 } + storeChunkEnc := storepb.Chunk_Encoding(chEnc.Encoding() - 1) expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{ MinTime: c.MinTime, MaxTime: c.MaxTime, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes(), Hash: xxhash.Sum64(chEnc.Bytes())}, + Raw: &storepb.Chunk{Type: storeChunkEnc, Data: chEnc.Bytes(), Hash: xxhash.Sum64(chEnc.Bytes())}, }) } } testutil.Ok(t, all.Err()) - return h, expected + return expected +} + +func appendFloatSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) { + ref, err := app.Append( + 0, + labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), + int64(tsLabel)*opts.ScrapeInterval.Milliseconds(), + opts.Random.Float64(), + ) + testutil.Ok(t, err) + + for is := 1; is < opts.SamplesPerSeries; is++ { + _, err := app.Append(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64()) + testutil.Ok(t, err) + } +} + +func appendHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) { + sample := &histogram.Histogram{ + Schema: 0, + Count: 9, + Sum: -3.1415, + ZeroCount: 12, + ZeroThreshold: 0.001, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 1}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1}, + } + + ref, err := app.AppendHistogram( + 0, + labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), + int64(tsLabel)*opts.ScrapeInterval.Milliseconds(), + sample, + ) + testutil.Ok(t, err) + + for is := 1; is < opts.SamplesPerSeries; is++ { + _, err := app.AppendHistogram(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), sample) + testutil.Ok(t, err) + } } // SeriesServer is test gRPC storeAPI series server. @@ -252,20 +302,22 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series } // Huge responses can produce unreadable diffs - make it more human readable. - if len(c.ExpectedSeries) > 4 { + if len(c.ExpectedSeries) > 1 { for j := range c.ExpectedSeries { testutil.Equals(t, c.ExpectedSeries[j].Labels, srv.SeriesSet[j].Labels, "%v series chunks mismatch", j) // Check chunks when it is not a skip chunk query if !c.Req.SkipChunks { if len(c.ExpectedSeries[j].Chunks) > 20 { - testutil.Equals(t, len(c.ExpectedSeries[j].Chunks), len(srv.SeriesSet[j].Chunks), "%v series chunks number mismatch", j) + require.Equal(t, len(c.ExpectedSeries[j].Chunks), len(srv.SeriesSet[j].Chunks), "%v series chunks number mismatch", j) + } + for ci := range c.ExpectedSeries[j].Chunks { + testutil.Equals(t, c.ExpectedSeries[j].Chunks[ci], srv.SeriesSet[j].Chunks[ci], "%v series chunks mismatch %v", j, ci) } - testutil.Equals(t, c.ExpectedSeries[j].Chunks, srv.SeriesSet[j].Chunks, "%v series chunks mismatch", j) } } } else { - testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet) + require.Equal(t, c.ExpectedSeries, srv.SeriesSet) } var actualHints []hintspb.SeriesResponseHints diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 8dd4405acc5..0856665d7aa 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" @@ -32,9 +33,11 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -53,6 +56,19 @@ const ( PromAddrPlaceHolder = "PROMETHEUS_ADDRESS" ) +var histogramSample = histogram.Histogram{ + Schema: 0, + Count: 9, + Sum: -3.1415, + ZeroCount: 12, + ZeroThreshold: 0.001, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 1}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1}, +} + func PrometheusBinary() string { return "prometheus-" + defaultPrometheusVersion } @@ -362,7 +378,7 @@ func CreateBlock( resolution int64, hashFunc metadata.HashFunc, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc) + return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, chunkenc.ValFloat) } // CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block. @@ -376,7 +392,7 @@ func CreateBlockWithTombstone( resolution int64, hashFunc metadata.HashFunc, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true, hashFunc) + return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true, hashFunc, chunkenc.ValFloat) } // CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each. @@ -393,7 +409,27 @@ func CreateBlockWithBlockDelay( resolution int64, hashFunc metadata.HashFunc, ) (ulid.ULID, error) { - blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc) + return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValFloat) +} + +// CreateHistogramBlockWithDelay writes a block with the given native histogram series and numSamples samples each. +// Samples will be in the time range [mint, maxt). +func CreateHistogramBlockWithDelay( + ctx context.Context, + dir string, + series []labels.Labels, + numSamples int, + mint, maxt int64, + blockDelay time.Duration, + extLset labels.Labels, + resolution int64, + hashFunc metadata.HashFunc, +) (id ulid.ULID, err error) { + return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValHistogram) +} + +func createBlockWithDelay(ctx context.Context, dir string, series []labels.Labels, numSamples int, mint int64, maxt int64, blockDelay time.Duration, extLset labels.Labels, resolution int64, hashFunc metadata.HashFunc, samplesType chunkenc.ValueType) (ulid.ULID, error) { + blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, samplesType) if err != nil { return ulid.ULID{}, errors.Wrap(err, "block creation") } @@ -428,10 +464,12 @@ func createBlock( resolution int64, tombstones bool, hashFunc metadata.HashFunc, + sampleType chunkenc.ValueType, ) (id ulid.ULID, err error) { headOpts := tsdb.DefaultHeadOptions() headOpts.ChunkDirRoot = filepath.Join(dir, "chunks") headOpts.ChunkRange = 10000000000 + headOpts.EnableNativeHistograms = *atomic.NewBool(true) h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) if err != nil { return id, errors.Wrap(err, "create head block") @@ -462,7 +500,16 @@ func createBlock( app := h.Appender(ctx) for _, lset := range batch { - _, err := app.Append(0, lset, t, rand.Float64()) + sort.Slice(lset, func(i, j int) bool { + return lset[i].Name < lset[j].Name + }) + + var err error + if sampleType == chunkenc.ValFloat { + _, err = app.Append(0, lset, t, rand.Float64()) + } else if sampleType == chunkenc.ValHistogram { + _, err = app.AppendHistogram(0, lset, t, &histogramSample) + } if err != nil { if rerr := app.Rollback(); rerr != nil { err = errors.Wrapf(err, "rollback failed: %v", rerr) diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go index 30ede9bfd6c..94e41e04171 100644 --- a/test/e2e/native_histograms_test.go +++ b/test/e2e/native_histograms_test.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/test/e2e/e2ethanos" diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 9a6aaa0b3f5..48c131ce921 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/objstore/client" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" @@ -88,23 +89,27 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) dir := filepath.Join(e.SharedDir(), "tmp") testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) - series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")} + floatSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "2")} + nativeHistogramSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "3")} extLset := labels.FromStrings("ext1", "value1", "replica", "1") extLset2 := labels.FromStrings("ext1", "value1", "replica", "2") extLset3 := labels.FromStrings("ext1", "value2", "replica", "3") extLset4 := labels.FromStrings("ext1", "value1", "replica", "3") + extLset5 := labels.FromStrings("ext1", "value3", "replica", "1") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) t.Cleanup(cancel) now := time.Now() - id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) + id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) - id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0, metadata.NoneFunc) + id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0, metadata.NoneFunc) testutil.Ok(t, err) - id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0, metadata.NoneFunc) + id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0, metadata.NoneFunc) testutil.Ok(t, err) - id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc) + id4, err := e2eutil.CreateBlock(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + id5, err := e2eutil.CreateHistogramBlockWithDelay(ctx, dir, nativeHistogramSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset5, 0, metadata.NoneFunc) testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) bkt, err := s3.NewBucketWithConfig(l, @@ -115,13 +120,14 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String())) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String())) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id4.String()), id4.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String())) // Wait for store to sync blocks. // thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta. - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) @@ -143,13 +149,19 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "ext1": "value1", "replica": "2", }, + { + "a": "1", + "b": "3", + "ext1": "value3", + "replica": "1", + }, }, ) - // 2 x postings, 2 x series, 2x chunks. - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_bucket_store_series_data_touched")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_bucket_store_series_data_fetched")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_series_blocks_queried")) + // 2 x postings, 3 x series, 2 x chunks. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_touched")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_fetched")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_series_blocks_queried")) queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, time.Now, promclient.QueryOptions{ @@ -161,22 +173,27 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "b": "2", "ext1": "value1", }, + { + "a": "1", + "b": "3", + "ext1": "value3", + }, }, ) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_touched")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(8), "thanos_bucket_store_series_data_fetched")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2+2), "thanos_bucket_store_series_blocks_queried")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(18), "thanos_bucket_store_series_data_touched")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3+3), "thanos_bucket_store_series_blocks_queried")) }) t.Run("remove meta.json from id1 block", func(t *testing.T) { testutil.Ok(t, bkt.Delete(ctx, filepath.Join(id1.String(), block.MetaFilename))) // Wait for store to sync blocks. // thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta. - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) @@ -192,12 +209,18 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "ext1": "value1", "replica": "2", }, + { + "a": "1", + "b": "3", + "ext1": "value3", + "replica": "1", + }, }, ) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+1), "thanos_bucket_store_series_blocks_queried")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+4), "thanos_bucket_store_series_blocks_queried")) }) t.Run("upload block id5, similar to id1", func(t *testing.T) { - id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0, metadata.NoneFunc) + id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String())) @@ -227,19 +250,25 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "ext1": "value1", "replica": "3", // New block. }, + { + "a": "1", + "b": "3", + "ext1": "value3", + "replica": "1", + }, }, ) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5+2), "thanos_bucket_store_series_blocks_queried")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(11+2), "thanos_bucket_store_series_blocks_queried")) }) t.Run("delete whole id2 block #yolo", func(t *testing.T) { testutil.Ok(t, block.Delete(ctx, l, bkt, id2)) // Wait for store to sync blocks. // thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta. - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1+1), "thanos_bucket_store_block_drops_total")) testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) @@ -254,9 +283,15 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "ext1": "value1", "replica": "3", }, + { + "a": "1", + "b": "3", + "ext1": "value3", + "replica": "1", + }, }, ) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(7+1), "thanos_bucket_store_series_blocks_queried")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(14+1), "thanos_bucket_store_series_blocks_queried")) }) t.Run("negative offset should work", func(t *testing.T) { @@ -271,9 +306,15 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) "ext1": "value1", "replica": "3", }, + { + "a": "1", + "b": "3", + "ext1": "value3", + "replica": "1", + }, }, ) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(7+2), "thanos_bucket_store_series_blocks_queried")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(15+2), "thanos_bucket_store_series_blocks_queried")) }) // TODO(khyati) Let's add some case for compaction-meta.json once the PR will be merged: https://github.com/thanos-io/thanos/pull/2136.