Skip to content

Commit

Permalink
receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDB…
Browse files Browse the repository at this point in the history
…Store, fixed multitsdb benchmarks.

Fixed #3013

Also:
* Fixed another significant issue with reusing the chunk slice.
* Fixed framing - previously it was wrongly sending single-chunk frames, taking a
huge amount of time.

Before upgrading go.mod with latest TSDB:

 TestTSDBStore_SeriesChunkBytesCopied/flush_WAL_and_access_results: tsdb_test.go:487: tsdb_test.go:487:

         unexpected error: invoked function panicked or caused segmentation fault: runtime error: invalid memory address or nil pointer dereference

After upgrading, all tests pass.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Aug 18, 2020
1 parent ae629b2 commit 21af82b
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 56 deletions.
21 changes: 13 additions & 8 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
var (
logger = log.NewNopLogger()
blocks []*bucketBlock
series []storepb.Series
series []*storepb.Series
random = rand.New(rand.NewSource(120))
)

Expand Down Expand Up @@ -1213,7 +1213,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
// This allows to pick time range that will correspond to number of series picked 1:1.
for bi := 0; bi < numOfBlocks; bi++ {
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
SamplesPerSeries: samplesPerSeriesPerBlock,
Series: seriesPerBlock,
PrependLabels: extLset,
Expand Down Expand Up @@ -1536,17 +1536,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
}

// Create TSDB blocks.
opts := storetestutil.HeadGenOptions{
Dir: tmpDir,
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "0"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
}
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts)
})
block1 := createBlockFromHead(t, bktDir, head)
testutil.Ok(t, head.Close())
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts)
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "1"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
})
block2 := createBlockFromHead(t, bktDir, head2)
testutil.Ok(t, head2.Close())

Expand Down Expand Up @@ -1610,7 +1615,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
Expand Down
23 changes: 9 additions & 14 deletions pkg/store/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,29 +92,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
}
}()
for j := range dbs {
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: tsdbDir,
SamplesPerSeries: samplesPerSeriesPerTSDB,
Series: seriesPerTSDB,
WithWAL: true,
WithWAL: !flushToBlocks,
Random: random,
SkipChunks: t.IsBenchmark(),
})
testutil.Ok(t, head.Close())

tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

for i := 0; i < len(created); i++ {
resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i]))
resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
}

if flushToBlocks {
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)

testutil.Ok(t, db.FlushWAL(tmpDir))
testutil.Ok(t, db.Close())
_ = createBlockFromHead(t, tsdbDir, head)
}
testutil.Ok(t, head.Close())

db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)
Expand All @@ -129,7 +124,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, resp := range resps {
for _, r := range resp {
Expand All @@ -141,7 +136,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -1616,7 +1617,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
var resps []*storepb.SeriesResponse

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)),
SamplesPerSeries: samplesPerSeriesPerClient,
Series: seriesPerClient,
MaxFrameBytes: storetestutil.RemoteReadFrameLimit,
Expand All @@ -1626,7 +1627,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
testutil.Ok(t, head.Close())

for i := 0; i < len(created); i++ {
resps = append(resps, storepb.NewSeriesResponse(&created[i]))
resps = append(resps, storepb.NewSeriesResponse(created[i]))
}

clients[j] = &testClient{
Expand All @@ -1647,7 +1648,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
}

var allResps []*storepb.SeriesResponse
var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, c := range clients {
m := c.(*testClient).StoreClient.(*mockedStoreAPI)
Expand All @@ -1663,7 +1664,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}

}
Expand Down Expand Up @@ -1700,7 +1701,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
// In this we expect exactly the same response as input.
expected = expected[:0]
for _, r := range allResps {
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
storetestutil.TestServerSeries(t, store,
&storetestutil.SeriesCase{
Expand Down
30 changes: 16 additions & 14 deletions pkg/store/storepb/testutil/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -39,13 +40,13 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings {
const RemoteReadFrameLimit = 1048576

type HeadGenOptions struct {
Dir string
TSDBDir string
SamplesPerSeries, Series int

MaxFrameBytes int // No limit by default.
WithWAL bool
PrependLabels labels.Labels
SkipChunks bool
SkipChunks bool // Skips chunks in returned slice (not in generated head!).

Random *rand.Rand
}
Expand All @@ -54,22 +55,23 @@ type HeadGenOptions struct {
// Returned series list has "ext1"="1" prepended. Each series looks as follows:
// {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} <random value> where number indicate sample number from 0.
// Returned series are frame in same way as remote read would frame them.
func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []storepb.Series) {
func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []*storepb.Series) {
if opts.SamplesPerSeries < 1 || opts.Series < 1 {
t.Fatal("samples and series has to be 1 or more")
}

tsdbDir := filepath.Join(opts.Dir, fmt.Sprintf("%d", j))
fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, tsdbDir)
fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, opts.TSDBDir)

var w *wal.WAL
var err error
if opts.WithWAL {
w, err = wal.New(nil, nil, filepath.Join(tsdbDir, "wal"), true)
w, err = wal.New(nil, nil, filepath.Join(opts.TSDBDir, "wal"), true)
testutil.Ok(t, err)
} else {
testutil.Ok(t, os.MkdirAll(filepath.Join(opts.TSDBDir, "wal"), os.ModePerm))
}

h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, opts.TSDBDir, nil, tsdb.DefaultStripeSize, nil)
testutil.Ok(t, err)

app := h.Appender(context.Background())
Expand All @@ -96,7 +98,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
var (
lset labels.Labels
chunkMetas []chunks.Meta
expected = make([]storepb.Series, 0, opts.Series)
expected = make([]*storepb.Series, 0, opts.Series)
sBytes int
)

Expand All @@ -105,7 +107,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas))
i := 0
sLset := storepb.PromLabelsToLabels(lset)
expected = append(expected, storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)})
expected = append(expected, &storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)})

if opts.SkipChunks {
continue
Expand Down Expand Up @@ -142,7 +144,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,

// Compose many frames as remote read (so sidecar StoreAPI) would do if requested by maxFrameBytes.
if opts.MaxFrameBytes > 0 && sBytes >= opts.MaxFrameBytes {
expected = append(expected, storepb.Series{Labels: sLset})
expected = append(expected, &storepb.Series{Labels: sLset})
sBytes = lBytes
}
}
Expand All @@ -158,7 +160,7 @@ type SeriesServer struct {

ctx context.Context

SeriesSet []storepb.Series
SeriesSet []*storepb.Series
Warnings []string
HintsSet []*types.Any

Expand All @@ -178,7 +180,7 @@ func (s *SeriesServer) Send(r *storepb.SeriesResponse) error {
}

if r.GetSeries() != nil {
s.SeriesSet = append(s.SeriesSet, *r.GetSeries())
s.SeriesSet = append(s.SeriesSet, r.GetSeries())
return nil
}

Expand Down Expand Up @@ -227,7 +229,7 @@ type SeriesCase struct {
Req *storepb.SeriesRequest

// Exact expectations are checked only for tests. For benchmarks only length is assured.
ExpectedSeries []storepb.Series
ExpectedSeries []*storepb.Series
ExpectedWarnings []string
ExpectedHints []hintspb.SeriesResponseHints
}
Expand All @@ -254,7 +256,7 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series

testutil.Equals(t, c.ExpectedSeries[0].Chunks[0], srv.SeriesSet[0].Chunks[0])

// This might give unreadable output for millions of series on fail..
// This might give unreadable output for millions of series on fail.
testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet)

var actualHints []hintspb.SeriesResponseHints
Expand Down
31 changes: 16 additions & 15 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,28 +116,27 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
}
defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier series")

var (
set = q.Select(false, nil, matchers...)
respSeries storepb.Series
)
set := q.Select(false, nil, matchers...)

// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for set.Next() {
series := set.At()
respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)
respSeries.Chunks = respSeries.Chunks[:0]
seriesLabels := storepb.Series{Labels: s.translateAndExtendLabels(series.Labels(), s.externalLabels)}
if r.SkipChunks {
if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
if err := srv.Send(storepb.NewSeriesResponse(&seriesLabels)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
continue
}

frameBytesLeft := s.maxBytesPerFrame
for _, lbl := range respSeries.Labels {
frameBytesLeft -= lbl.Size()
bytesLeftForChunks := s.maxBytesPerFrame
for _, lbl := range seriesLabels.Labels {
bytesLeftForChunks -= lbl.Size()
}
frameBytesLeft := bytesLeftForChunks

// Share? respSeries.Chunks = respSeries.Chunks[:0]
seriesChunks := []storepb.AggrChunk{}
chIter := series.Iterator()
isNext := chIter.Next()
for isNext {
Expand All @@ -146,26 +145,28 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}

respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{
c := storepb.AggrChunk{
MinTime: chk.MinTime,
MaxTime: chk.MaxTime,
Raw: &storepb.Chunk{
Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one.
Data: chk.Chunk.Bytes(),
},
})
frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size()
}
frameBytesLeft -= c.Size()
seriesChunks = append(seriesChunks, c)

// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = chIter.Next()
if frameBytesLeft > 0 && isNext {
continue
}

if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
if err := srv.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: seriesLabels.Labels, Chunks: seriesChunks})); err != nil {
return status.Error(codes.Aborted, err.Error())
}
respSeries.Chunks = respSeries.Chunks[:0]
frameBytesLeft = bytesLeftForChunks
seriesChunks = []storepb.AggrChunk{}
}
if err := chIter.Err(); err != nil {
return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error())
Expand Down
Loading

0 comments on commit 21af82b

Please sign in to comment.