Support native histograms in store-gateway (thanos-io#6056)
* Support native histograms in store-gateway

This commit adds support for retrieving native histogram chunks
in Store Gateway.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix tests

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Revert test changes

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski authored and Nathaniel Graham committed Apr 17, 2023
1 parent 627bad2 commit ef2c0b6
Showing 5 changed files with 220 additions and 57 deletions.
9 changes: 7 additions & 2 deletions pkg/store/bucket.go
@@ -1050,14 +1050,19 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
hasher := hashPool.Get().(hash.Hash64)
defer hashPool.Put(hasher)

if in.Encoding() == chunkenc.EncXOR {
if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram {
b, err := save(in.Bytes())
if err != nil {
return err
}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
out.Raw = &storepb.Chunk{
Data: b,
Type: storepb.Chunk_Encoding(in.Encoding() - 1),
Hash: hashChunk(hasher, b, calculateChecksum),
}
return nil
}

if in.Encoding() != downsample.ChunkEncAggr {
return errors.Errorf("unsupported chunk encoding %d", in.Encoding())
}
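
The updated populateChunk no longer hard-codes storepb.Chunk_XOR: for raw XOR and native-histogram chunks it now derives the Store API chunk type from the TSDB encoding via storepb.Chunk_Encoding(in.Encoding() - 1), which only works while the two enums stay offset by exactly one. A minimal standalone sketch of that assumption (the constant values below are assumed for illustration, not taken from the generated code):

package main

import "fmt"

// Illustration only: these constants mirror the assumed values of the
// Prometheus chunkenc.Encoding and Thanos storepb.Chunk_Encoding enums;
// the real code uses the generated types, not copies like these.
const (
	encXOR       = 1 // assumed chunkenc.EncXOR
	encHistogram = 2 // assumed chunkenc.EncHistogram

	chunkXOR       = 0 // storepb.Chunk_XOR
	chunkHistogram = 1 // assumed value of the histogram chunk type
)

// toStoreEncoding mimics storepb.Chunk_Encoding(in.Encoding() - 1): the Store
// API enum is assumed to trail the TSDB encoding by exactly one.
func toStoreEncoding(enc int) int {
	return enc - 1
}

func main() {
	fmt.Println(toStoreEncoding(encXOR) == chunkXOR)             // true
	fmt.Println(toStoreEncoding(encHistogram) == chunkHistogram) // true
}

If the two enums ever drift apart, the subtraction would silently mislabel chunks, so this offset is the invariant to keep in mind when new encodings are added.
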
30 changes: 22 additions & 8 deletions pkg/store/bucket_test.go
@@ -30,7 +30,6 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"

"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
@@ -45,6 +44,7 @@ import (
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -1238,34 +1238,41 @@ func benchmarkExpandedPostings(
func TestBucketSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, false, samplesPerSeries, series, 1)
benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1)
})
}

func TestBucketHistogramSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1)
})
}

func TestBucketSkipChunksSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, true, samplesPerSeries, series, 1)
benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1)
})
}

func BenchmarkBucketSeries(b *testing.B) {
tb := testutil.NewTB(b)
// 10e6 samples = ~1736 days with 15s scrape
storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
})
}

func BenchmarkBucketSkipChunksSeries(b *testing.B) {
tb := testutil.NewTB(b)
// 10e6 samples = ~1736 days with 15s scrape
storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
})
}

func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) {
func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) {
const numOfBlocks = 4

tmpDir := t.TempDir()
@@ -1303,17 +1310,24 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
// Timestamp will be counted for each new series and new sample, so each series will have a unique timestamp.
// This allows to pick time range that will correspond to number of series picked 1:1.
for bi := 0; bi < numOfBlocks; bi++ {
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
head, _ := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
SamplesPerSeries: samplesPerSeriesPerBlock,
Series: seriesPerBlock,
PrependLabels: extLset,
Random: random,
SkipChunks: t.IsBenchmark() || skipChunk,
SampleType: sampleType,
})
id := createBlockFromHead(t, blockDir, head)
testutil.Ok(t, head.Close())
series = append(series, bSeries...)

// Histogram chunks are represented differently in memory and on disk. In order to
// have a precise comparison, we need to use the on-disk representation as the expected value
// instead of the in-memory one.
diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil)
testutil.Ok(t, err)
series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...)

meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil)
testutil.Ok(t, err)
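
Native histogram chunks are held in a different representation in the TSDB head than in a persisted block, which is why benchBucketSeries above stops collecting the expected series from CreateHeadWithSeries and instead re-reads them from the block it just wrote. A hypothetical companion test sketching the same flow: it assumes it sits next to bucket_test.go in the store package (so it can reuse the unexported createBlockFromHead helper) and it leans on the ReadSeriesFromBlock and SampleType additions from this commit; series counts and the random seed are illustrative.

package store

import (
	"math/rand"
	"path"
	"path/filepath"
	"testing"

	"github.com/efficientgo/core/testutil"
	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"

	storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
)

// Hypothetical sketch: generate native histogram series, persist them as a
// block, and read the expected series back from disk.
func TestHistogramSeriesFromDiskBlock(t *testing.T) {
	tmpDir := t.TempDir()

	head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
		TSDBDir:          filepath.Join(tmpDir, "head"),
		SamplesPerSeries: 100,
		Series:           5,
		Random:           rand.New(rand.NewSource(120)),
		SampleType:       chunkenc.ValHistogram,
	})

	// Histogram chunks are re-encoded on their way to disk, so the persisted
	// block, not the in-memory head, is the source of truth for expectations.
	id := createBlockFromHead(t, tmpDir, head)
	testutil.Ok(t, head.Close())

	diskBlock, err := tsdb.OpenBlock(log.NewNopLogger(), path.Join(tmpDir, id.String()), nil)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, diskBlock.Close()) }()

	expected := storetestutil.ReadSeriesFromBlock(t, diskBlock, nil, false)
	testutil.Equals(t, 5, len(expected))
}
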
96 changes: 76 additions & 20 deletions pkg/store/storepb/testutil/series.go
@@ -16,15 +16,17 @@ import (
"time"

"github.com/cespare/xxhash"

"github.com/efficientgo/core/testutil"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/wlog"

"github.com/efficientgo/core/testutil"
"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -51,6 +53,7 @@ type HeadGenOptions struct {
WithWAL bool
PrependLabels labels.Labels
SkipChunks bool // Skips chunks in returned slice (not in generated head!).
SampleType chunkenc.ValueType

Random *rand.Rand
}
@@ -66,6 +69,10 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
if opts.ScrapeInterval == 0 {
opts.ScrapeInterval = 1 * time.Millisecond
}
// Use float type if sample type is not set.
if opts.SampleType == chunkenc.ValNone {
opts.SampleType = chunkenc.ValFloat
}

fmt.Printf(
"Creating %d %d-sample series with %s interval in %s\n",
@@ -86,27 +93,26 @@

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = opts.TSDBDir
headOpts.EnableNativeHistograms = *atomic.NewBool(true)
h, err := tsdb.NewHead(nil, nil, w, nil, headOpts, nil)
testutil.Ok(t, err)

app := h.Appender(context.Background())
for i := 0; i < opts.Series; i++ {
tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries
ref, err := app.Append(
0,
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
opts.Random.Float64(),
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
_, err := app.Append(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64())
testutil.Ok(t, err)
switch opts.SampleType {
case chunkenc.ValFloat:
appendFloatSamples(t, app, tsLabel, opts)
case chunkenc.ValHistogram:
appendHistogramSamples(t, app, tsLabel, opts)
}
}
testutil.Ok(t, app.Commit())

return h, ReadSeriesFromBlock(t, h, opts.PrependLabels, opts.SkipChunks)
}

func ReadSeriesFromBlock(t testing.TB, h tsdb.BlockReader, extLabels labels.Labels, skipChunks bool) []*storepb.Series {
// Use TSDB and get all series for assertion.
chks, err := h.Chunks()
testutil.Ok(t, err)
@@ -119,7 +125,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
var (
lset labels.Labels
chunkMetas []chunks.Meta
expected = make([]*storepb.Series, 0, opts.Series)
expected = make([]*storepb.Series, 0)
)

var builder labels.ScratchBuilder
@@ -128,9 +134,9 @@
for all.Next() {
testutil.Ok(t, ir.Series(all.At(), &builder, &chunkMetas))
lset = builder.Labels()
expected = append(expected, &storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(append(opts.PrependLabels.Copy(), lset...))})
expected = append(expected, &storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(append(extLabels.Copy(), lset...))})

if opts.SkipChunks {
if skipChunks {
continue
}

@@ -146,12 +152,60 @@
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{
MinTime: c.MinTime,
MaxTime: c.MaxTime,
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes(), Hash: xxhash.Sum64(chEnc.Bytes())},
Raw: &storepb.Chunk{
Data: chEnc.Bytes(),
Type: storepb.Chunk_Encoding(chEnc.Encoding() - 1),
Hash: xxhash.Sum64(chEnc.Bytes()),
},
})
}
}
testutil.Ok(t, all.Err())
return h, expected
return expected
}

func appendFloatSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) {
ref, err := app.Append(
0,
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
opts.Random.Float64(),
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
_, err := app.Append(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64())
testutil.Ok(t, err)
}
}

func appendHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) {
sample := &histogram.Histogram{
Schema: 0,
Count: 9,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
}

ref, err := app.AppendHistogram(
0,
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
sample,
nil,
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
_, err := app.AppendHistogram(ref, nil, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), sample, nil)
testutil.Ok(t, err)
}
}

// SeriesServer is test gRPC storeAPI series server.
@@ -265,7 +319,9 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series
if len(c.ExpectedSeries[j].Chunks) > 20 {
testutil.Equals(t, len(c.ExpectedSeries[j].Chunks), len(srv.SeriesSet[j].Chunks), "%v series chunks number mismatch", j)
}
testutil.Equals(t, c.ExpectedSeries[j].Chunks, srv.SeriesSet[j].Chunks, "%v series chunks mismatch", j)
for ci := range c.ExpectedSeries[j].Chunks {
testutil.Equals(t, c.ExpectedSeries[j].Chunks[ci], srv.SeriesSet[j].Chunks[ci], "%v series chunks mismatch %v", j, ci)
}
}
}
} else {
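
On the test-utility side, the two essential new mechanics are enabling native histograms on the TSDB head (EnableNativeHistograms) and appending histogram samples through AppendHistogram. A standalone, hedged sketch of just those steps outside the helper; it assumes the Prometheus TSDB and go.uber.org/atomic versions this commit builds against and reuses the fixed histogram sample from appendHistogramSamples.

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
	"go.uber.org/atomic"
)

func main() {
	dir, err := os.MkdirTemp("", "native-histogram-head")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Native histogram ingestion must be switched on explicitly, exactly as
	// CreateHeadWithSeries now does via EnableNativeHistograms.
	headOpts := tsdb.DefaultHeadOptions()
	headOpts.ChunkDirRoot = dir
	headOpts.EnableNativeHistograms = *atomic.NewBool(true)

	head, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer head.Close()

	// The fixed histogram sample used by appendHistogramSamples above.
	sample := &histogram.Histogram{
		Schema:        0,
		Count:         9,
		Sum:           -3.1415,
		ZeroCount:     12,
		ZeroThreshold: 0.001,
		NegativeSpans: []histogram.Span{
			{Offset: 0, Length: 4},
			{Offset: 1, Length: 1},
		},
		NegativeBuckets: []int64{1, 2, -2, 1, -1},
	}

	app := head.Appender(context.Background())
	ref, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar"), 0, sample, nil)
	if err != nil {
		log.Fatal(err)
	}
	for ts := int64(1); ts < 10; ts++ {
		if _, err := app.AppendHistogram(ref, nil, ts, sample, nil); err != nil {
			log.Fatal(err)
		}
	}
	if err := app.Commit(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("series in head:", head.NumSeries())
}

The helper appends the same histogram at every timestamp, which keeps each series trivially counter-monotonic; that is sufficient for generating histogram chunks to exercise the store gateway.
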