From 21af82b24483f3d4a8f72354626dbc8a9121d7ec Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 17 Aug 2020 18:58:28 +0100 Subject: [PATCH] receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDBStore, fixed multitsdb benchmarks. Fixed https://github.com/thanos-io/thanos/issues/3013 Also: * Fixed other quite big issue with reusing chunk slice. * Fixed framing - previously it was wrongly sending single-chunk frames, taking huge amount of time. Before upgrading go.mod with latest TSDB: TestTSDBStore_SeriesChunkBytesCopied/flush_WAL_and_access_results: tsdb_test.go:487: tsdb_test.go:487: unexpected error: invoked function panicked or caused segmentation fault: runtime error: invalid memory address or nil pointer dereference After all ok. Signed-off-by: Bartlomiej Plotka --- pkg/store/bucket_test.go | 21 ++- pkg/store/multitsdb_test.go | 23 +-- pkg/store/proxy_test.go | 11 +- pkg/store/storepb/testutil/series.go | 30 ++-- pkg/store/tsdb.go | 31 ++-- pkg/store/tsdb_test.go | 247 +++++++++++++++++++++++++++ pkg/testutil/testutil.go | 18 ++ 7 files changed, 325 insertions(+), 56 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 7ea52149125..6a25bfc946a 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1178,7 +1178,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request var ( logger = log.NewNopLogger() blocks []*bucketBlock - series []storepb.Series + series []*storepb.Series random = rand.New(rand.NewSource(120)) ) @@ -1213,7 +1213,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request // This allows to pick time range that will correspond to number of series picked 1:1. for bi := 0; bi < numOfBlocks; bi++ { head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)), SamplesPerSeries: samplesPerSeriesPerBlock, Series: seriesPerBlock, PrependLabels: extLset, @@ -1536,17 +1536,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { } // Create TSDB blocks. - opts := storetestutil.HeadGenOptions{ - Dir: tmpDir, + head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: filepath.Join(tmpDir, "0"), SamplesPerSeries: 1, Series: 2, PrependLabels: extLset, Random: random, - } - head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts) + }) block1 := createBlockFromHead(t, bktDir, head) testutil.Ok(t, head.Close()) - head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts) + head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{ + TSDBDir: filepath.Join(tmpDir, "1"), + SamplesPerSeries: 1, + Series: 2, + PrependLabels: extLset, + Random: random, + }) block2 := createBlockFromHead(t, bktDir, head2) testutil.Ok(t, head2.Close()) @@ -1610,7 +1615,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...), + ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), ExpectedHints: []hintspb.SeriesResponseHints{ { QueriedBlocks: []hintspb.Block{ diff --git a/pkg/store/multitsdb_test.go b/pkg/store/multitsdb_test.go index b4fa2767aa0..fe8c4df418f 100644 --- a/pkg/store/multitsdb_test.go +++ b/pkg/store/multitsdb_test.go @@ -92,29 +92,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB } }() for j := range dbs { + tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j)) + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: tsdbDir, SamplesPerSeries: samplesPerSeriesPerTSDB, Series: seriesPerTSDB, - WithWAL: true, + WithWAL: !flushToBlocks, Random: random, SkipChunks: t.IsBenchmark(), }) - testutil.Ok(t, head.Close()) - - tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j)) - for i := 0; i < len(created); i++ { - resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i])) + resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i])) } if flushToBlocks { - db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) - testutil.Ok(t, err) - - testutil.Ok(t, db.FlushWAL(tmpDir)) - testutil.Ok(t, db.Close()) + _ = createBlockFromHead(t, tsdbDir, head) } + testutil.Ok(t, head.Close()) db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) testutil.Ok(t, err) @@ -129,7 +124,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs }) - var expected []storepb.Series + var expected []*storepb.Series lastLabels := storepb.Series{} for _, resp := range resps { for _, r := range resp { @@ -141,7 +136,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB continue } lastLabels = x - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 3da59567734..c88ddcbdcd9 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -11,6 +11,7 @@ import ( "math" "math/rand" "os" + "path/filepath" "sort" "testing" "time" @@ -1616,7 +1617,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { var resps []*storepb.SeriesResponse head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)), SamplesPerSeries: samplesPerSeriesPerClient, Series: seriesPerClient, MaxFrameBytes: storetestutil.RemoteReadFrameLimit, @@ -1626,7 +1627,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { testutil.Ok(t, head.Close()) for i := 0; i < len(created); i++ { - resps = append(resps, storepb.NewSeriesResponse(&created[i])) + resps = append(resps, storepb.NewSeriesResponse(created[i])) } clients[j] = &testClient{ @@ -1647,7 +1648,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } var allResps []*storepb.SeriesResponse - var expected []storepb.Series + var expected []*storepb.Series lastLabels := storepb.Series{} for _, c := range clients { m := c.(*testClient).StoreClient.(*mockedStoreAPI) @@ -1663,7 +1664,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { continue } lastLabels = x - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } } @@ -1700,7 +1701,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { // In this we expect exactly the same response as input. expected = expected[:0] for _, r := range allResps { - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } storetestutil.TestServerSeries(t, store, &storetestutil.SeriesCase{ diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index 90e11eee182..718bfc98d97 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "math/rand" + "os" "path/filepath" "runtime" "sort" @@ -39,13 +40,13 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { const RemoteReadFrameLimit = 1048576 type HeadGenOptions struct { - Dir string + TSDBDir string SamplesPerSeries, Series int MaxFrameBytes int // No limit by default. WithWAL bool PrependLabels labels.Labels - SkipChunks bool + SkipChunks bool // Skips chunks in returned slice (not in generated head!). Random *rand.Rand } @@ -54,22 +55,23 @@ type HeadGenOptions struct { // Returned series list has "ext1"="1" prepended. Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. // Returned series are frame in same way as remote read would frame them. -func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []storepb.Series) { +func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []*storepb.Series) { if opts.SamplesPerSeries < 1 || opts.Series < 1 { t.Fatal("samples and series has to be 1 or more") } - tsdbDir := filepath.Join(opts.Dir, fmt.Sprintf("%d", j)) - fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, tsdbDir) + fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, opts.TSDBDir) var w *wal.WAL var err error if opts.WithWAL { - w, err = wal.New(nil, nil, filepath.Join(tsdbDir, "wal"), true) + w, err = wal.New(nil, nil, filepath.Join(opts.TSDBDir, "wal"), true) testutil.Ok(t, err) + } else { + testutil.Ok(t, os.MkdirAll(filepath.Join(opts.TSDBDir, "wal"), os.ModePerm)) } - h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil) + h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, opts.TSDBDir, nil, tsdb.DefaultStripeSize, nil) testutil.Ok(t, err) app := h.Appender(context.Background()) @@ -96,7 +98,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, var ( lset labels.Labels chunkMetas []chunks.Meta - expected = make([]storepb.Series, 0, opts.Series) + expected = make([]*storepb.Series, 0, opts.Series) sBytes int ) @@ -105,7 +107,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas)) i := 0 sLset := storepb.PromLabelsToLabels(lset) - expected = append(expected, storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)}) + expected = append(expected, &storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)}) if opts.SkipChunks { continue @@ -142,7 +144,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, // Compose many frames as remote read (so sidecar StoreAPI) would do if requested by maxFrameBytes. if opts.MaxFrameBytes > 0 && sBytes >= opts.MaxFrameBytes { - expected = append(expected, storepb.Series{Labels: sLset}) + expected = append(expected, &storepb.Series{Labels: sLset}) sBytes = lBytes } } @@ -158,7 +160,7 @@ type SeriesServer struct { ctx context.Context - SeriesSet []storepb.Series + SeriesSet []*storepb.Series Warnings []string HintsSet []*types.Any @@ -178,7 +180,7 @@ func (s *SeriesServer) Send(r *storepb.SeriesResponse) error { } if r.GetSeries() != nil { - s.SeriesSet = append(s.SeriesSet, *r.GetSeries()) + s.SeriesSet = append(s.SeriesSet, r.GetSeries()) return nil } @@ -227,7 +229,7 @@ type SeriesCase struct { Req *storepb.SeriesRequest // Exact expectations are checked only for tests. For benchmarks only length is assured. - ExpectedSeries []storepb.Series + ExpectedSeries []*storepb.Series ExpectedWarnings []string ExpectedHints []hintspb.SeriesResponseHints } @@ -254,7 +256,7 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series testutil.Equals(t, c.ExpectedSeries[0].Chunks[0], srv.SeriesSet[0].Chunks[0]) - // This might give unreadable output for millions of series on fail.. + // This might give unreadable output for millions of series on fail. testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet) var actualHints []hintspb.SeriesResponseHints diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index cbc5c7a5e14..bdecb67b9e8 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -116,28 +116,27 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer } defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier series") - var ( - set = q.Select(false, nil, matchers...) - respSeries storepb.Series - ) + set := q.Select(false, nil, matchers...) // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) - respSeries.Chunks = respSeries.Chunks[:0] + seriesLabels := storepb.Series{Labels: s.translateAndExtendLabels(series.Labels(), s.externalLabels)} if r.SkipChunks { - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + if err := srv.Send(storepb.NewSeriesResponse(&seriesLabels)); err != nil { return status.Error(codes.Aborted, err.Error()) } continue } - frameBytesLeft := s.maxBytesPerFrame - for _, lbl := range respSeries.Labels { - frameBytesLeft -= lbl.Size() + bytesLeftForChunks := s.maxBytesPerFrame + for _, lbl := range seriesLabels.Labels { + bytesLeftForChunks -= lbl.Size() } + frameBytesLeft := bytesLeftForChunks + // Share? respSeries.Chunks = respSeries.Chunks[:0] + seriesChunks := []storepb.AggrChunk{} chIter := series.Iterator() isNext := chIter.Next() for isNext { @@ -146,15 +145,16 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) } - respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{ + c := storepb.AggrChunk{ MinTime: chk.MinTime, MaxTime: chk.MaxTime, Raw: &storepb.Chunk{ Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one. Data: chk.Chunk.Bytes(), }, - }) - frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size() + } + frameBytesLeft -= c.Size() + seriesChunks = append(seriesChunks, c) // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. isNext = chIter.Next() @@ -162,10 +162,11 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer continue } - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + if err := srv.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: seriesLabels.Labels, Chunks: seriesChunks})); err != nil { return status.Error(codes.Aborted, err.Error()) } - respSeries.Chunks = respSeries.Chunks[:0] + frameBytesLeft = bytesLeftForChunks + seriesChunks = []storepb.AggrChunk{} } if err := chIter.Err(); err != nil { return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error()) diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 468d9cec032..794012dc52e 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -5,15 +5,23 @@ package store import ( "context" + "fmt" + "io/ioutil" "math" + "math/rand" + "os" + "sort" "testing" "time" "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -395,3 +403,242 @@ func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) { return tsdbStore }) } + +// Regression test for: https://github.com/thanos-io/thanos/issues/3013 . +func TestTSDBStore_SeriesChunkBytesCopied(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + var ( + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + // Generate one series in two parts. Put first part in block, second in just WAL. + head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + Random: random, + SkipChunks: true, + }) + _ = createBlockFromHead(t, tmpDir, head) + testutil.Ok(t, head.Close()) + + head, _ = storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + WithWAL: true, + Random: random, + SkipChunks: true, + }) + testutil.Ok(t, head.Close()) + + db, err := tsdb.OpenDBReadOnly(tmpDir, logger) + testutil.Ok(t, err) + + extLabels := labels.FromStrings("ext", "1") + store := NewTSDBStore(logger, nil, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + + t.Cleanup(func() { + if db != nil { + testutil.Ok(t, db.Close()) + } + }) + + // Call series. + srv := storetestutil.NewSeriesServer(context.Background()) + t.Run("call series and access results", func(t *testing.T) { + testutil.Ok(t, store.Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, srv)) + testutil.Equals(t, 0, len(srv.Warnings)) + testutil.Equals(t, 0, len(srv.HintsSet)) + testutil.Equals(t, 4, len(srv.SeriesSet)) + + // All chunks should be accessible for read and write (copied). + for _, s := range srv.SeriesSet { + testutil.Equals(t, 3, len(s.Chunks)) + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + c.Raw.Data[0] = 0 // Check if we can write to the byte range. + })) + } + } + }) + t.Run("flush WAL and access results", func(t *testing.T) { + testutil.Ok(t, db.FlushWAL(tmpDir)) + + // All chunks should be still accessible for read and write (copied). + for _, s := range srv.SeriesSet { + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + c.Raw.Data[0] = 0 // Check if we can write to the byte range. + })) + } + } + }) + + t.Run("close db with block readers and access results", func(t *testing.T) { + // This should not block, as select finished. + testutil.Ok(t, db.Close()) + db = nil + + // All chunks should be still accessible for read and write (copied). + for _, s := range srv.SeriesSet { + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + c.Raw.Data[0] = 0 // Check if we can write to the byte range. + })) + } + } + }) +} + +func TestTSDBStoreSeries(t *testing.T) { + tb := testutil.NewTB(t) + // Make sure there are more samples, so we can check framing code. + storetestutil.RunSeriesInterestingCases(tb, 10e6, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchTSDBStoreSeries(t, samplesPerSeries, series) + }) +} + +func BenchmarkTSDBStoreSeries(b *testing.B) { + tb := testutil.NewTB(b) + storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { + benchTSDBStoreSeries(t, samplesPerSeries, series) + }) +} + +func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { + tmpDir, err := ioutil.TempDir("", "testorbench-testtsdbseries") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + // This means 3 blocks and the head. + const numOfBlocks = 4 + + samplesPerSeriesPerBlock := totalSamples / numOfBlocks + if samplesPerSeriesPerBlock == 0 { + samplesPerSeriesPerBlock = 1 + } + seriesPerBlock := totalSeries / numOfBlocks + if seriesPerBlock == 0 { + seriesPerBlock = 1 + } + + const maxBytesPerFrame = 1024 * 1024 + var ( + resps = make([][]*storepb.SeriesResponse, 4) + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + for j := 0; j < 3; j++ { + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerBlock, + Series: seriesPerBlock, + Random: random, + SkipChunks: t.IsBenchmark(), + MaxFrameBytes: maxBytesPerFrame, + }) + for i := 0; i < len(created); i++ { + resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i])) + } + + _ = createBlockFromHead(t, tmpDir, head) + t.Cleanup(func() { + testutil.Ok(t, head.Close()) + }) + + } + + head2, created := storetestutil.CreateHeadWithSeries(t, 3, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerBlock, + Series: seriesPerBlock, + WithWAL: true, + Random: random, + SkipChunks: t.IsBenchmark(), + MaxFrameBytes: maxBytesPerFrame, + }) + t.Cleanup(func() { + testutil.Ok(t, head2.Close()) + }) + + for i := 0; i < len(created); i++ { + resps[3] = append(resps[3], storepb.NewSeriesResponse(created[i])) + } + + db, err := tsdb.OpenDBReadOnly(tmpDir, logger) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, db.Close()) }() + + extLabels := labels.FromStrings("ext", "1") + store := NewTSDBStore(logger, nil, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store.maxBytesPerFrame = maxBytesPerFrame + + var expected []*storepb.Series + for _, resp := range resps { + for _, r := range resp { + // Add external labels. + x := storepb.Series{ + Labels: make([]storepb.Label, 0, len(r.GetSeries().Labels)+len(extLabels)), + } + for _, l := range r.GetSeries().Labels { + x.Labels = append(x.Labels, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + for _, l := range extLabels { + x.Labels = append(x.Labels, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Slice(x.Labels, func(i, j int) bool { + return x.Labels[i].Name < x.Labels[j].Name + }) + + expected = append(expected, &storepb.Series{Labels: x.Labels, Chunks: r.GetSeries().Chunks}) + } + } + + storetestutil.TestServerSeries(t, store, + &storetestutil.SeriesCase{ + Name: fmt.Sprintf("%d blocks and one WAL with %d samples, %d series each", numOfBlocks-1, samplesPerSeriesPerBlock, seriesPerBlock), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + ExpectedSeries: expected, + }, + ) +} diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 8b1e8e1d4f1..6cc1920aee3 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -8,9 +8,11 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/debug" "testing" "github.com/davecgh/go-spew/spew" + "github.com/pkg/errors" "github.com/pmezard/go-difflib/difflib" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -152,3 +154,19 @@ func GatherAndCompare(t *testing.T, g1 prometheus.Gatherer, g2 prometheus.Gather } Equals(t, m1.String(), m2.String()) } + +// FaultOrPanicToErr returns error if panic of fault was triggered during execution of function. +func FaultOrPanicToErr(f func()) (err error) { + // Set this go routine to panic on segfault to allow asserting on those. + debug.SetPanicOnFault(true) + defer func() { + if r := recover(); r != nil { + err = errors.Errorf("invoked function panicked or caused segmentation fault: %v", r) + } + debug.SetPanicOnFault(false) + }() + + f() + + return err +}