From 48b93628244209313d1241f6910f23acd1feefc4 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 23 Mar 2020 19:46:34 +0000 Subject: [PATCH] querier: Added ProxySeries and dedup iters Benchmarks. Signed-off-by: Bartlomiej Plotka --- pkg/query/iter_bench_test.go | 192 ++++++++++++++++++++++++++ pkg/store/bucket_test.go | 148 ++++++-------------- pkg/store/postings_codec_test.go | 5 +- pkg/store/prometheus_test.go | 5 - pkg/store/proxy.go | 4 + pkg/store/proxy_bench_test.go | 227 +++++++++++++++++++++++++++++++ pkg/store/proxy_test.go | 44 +++--- pkg/store/storepb/custom_test.go | 56 ++++++-- pkg/testutil/benchutil/series.go | 136 ++++++++++++++++++ pkg/testutil/testutil.go | 22 +-- 10 files changed, 677 insertions(+), 162 deletions(-) create mode 100644 pkg/query/iter_bench_test.go create mode 100644 pkg/store/proxy_bench_test.go create mode 100644 pkg/testutil/benchutil/series.go diff --git a/pkg/query/iter_bench_test.go b/pkg/query/iter_bench_test.go new file mode 100644 index 0000000000..c0bc952106 --- /dev/null +++ b/pkg/query/iter_bench_test.go @@ -0,0 +1,192 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package query + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/benchutil" +) + +func TestStoreSeriesSet(t *testing.T) { + tb := testutil.NewTB(t) + tb.Run(benchutil.OneSampleSeriesSubTestName(200e3), func(tb testutil.TB) { + benchStoreSeriesSet(tb, 200e3, benchutil.SeriesDimension) + }) + tb.Run(benchutil.OneSeriesManySamplesSubTestName(200e3), func(tb testutil.TB) { + benchStoreSeriesSet(tb, 200e3, benchutil.SamplesDimension) + }) +} + +func BenchmarkStoreSeriesSet(b *testing.B) { + tb := testutil.NewTB(b) + tb.Run(benchutil.OneSampleSeriesSubTestName(10e6), func(tb testutil.TB) { + benchStoreSeriesSet(tb, 10e6, benchutil.SeriesDimension) + }) + tb.Run(benchutil.OneSeriesManySamplesSubTestName(100e6), func(tb testutil.TB) { + // 100e6 samples = ~17361 days with 15s scrape. + benchStoreSeriesSet(tb, 100e6, benchutil.SamplesDimension) + }) +} + +func benchStoreSeriesSet(t testutil.TB, number int, dimension benchutil.Dimension) { + const numOfClients = 4 + + var ( + numberPerClient = number / 4 + series []storepb.Series + lbls = labels.FromStrings("ext1", "1", "foo", "bar", "i", postingsBenchSuffix) + random = rand.New(rand.NewSource(120)) + ) + switch dimension { + case seriesDimension: + series = make([]storepb.Series, 0, numOfClients*numberPerClient) + case samplesDimension: + series = []storepb.Series{newSeries(t, lbls, nil)} + default: + t.Fatal("unknown dimension", dimension) + } + + var lset storepb.LabelSet + for _, l := range lbls { + lset.Labels = append(lset.Labels, storepb.Label{Name: l.Name, Value: l.Value}) + } + + // Build numOfClients of clients. + clients := make([]Client, numOfClients) + resps := make([]*storepb.SeriesResponse, numOfClients) + for j := range clients { + switch dimension { + case seriesDimension: + fmt.Println("Building client with numSeries:", numberPerClient) + + for _, s := range createSeriesWithOneSample(t, j, numberPerClient, nopAppender{}) { + series = append(series, s) + resps = append(resps, storepb.NewSeriesResponse(&s)) + } + + clients[j] = &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: resps[j*numberPerClient : j*numberPerClient+numberPerClient], + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}}, + minTime: int64(j * numberPerClient), + maxTime: series[len(series)-1].Chunks[len(series[len(series)-1].Chunks)-1].MaxTime, + } + case samplesDimension: + fmt.Println("Building client with one series with numSamples:", numberPerClient) + + lblsSize := 0 + for _, l := range lset.Labels { + lblsSize += l.Size() + } + + c := chunkenc.NewXORChunk() + a, err := c.Appender() + testutil.Ok(t, err) + + sBytes := lblsSize + lastResps := 0 + + i := 0 + samples := 0 + nextTs := int64(j * numberPerClient) + for { + a.Append(int64(j*numberPerClient+i), random.Float64()) + i++ + samples++ + if i < numOfClients && samples < maxSamplesPerChunk { + continue + } + + series[0].Chunks = append(series[0].Chunks, storepb.AggrChunk{ + MinTime: nextTs, + MaxTime: int64(j*numberPerClient + i - 1), + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + }) + sBytes += len(c.Bytes()) + + // Compose many frames as remote read would do (so sidecar StoreAPI): 1048576 + if i >= numOfClients || sBytes >= 1048576 { + resps = append(resps, storepb.NewSeriesResponse(&storepb.Series{ + Labels: lset.Labels, + Chunks: series[0].Chunks[lastResps:], + })) + lastResps = len(series[0].Chunks) - 1 + } + if i >= numOfClients { + break + } + + nextTs = int64(j*numberPerClient + i) + } + + clients[j] = &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: resps[j*numberPerClient : j*numberPerClient+numberPerClient], + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}}, + minTime: int64(j * numberPerClient), + maxTime: nextTs, + } + default: + t.Fatal("unknown dimension", dimension) + } + } + + logger := log.NewNopLogger() + store := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + } + + maxTime := series[len(series)-1].Chunks[len(series[len(series)-1].Chunks)-1].MaxTime + benchmarkSeries(t, store, + &benchSeriesCase{ + name: fmt.Sprintf("%d of client with %d each, total %d", numOfClients, numberPerClient, number), + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expected: series, + }, + ) + + // Change client to just one. + clients = clients[1:] + clients[0] = &testClient{ + StoreClient: &mockedStoreAPI{ + // All responses. + RespSeries: resps, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}}, + minTime: 0, + maxTime: maxTime, + } + benchmarkSeries(t, store, + &benchSeriesCase{ + name: fmt.Sprintf("single client with %d", number), + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expected: series, + }, + ) +} diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a7829bf052..f365ab0be1 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -33,7 +33,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" @@ -48,6 +47,7 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/benchutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "gopkg.in/yaml.v2" ) @@ -920,11 +920,6 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) { benchmarkExpandedPostings(tb, bkt, id, r, 50e5) } -// Make entries ~50B in size, to emulate real-world high cardinality. -const ( - postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" -) - func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series int) ulid.ULID { h, err := tsdb.NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) @@ -959,12 +954,12 @@ func appendTestData(t testing.TB, app tsdb.Appender, series int) { series = series / 5 for n := 0; n < 10; n++ { for i := 0; i < series/10; i++ { - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+benchutil.LabelLongSuffix, "n", strconv.Itoa(n)+benchutil.LabelLongSuffix, "j", "foo")) // Have some series that won't be matched, to properly test inverted matches. - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+benchutil.LabelLongSuffix, "n", strconv.Itoa(n)+benchutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+benchutil.LabelLongSuffix, "n", "0_"+strconv.Itoa(n)+benchutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+benchutil.LabelLongSuffix, "n", "1_"+strconv.Itoa(n)+benchutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+benchutil.LabelLongSuffix, "n", "2_"+strconv.Itoa(n)+benchutil.LabelLongSuffix, "j", "foo")) } } testutil.Ok(t, app.Commit()) @@ -992,7 +987,7 @@ func benchmarkExpandedPostings( r indexheader.Reader, series int, ) { - n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+benchutil.LabelLongSuffix) jFoo := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") @@ -1002,7 +997,7 @@ func benchmarkExpandedPostings( i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$") iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$") iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "") - iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix) + iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+benchutil.LabelLongSuffix) iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$") series = series / 5 @@ -1053,106 +1048,43 @@ func benchmarkExpandedPostings( } } -func newSeries(t testing.TB, lset labels.Labels, smplChunks [][]sample) storepb.Series { - var s storepb.Series - - for _, l := range lset { - s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) - } - - for _, smpls := range smplChunks { - c := chunkenc.NewXORChunk() - a, err := c.Appender() - testutil.Ok(t, err) - - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) - } - - ch := storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - } - - s.Chunks = append(s.Chunks, ch) - } - return s -} - -func TestSeries(t *testing.T) { +func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) - tb.Run("200e3SeriesWithOneSample", func(tb testutil.TB) { - benchSeries(tb, 200e3, seriesDimension, 200e3) + tb.Run(benchutil.OneSampleSeriesSubTestName(200e3), func(tb testutil.TB) { + benchBucketSeries(tb, 200e3, benchutil.SeriesDimension, 200e3) }) - tb.Run("OneSeriesWith200e3Samples", func(tb testutil.TB) { - benchSeries(tb, 200e3, samplesDimension, 200e3) + tb.Run(benchutil.OneSeriesManySamplesSubTestName(200e3), func(tb testutil.TB) { + benchBucketSeries(tb, 200e3, benchutil.SamplesDimension, 200e3) }) } -func BenchmarkSeries(b *testing.B) { +func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) - tb.Run("10e6SeriesWithOneSample", func(tb testutil.TB) { - benchSeries(tb, 10e6, seriesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5) // This is too big for my machine: 10e6. + tb.Run(benchutil.OneSampleSeriesSubTestName(10e6), func(tb testutil.TB) { + benchBucketSeries(tb, 10e6, benchutil.SeriesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5) // This is too big for my machine: 10e6. }) - tb.Run("OneSeriesWith100e6Samples", func(tb testutil.TB) { + tb.Run(benchutil.OneSeriesManySamplesSubTestName(100e6), func(tb testutil.TB) { // 100e6 samples = ~17361 days with 15s scrape. - benchSeries(tb, 100e6, samplesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5, 10e6) // This is too big for my machine: 100e6. + benchBucketSeries(tb, 100e6, benchutil.SamplesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5, 10e6) // This is too big for my machine: 100e6. }) } func createBlockWithOneSample(t testutil.TB, dir string, blockIndex int, totalSeries int) (ulid.ULID, []storepb.Series) { - fmt.Println("Building block with numSeries:", totalSeries) - - var series []storepb.Series - h, err := tsdb.NewHead(nil, nil, nil, 1) - testutil.Ok(t, err) + h, series := benchutil.CreateSeriesWithOneSample(t, blockIndex, totalSeries) defer testutil.Ok(t, h.Close()) - app := h.Appender() - - for i := 0; i < totalSeries; i++ { - ts := int64(blockIndex*totalSeries + i) - lbls := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) - series = append(series, newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), [][]sample{{sample{t: ts, v: 0}}})) - - _, err := app.Add(lbls, ts, 0) - testutil.Ok(t, err) - } - testutil.Ok(t, app.Commit()) - return createBlockFromHead(t, dir, h), series } -func createBlockWithOneSeries(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand) ulid.ULID { - fmt.Println("Building block with one series with numSamples:", totalSamples) - - h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)) - testutil.Ok(t, err) +func createBlockWithOneSeries(t testutil.TB, dir string, blockIndex int, totalSamples int, random *rand.Rand) ulid.ULID { + h := benchutil.CreateOneSeriesWithManySamples(t, blockIndex, totalSamples, random) defer testutil.Ok(t, h.Close()) - app := h.Appender() - - ref, err := app.Add(lbls, int64(blockIndex*totalSamples), random.Float64()) - testutil.Ok(t, err) - for i := 1; i < totalSamples; i++ { - ts := int64(blockIndex*totalSamples + i) - testutil.Ok(t, app.AddFast(ref, ts, random.Float64())) - } - testutil.Ok(t, app.Commit()) - return createBlockFromHead(t, dir, h) } -type Dimension string - -const ( - seriesDimension = Dimension("series") - samplesDimension = Dimension("samples") -) - -func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { - tmpDir, err := ioutil.TempDir("", "testorbench-series") +func benchBucketSeries(t testutil.TB, number int, dimension benchutil.Dimension, cases ...int) { + tmpDir, err := ioutil.TempDir("", "testorbench-bucketseries") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() @@ -1168,12 +1100,11 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { ) numberPerBlock := number / 4 - lbls := labels.FromStrings("foo", "bar", "i", postingsBenchSuffix) switch dimension { - case seriesDimension: + case benchutil.SeriesDimension: series = make([]storepb.Series, 0, 4*numberPerBlock) - case samplesDimension: - series = []storepb.Series{newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), nil)} + case benchutil.SamplesDimension: + series = []storepb.Series{benchutil.SingleSeries} default: t.Fatal("unknown dimension", dimension) } @@ -1200,12 +1131,12 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { // TODO(bwplotka): Provide them in objstore instead?. if t.IsBenchmark() { switch dimension { - case seriesDimension: + case benchutil.SeriesDimension: p := filepath.Join(".", "test-data", "10e6seriesOneSample") if _, err := os.Stat(p); err == nil { blockDir = p } - case samplesDimension: + case benchutil.SamplesDimension: p := filepath.Join(".", "test-data", "1series100e6Samples") if _, err := os.Stat(p); err == nil { blockDir = p @@ -1234,7 +1165,7 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { var id ulid.ULID switch dimension { - case seriesDimension: + case benchutil.SeriesDimension: if len(preBuildBlockIDs) > 0 { id = preBuildBlockIDs[bi] fmt.Println("Using pre-build block:", id) @@ -1245,7 +1176,7 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { // This allows to pick time range that will correspond to number of series picked 1:1. id, bSeries = createBlockWithOneSample(t, blockDir, bi, numberPerBlock) series = append(series, bSeries...) - case samplesDimension: + case benchutil.SamplesDimension: if len(preBuildBlockIDs) > 0 { id = preBuildBlockIDs[bi] fmt.Println("Using pre-build block:", id) @@ -1253,7 +1184,7 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { // Create 4 blocks. Each will have numSeriesPerBlock number of series that have 1 sample only. // Timestamp will be counted for each new series, so each series will have unique timestamp. // This allows to pick time range that will correspond to number of series picked 1:1. - id = createBlockWithOneSeries(t, blockDir, lbls, bi, numberPerBlock, random) + id = createBlockWithOneSeries(t, blockDir, bi, numberPerBlock, random) } if !t.IsBenchmark() { @@ -1331,9 +1262,9 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { var expected []storepb.Series switch dimension { - case seriesDimension: + case benchutil.SeriesDimension: expected = series[:c] - case samplesDimension: + case benchutil.SamplesDimension: expected = series } @@ -1350,8 +1281,7 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { }) } - fmt.Println("Starting") - benchmarkSeries(t, store, bCases) + benchmarkSeries(t, store, bCases...) if !t.IsBenchmark() { // Make sure the pool is correctly used. This is expected for 200k numbers. testutil.Equals(t, 4, int(chunkPool.(*mockedPool).gets)) @@ -1413,7 +1343,7 @@ type benchSeriesCase struct { expected []storepb.Series } -func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase) { +func benchmarkSeries(t testutil.TB, store storepb.StoreServer, cases ...*benchSeriesCase) { for _, c := range cases { t.Run(c.name, func(t testutil.TB) { t.ResetTimer() @@ -1425,7 +1355,7 @@ func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase if !t.IsBenchmark() { if len(c.expected) == 1 { - // Chunks are not sorted within response. TODO: Investigate: Is this fine? + // For bucketStoreAPI chunks are not sorted within response. TODO: Investigate: Is this fine? sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool { return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime }) @@ -1440,7 +1370,7 @@ func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase } // Regression test against: https://github.com/thanos-io/thanos/issues/2147. -func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { +func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { tmpDir, err := ioutil.TempDir("", "segfault-series") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() @@ -1485,7 +1415,7 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { for i := 0; i < numSeries; i++ { ts := int64(i) - lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) + lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, benchutil.LabelLongSuffix)) _, err := app.Add(lbls, ts, 0) testutil.Ok(t, err) @@ -1523,7 +1453,7 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { for i := 0; i < numSeries; i++ { ts := int64(i) - lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) + lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, benchutil.LabelLongSuffix)) _, err := app.Add(lbls, ts, 0) testutil.Ok(t, err) diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index adb9f57d4b..e93d54f892 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/testutil/benchutil" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -30,7 +31,7 @@ func TestDiffVarintCodec(t *testing.T) { postingsMap := map[string]index.Postings{ "all": allPostings(t, idx), - `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)), + `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+benchutil.LabelLongSuffix)), `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), @@ -38,7 +39,7 @@ func TestDiffVarintCodec(t *testing.T) { `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), `i!~""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), - `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)), + `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+benchutil.LabelLongSuffix)), `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index ebcec0126d..ec69bf5ad5 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -166,11 +166,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { } } -type sample struct { - t int64 - v float64 -} - func expandChunk(cit chunkenc.Iterator) (res []sample) { for cit.Next() { t, v := cit.At() diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 8440de2f96..ed5cd4292e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -293,6 +293,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return nil } + // TODO(bwplotka): Currently we stream into big frames. Consider ensuring 1MB maximum. + // This however does not matter much when used with QueryAPI. Matters for federated Queries a lot. + // https://github.com/thanos-io/thanos/issues/2332 + // Series are not necessarily merged across themselves. mergedSet := storepb.MergeSeriesSets(seriesSet...) for mergedSet.Next() { var series storepb.Series diff --git a/pkg/store/proxy_bench_test.go b/pkg/store/proxy_bench_test.go new file mode 100644 index 0000000000..cf1e864a25 --- /dev/null +++ b/pkg/store/proxy_bench_test.go @@ -0,0 +1,227 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "fmt" + "math" + "math/rand" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/benchutil" +) + +func TestProxySeries(t *testing.T) { + tb := testutil.NewTB(t) + tb.Run(benchutil.OneSampleSeriesSubTestName(200e3), func(tb testutil.TB) { + benchProxySeries(tb, 200e3, benchutil.SeriesDimension) + }) + tb.Run(benchutil.OneSeriesManySamplesSubTestName(200e3), func(tb testutil.TB) { + benchProxySeries(tb, 200e3, benchutil.SamplesDimension) + }) +} + +func BenchmarkProxySeries(b *testing.B) { + tb := testutil.NewTB(b) + tb.Run(benchutil.OneSampleSeriesSubTestName(10e6), func(tb testutil.TB) { + benchProxySeries(tb, 10e6, benchutil.SeriesDimension) + }) + tb.Run(benchutil.OneSeriesManySamplesSubTestName(100e6), func(tb testutil.TB) { + // 100e6 samples = ~17361 days with 15s scrape. + benchProxySeries(tb, 100e6, benchutil.SamplesDimension) + }) +} + +func benchProxySeries(t testutil.TB, number int, dimension benchutil.Dimension) { + const numOfClients = 4 + + var ( + numberPerClient = number / 4 + random = rand.New(rand.NewSource(120)) + ) + + // Build numOfClients of clients. + clients := make([]Client, numOfClients) + + for j := range clients { + var resps []*storepb.SeriesResponse + + switch dimension { + case benchutil.SeriesDimension: + fmt.Println("Building client with numSeries:", numberPerClient) + + h, created := benchutil.CreateSeriesWithOneSample(t, j, numberPerClient) + testutil.Ok(t, h.Close()) + + for i := 0; i < len(created); i++ { + resps = append(resps, storepb.NewSeriesResponse(&created[i])) + } + + clients[j] = &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: resps, + }, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + } + case benchutil.SamplesDimension: + fmt.Println("Building client with one series with numSamples:", numberPerClient) + + lblsSize := 0 + for _, l := range benchutil.SingleSeries.Labels { + lblsSize += l.Size() + } + func() { + h := benchutil.CreateOneSeriesWithManySamples(t, j, numberPerClient, random) + defer h.Close() + + chks, err := h.Chunks() + testutil.Ok(t, err) + + ir, err := h.Index() + testutil.Ok(t, err) + defer ir.Close() + + var ( + lset labels.Labels + chunkMetas []chunks.Meta + sBytes = lblsSize + ) + + all := allPostings(t, ir) + for all.Next() { + testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas)) + + i := 0 + r := storepb.NewSeriesResponse(&storepb.Series{ + Labels: storepb.PromLabelsToLabelsUnsafe(lset), + }) + for { + c := chunkMetas[i] + i++ + + chBytes, err := chks.Chunk(c.Ref) + testutil.Ok(t, err) + + sBytes += len(chBytes.Bytes()) + + r.GetSeries().Chunks = append(r.GetSeries().Chunks, storepb.AggrChunk{ + MinTime: c.MinTime, + MaxTime: c.MaxTime, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chBytes.Bytes()}, + }) + + // Compose many frames as remote read would do (so sidecar StoreAPI): 1048576 + if i >= len(chunkMetas) || sBytes >= 1048576 { + resps = append(resps, r) + r = storepb.NewSeriesResponse(&storepb.Series{ + Labels: storepb.PromLabelsToLabelsUnsafe(lset), + }) + } + if i >= len(chunkMetas) { + break + } + + } + } + testutil.Ok(t, all.Err()) + + clients[j] = &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: resps, + }, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + } + testutil.Ok(t, h.Close()) + }() + + default: + t.Fatal("unknown dimension", dimension) + } + } + + logger := log.NewNopLogger() + store := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + } + + var resps []*storepb.SeriesResponse + var expected []storepb.Series + lastLabels := storepb.Series{} + for _, c := range clients { + m := c.(*testClient).StoreClient.(*mockedStoreAPI) + + for _, r := range m.RespSeries { + resps = append(resps, r) + + // Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332). + // Let's do this here as well. + x := storepb.Series{Labels: r.GetSeries().Labels} + if x.String() == lastLabels.String() { + expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...) + continue + } + lastLabels = x + expected = append(expected, *r.GetSeries()) + } + + } + + chunkLen := len(resps[len(resps)-1].GetSeries().Chunks) + maxTime := resps[len(resps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + benchmarkSeries(t, store, + &benchSeriesCase{ + name: fmt.Sprintf("%d of client with %d each, total %d", numOfClients, numberPerClient, number), + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expected: expected, + }, + ) + + // Change client to just one. + store.stores = func() []Client { + return []Client{&testClient{ + StoreClient: &mockedStoreAPI{ + // All responses. + RespSeries: resps, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}}, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + }} + } + + // In this we expect exactly the same response as input. + expected = expected[:0] + for _, r := range resps { + expected = append(expected, *r.GetSeries()) + } + benchmarkSeries(t, store, + &benchSeriesCase{ + name: fmt.Sprintf("single client with %d", number), + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expected: expected, + }, + ) +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1e49cce456..41ed876fb5 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -17,9 +17,11 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/benchutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -1398,35 +1400,35 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { return s, nil } +type sample struct { + t int64 + v float64 +} + +func (s sample) T() int64 { return s.t } +func (s sample) V() float64 { return s.v } + func (c *StoreSeriesClient) Context() context.Context { return c.ctx } -// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. -func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { - var s storepb.Series - - for _, l := range lset { - s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) - } +type samples []sample - for _, smpls := range smplChunks { - c := chunkenc.NewXORChunk() - a, err := c.Appender() - testutil.Ok(t, err) +func (s samples) Len() int { return len(s) } +func (s samples) Get(i int) tsdbutil.Sample { + return s[i] +} - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) - } +type sampleChunks [][]sample - ch := storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - } +func (c sampleChunks) Len() int { return len(c) } +func (c sampleChunks) Get(i int) benchutil.Samples { + return samples(c[i]) +} - s.Chunks = append(s.Chunks, ch) - } +// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. +func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { + s := benchutil.NewTestSeries(t, lset, sampleChunks(smplChunks)) return storepb.NewSeriesResponse(&s) } diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index cbeaed6950..cd39ccdf8a 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -12,39 +12,65 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil" ) +type listSeriesSet struct { + series []Series + idx int +} + type sample struct { t int64 v float64 } -type listSeriesSet struct { - series []Series - idx int +func (s sample) T() int64 { return s.t } +func (s sample) V() float64 { return s.v } + +type samples []sample + +func (s samples) Len() int { return len(s) } +func (s samples) Get(i int) tsdbutil.Sample { + return s[i] } -func newSeries(tb testing.TB, lset labels.Labels, smplChunks [][]sample) Series { - var s Series +type sampleChunks [][]sample - for _, l := range lset { - s.Labels = append(s.Labels, Label{Name: l.Name, Value: l.Value}) - } +func (c sampleChunks) Len() int { return len(c) } +func (c sampleChunks) Get(i int) Samples { + return samples(c[i]) +} - for _, smpls := range smplChunks { +type Samples interface { + Len() int + Get(int) tsdbutil.Sample +} + +type SampleChunks interface { + Len() int + Get(int) Samples +} + +// newTestSeries is used for tests. The same one is in benchutil package but we need to copy it here to avoid +// cyclic dependencies. +func newTestSeries(t testing.TB, lset labels.Labels, smplChunks SampleChunks) Series { + var s Series + s.Labels = PromLabelsToLabels(lset) + for i := 0; i < smplChunks.Len(); i++ { c := chunkenc.NewXORChunk() a, err := c.Appender() - testutil.Ok(tb, err) + testutil.Ok(t, err) - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) + for j := 0; j < smplChunks.Get(i).Len(); j++ { + a.Append(smplChunks.Get(i).Get(j).T(), smplChunks.Get(i).Get(j).V()) } ch := AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, + MinTime: smplChunks.Get(i).Get(0).T(), + MaxTime: smplChunks.Get(i).Get(smplChunks.Get(i).Len() - 1).T(), Raw: &Chunk{Type: Chunk_XOR, Data: c.Bytes()}, } @@ -56,7 +82,7 @@ func newSeries(tb testing.TB, lset labels.Labels, smplChunks [][]sample) Series func newListSeriesSet(tb testing.TB, raw []rawSeries) *listSeriesSet { var series []Series for _, s := range raw { - series = append(series, newSeries(tb, s.lset, s.chunks)) + series = append(series, newTestSeries(tb, s.lset, sampleChunks(s.chunks))) } return &listSeriesSet{ series: series, diff --git a/pkg/testutil/benchutil/series.go b/pkg/testutil/benchutil/series.go new file mode 100644 index 0000000000..cd5edd695e --- /dev/null +++ b/pkg/testutil/benchutil/series.go @@ -0,0 +1,136 @@ +package benchutil + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +const ( + // LabelLongSuffix is a label with ~50B in size, to emulate real-world high cardinality. + LabelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" +) + +type Dimension string + +const ( + SeriesDimension = Dimension("series") + SamplesDimension = Dimension("samples") +) + +var ( + SingleSeriesInternalLabels = labels.FromStrings("foo", "bar", "i", LabelLongSuffix) + // Don't fill samples, would use too much memory. + SingleSeries = NewTestSeries(nil, append(labels.FromStrings("ext1", "1"), SingleSeriesInternalLabels...), nil) +) + +func OneSampleSeriesSubTestName(seriesNum int) string { + return fmt.Sprintf("%dSeriesWithOneSample", seriesNum) +} + +func OneSeriesManySamplesSubTestName(samplesNum int) string { + return fmt.Sprintf("OneSeriesWith%dSamples", samplesNum) +} + +func CreateSeriesWithOneSample(t testutil.TB, j int, totalSeries int) (*tsdb.Head, []storepb.Series) { + fmt.Println("Creating one-sample series with numSeries:", totalSeries) + + h, err := tsdb.NewHead(nil, nil, nil, 1) + testutil.Ok(t, err) + + app := h.Appender() + series := make([]storepb.Series, totalSeries) + var ts int64 + var lbls labels.Labels + for i := 0; i < totalSeries; i++ { + ts = int64(j*totalSeries + i) + lbls = labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, LabelLongSuffix)) + series[i] = NewTestSeries(t, append(labels.FromStrings("ext1", "1"), lbls...), sampleChunks([][]sample{{sample{t: ts, v: 0}}})) + + _, err := app.Add(lbls, ts, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + return h, series +} + +func CreateOneSeriesWithManySamples(t testutil.TB, j int, totalSamples int, random *rand.Rand) *tsdb.Head { + fmt.Println("Creating one series with numSamples:", totalSamples) + + h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)) + testutil.Ok(t, err) + + app := h.Appender() + + ref, err := app.Add(SingleSeriesInternalLabels, int64(j*totalSamples), random.Float64()) + testutil.Ok(t, err) + for i := 1; i < totalSamples; i++ { + ts := int64(j*totalSamples + i) + testutil.Ok(t, app.AddFast(ref, ts, random.Float64())) + } + testutil.Ok(t, app.Commit()) + return h +} + +type Samples interface { + Len() int + Get(int) tsdbutil.Sample +} + +type SampleChunks interface { + Len() int + Get(int) Samples +} + +// sample struct is always copied as it is used very often, so we want to avoid long `benchutil.Sample` statements in tests. +type sample struct { + t int64 + v float64 +} + +func (s sample) T() int64 { return s.t } +func (s sample) V() float64 { return s.v } + +type samples []sample + +func (s samples) Len() int { return len(s) } +func (s samples) Get(i int) tsdbutil.Sample { + return s[i] +} + +type sampleChunks [][]sample + +func (c sampleChunks) Len() int { return len(c) } +func (c sampleChunks) Get(i int) Samples { + return samples(c[i]) +} + +func NewTestSeries(t testing.TB, lset labels.Labels, smplChunks SampleChunks) storepb.Series { + var s storepb.Series + s.Labels = storepb.PromLabelsToLabels(lset) + for i := 0; smplChunks != nil && i < smplChunks.Len(); i++ { + c := chunkenc.NewXORChunk() + a, err := c.Appender() + testutil.Ok(t, err) + + for j := 0; j < smplChunks.Get(i).Len(); j++ { + a.Append(smplChunks.Get(i).Get(j).T(), smplChunks.Get(i).Get(j).V()) + } + + ch := storepb.AggrChunk{ + MinTime: smplChunks.Get(i).Get(0).T(), + MaxTime: smplChunks.Get(i).Get(smplChunks.Get(i).Len() - 1).T(), + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + } + + s.Chunks = append(s.Chunks, ch) + } + return s +} diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index e1ba298d7e..344d70e531 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -5,9 +5,7 @@ package testutil import ( "fmt" - "path/filepath" "reflect" - "runtime" "testing" "github.com/davecgh/go-spew/spew" @@ -18,58 +16,62 @@ import ( // Assert fails the test if the condition is false. func Assert(tb testing.TB, condition bool, v ...interface{}) { + tb.Helper() + if condition { return } - _, file, line, _ := runtime.Caller(1) var msg string if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...) + tb.Fatalf("\033[31m" + msg + "\033[39m\n\n") } // Ok fails the test if an err is not nil. func Ok(tb testing.TB, err error, v ...interface{}) { + tb.Helper() + if err == nil { return } - _, file, line, _ := runtime.Caller(1) var msg string if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error()) + tb.Fatalf("\033[31m"+msg+"\n\n unexpected error: %s\033[39m\n\n", err.Error()) } // NotOk fails the test if an err is nil. func NotOk(tb testing.TB, err error, v ...interface{}) { + tb.Helper() + if err != nil { return } - _, file, line, _ := runtime.Caller(1) var msg string if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n expected error, got nothing \033[39m\n\n", filepath.Base(file), line) + tb.Fatalf("\033[31m" + msg + "\n\n expected error, got nothing \033[39m\n\n") } // Equals fails the test if exp is not equal to act. func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { + tb.Helper() + if reflect.DeepEqual(exp, act) { return } - _, file, line, _ := runtime.Caller(1) var msg string if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n\texp: %#v\n\n\tgot: %#v%s\033[39m\n\n", filepath.Base(file), line, exp, act, diff(exp, act)) + tb.Fatalf("\033[31m"+msg+"\n\n\texp: %#v\n\n\tgot: %#v%s\033[39m\n\n", exp, act, diff(exp, act)) } func typeAndKind(v interface{}) (reflect.Type, reflect.Kind) {