From 3ecb6026b723e296387a2a190ab8ee3d8e294800 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Wed, 17 Jan 2018 11:56:55 +0100
Subject: [PATCH] downsample: fix various issues

---
 pkg/compact/downsample/downsample.go      | 168 +++++++++++-----------
 pkg/compact/downsample/downsample_test.go |   2 +-
 2 files changed, 82 insertions(+), 88 deletions(-)

diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 806207e4b68..a77058d1549 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
-	"strings"
 
 	"github.com/prometheus/prometheus/pkg/value"
 
@@ -38,6 +39,10 @@ func Downsample(
 	dir string,
 	resolution int64,
 ) (id ulid.ULID, err error) {
+	if origMeta.Thanos.Downsample.Resolution >= resolution {
+		return id, errors.New("target resolution not lower than existing one")
+	}
+
 	indexr, err := b.Index()
 	if err != nil {
 		return id, errors.Wrap(err, "open index reader")
@@ -78,26 +83,30 @@ func Downsample(
 		if err := indexr.Series(pall.At(), &lset, &chks); err != nil {
 			return id, errors.Wrapf(err, "get series %d", pall.At())
 		}
+		// While #183 exists, we sanitize the chunks we retrieved from the block
+		// before retrieving their samples.
+		for i, c := range chks {
+			chk, err := chunkr.Chunk(c.Ref)
+			if err != nil {
+				return id, errors.Wrapf(err, "get chunk %d", c.Ref)
+			}
+			chks[i].Chunk = chk
+		}
+
 		// Raw and already downsampled data need different processing.
 		if origMeta.Thanos.Downsample.Resolution == 0 {
 			for _, c := range chks {
-				chk, err := chunkr.Chunk(c.Ref)
-				if err != nil {
-					return id, errors.Wrapf(err, "get chunk %d", c.Ref)
-				}
-				if err := expandChunkIterator(chk.Iterator(), &all); err != nil {
+				if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
 					return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
 				}
 			}
-			newb.addSeries(downsampleRaw(lset, all, resolution))
+			newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
 			continue
 		}
+
+		// Downsample a block that contains aggregate chunks already.
 		for _, c := range chks {
-			chk, err := chunkr.Chunk(c.Ref)
-			if err != nil {
-				return id, errors.Wrapf(err, "get chunk %d", c.Ref)
-			}
-			aggrChunks = append(aggrChunks, chk.(AggrChunk))
+			aggrChunks = append(aggrChunks, c.Chunk.(AggrChunk))
 		}
 		res, err := downsampleAggr(aggrChunks, &all, chks[0].MinTime, chks[len(chks)-1].MaxTime, resolution)
 		if err != nil {
@@ -308,18 +317,15 @@ type aggrChunkBuilder struct {
 	apps   [5]chunkenc.Appender
 }
 
-func newAggrChunkBuilder(isCounter bool) *aggrChunkBuilder {
+func newAggrChunkBuilder() *aggrChunkBuilder {
 	b := &aggrChunkBuilder{
-		mint:      math.MaxInt64,
-		maxt:      math.MinInt64,
-		isCounter: isCounter,
-	}
-	if !isCounter {
-		b.chunks[AggrSum] = chunkenc.NewXORChunk()
-		b.chunks[AggrMin] = chunkenc.NewXORChunk()
-		b.chunks[AggrMax] = chunkenc.NewXORChunk()
+		mint: math.MaxInt64,
+		maxt: math.MinInt64,
 	}
 	b.chunks[AggrCount] = chunkenc.NewXORChunk()
+	b.chunks[AggrSum] = chunkenc.NewXORChunk()
+	b.chunks[AggrMin] = chunkenc.NewXORChunk()
+	b.chunks[AggrMax] = chunkenc.NewXORChunk()
 	b.chunks[AggrCounter] = chunkenc.NewXORChunk()
 
 	for i, c := range b.chunks {
@@ -327,7 +333,6 @@ func newAggrChunkBuilder(isCounter bool) *aggrChunkBuilder {
 			b.apps[i], _ = c.Appender()
 		}
 	}
-
 	return b
 }
 
@@ -338,11 +343,9 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
 	if t > b.maxt {
 		b.maxt = t
 	}
-	if !b.isCounter {
-		b.apps[AggrSum].Append(t, aggr.sum)
-		b.apps[AggrMin].Append(t, aggr.min)
-		b.apps[AggrMax].Append(t, aggr.max)
-	}
+	b.apps[AggrSum].Append(t, aggr.sum)
+	b.apps[AggrMin].Append(t, aggr.min)
+	b.apps[AggrMax].Append(t, aggr.max)
 	b.apps[AggrCount].Append(t, float64(aggr.count))
 	b.apps[AggrCounter].Append(t, aggr.counter)
 
@@ -362,7 +365,7 @@ func currentWindow(t, r int64) int64 {
 }
 
 // downsampleRaw create a series of aggregation chunks for the given sample data.
-func downsampleRaw(lset labels.Labels, data []sample, resolution int64) *series {
+func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
 	if len(data) == 0 {
 		return nil
 	}
@@ -385,7 +388,7 @@
 		for ; j < len(data) && data[j].t <= curW; j++ {
 		}
 
-		ab := newAggrChunkBuilder(isCounter(lset))
+		ab := newAggrChunkBuilder()
 		batch := data[:j]
 		data = data[j:]
 
@@ -400,7 +403,7 @@
 			Chunk:   ab.encode(),
 		})
 	}
-	return &series{lset: lset, chunks: chks}
+	return chks
 }
 
 // downsampleBatch aggregates the data over the given resolution and calls add each time
@@ -464,9 +467,16 @@ func downsampleAggr(chks []AggrChunk, buf *[]sample, mint, maxt, resolution int6
 }
 
 func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
+	// For safety reasons, we check for each sample that it does not go back in time.
+	// If it does, we skip it.
+	lastT := int64(0)
+
 	for it.Next() {
 		t, v := it.At()
-		*buf = append(*buf, sample{t, v})
+		if t >= lastT {
+			*buf = append(*buf, sample{t, v})
+			lastT = t
+		}
 	}
 	return it.Err()
 }
@@ -564,19 +574,11 @@ func downsampleAggrBatch(chks []AggrChunk, buf *[]sample, resolution int64) (min
 	return mint, maxt, ab.encode(), nil
 }
 
-// isCounter guesses whether a series is a counter based on its label set.
-func isCounter(lset labels.Labels) bool {
-	metric := lset.Get("__name__")
-	return strings.HasSuffix(metric, "_total") ||
-		strings.HasSuffix(metric, "_bucket") ||
-		strings.HasSuffix(metric, "_sum")
-}
-
 // targetChunkSize computes an intended sample count per chunk given a fixed length
 // of samples in the series.
-// It aims to split the series into chunks of length 100 or higher.
+// It aims to split the series into chunks of length 70 or higher.
 func targetChunkSize(l int) (c, s int) {
-	for c = 1; l/c > 200; c++ {
+	for c = 1; l/c > 140; c++ {
 	}
 	return c, (l / c) + 1
 }
@@ -598,21 +600,16 @@ type AggrChunk []byte
 
 // EncodeAggrChunk encodes a new aggregate chunk from the array of chunks for each aggregate.
 // Each array entry corresponds to the respective AggrType number.
 func EncodeAggrChunk(chks [5]chunkenc.Chunk) AggrChunk {
-	var mask byte
-	var all []chunkenc.Chunk
-
-	for i, c := range chks {
-		if c != nil {
-			mask |= 1 << (7 - uint(i))
-			all = append(all, c)
-		}
-	}
 	var b []byte
-	b = append(b, mask)
 	buf := [8]byte{}
 
-	for _, c := range all {
+	for _, c := range chks {
+		// Unset aggregates are marked with a zero length entry.
+		if c == nil {
+			n := binary.PutUvarint(buf[:], 0)
+			b = append(b, buf[:n]...)
+			continue
+		}
 		l := len(c.Bytes())
 		n := binary.PutUvarint(buf[:], uint64(l))
 		b = append(b, buf[:n]...)
@@ -635,7 +632,7 @@ func (c AggrChunk) Iterator() chunkenc.Iterator {
 }
 
 func (c AggrChunk) NumSamples() int {
-	x, err := c.nth(0)
+	x, err := c.Get(AggrCount)
 	if err != nil {
 		return 0
 	}
@@ -653,46 +650,30 @@ func (c AggrChunk) Encoding() chunkenc.Encoding {
 	return ChunkEncAggr
 }
 
-// nth returns the nth chunk present in the aggregated chunk.
-func (c AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
-	b := c[1:]
+// Get returns the sub-chunk for the given aggregate type if it exists.
+func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
+	b := c[:]
 	var x []byte
 
-	for i := uint8(0); i <= n; i++ {
+	for i := AggrType(0); i <= t; i++ {
 		l, n := binary.Uvarint(b)
 		if n < 1 || len(b[n:]) < int(l)+1 {
 			return nil, errors.New("invalid size")
 		}
-		x = b[n : n+int(l)+1]
-		b = b[n+int(l)+1:]
+		b = b[n:]
+		// If length is set to zero explicitly, that means the aggregate is unset.
+		if l == 0 {
+			if i == t {
+				return nil, ErrAggrNotExist
+			}
+			continue
+		}
+		x = b[:int(l)+1]
+		b = b[int(l)+1:]
 	}
 	return chunkenc.FromData(chunkenc.Encoding(x[0]), x[1:])
 }
 
-// position returns at which position the chunk for the type is located.
-func (c AggrChunk) position(t AggrType) (ok bool, p uint8) {
-	mask := uint8(c[0])
-
-	if mask&(1<<(7-t)) == 0 {
-		return false, 0
-	}
-	// In the bit mask we count how many chunk types have their bit set before
-	// the type's own position.
-	for i := int(t) - 1; i >= 0; i-- {
-		p += (mask << uint(i)) >> 7
-	}
-	return true, p
-}
-
-// Get returns the sub-chunk for the given aggregate type if it exists.
-func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
-	ok, p := c.position(t)
-	if !ok {
-		return nil, ErrAggrNotExist
-	}
-	return c.nth(p)
-}
-
 // CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
 // values as counter reset.
 // Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
@@ -717,7 +698,10 @@ func (it *CounterSeriesIterator) Next() bool {
 	}
 	if ok := it.chks[it.i].Next(); !ok {
 		it.i++
-		return it.Next()
+		// While iterators are ordered, they are not generally guaranteed to be
+		// non-overlapping. Ensure that the series does not go back in time by seeking at least
+		// to the next timestamp.
+		return it.Seek(it.lastT + 1)
 	}
 	t, v := it.chks[it.i].At()
 
@@ -742,9 +726,12 @@ func (it *CounterSeriesIterator) Next() bool {
 		it.total++
 		return true
 	}
-	// We hit the last sample that indicates what the true last value was. For the
+	// We hit a sample that indicates what the true last value was. For the
 	// next chunk we use it to determine whether there was a counter reset between them.
-	it.lastT, it.lastV = t, v
+	if t == it.lastT {
+		it.lastV = v
+	}
+	// Otherwise the series went back in time and we just keep moving forward.
 
 	return it.Next()
 }
@@ -800,6 +787,7 @@ func (it *AverageChunkIterator) Next() bool {
 	sumT, sumV := it.sumIt.At()
 	if cntT != sumT {
 		it.err = errors.New("sum and count timestamps not aligned")
+		return false
 	}
 	it.t, it.v = cntT, sumV/cntV
 	return true
@@ -810,6 +798,12 @@ func (it *AverageChunkIterator) At() (int64, float64) {
 }
 
 func (it *AverageChunkIterator) Err() error {
+	if it.cntIt.Err() != nil {
+		return it.cntIt.Err()
+	}
+	if it.sumIt.Err() != nil {
+		return it.sumIt.Err()
+	}
 	return it.err
 }
 
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index 2a8456706e2..dfd32d2ea14 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -125,7 +125,7 @@ type testSeries struct {
 }
 
 func encodeTestAggrSeries(v map[AggrType][]sample) (AggrChunk, int64, int64) {
-	b := newAggrChunkBuilder(false)
+	b := newAggrChunkBuilder()
 	for at, d := range v {
 		for _, s := range d {
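--

A note on the new AggrChunk layout: the leading presence bitmask is gone, all five entries are written in AggrType order, and an unset aggregate is stored as a zero-length entry. The standalone sketch below walks that layout the same way AggrChunk.Get above does (decodeAggrChunk is a hypothetical helper written for illustration, not part of the patch; each non-empty entry is a uvarint data length, one encoding byte, then the data):

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeAggrChunk splits a concatenated aggregate chunk into its five
// sub-chunks. parts[i] holds the encoding byte followed by the chunk data;
// a nil entry means the aggregate is unset.
func decodeAggrChunk(b []byte) ([5][]byte, error) {
	var parts [5][]byte
	for i := range parts {
		l, n := binary.Uvarint(b)
		if n < 1 {
			return parts, errors.New("invalid size")
		}
		b = b[n:]
		if l == 0 {
			continue // zero-length entry: aggregate not present
		}
		if len(b) < int(l)+1 {
			return parts, errors.New("invalid size")
		}
		parts[i] = b[:int(l)+1] // 1 encoding byte + l data bytes
		b = b[int(l)+1:]
	}
	return parts, nil
}

func main() {
	// Hand-rolled example: aggregate 0 set with two data bytes, aggregates
	// 1-3 unset, aggregate 4 set with one data byte. Real entries carry
	// XOR-encoded samples; the bytes here only demonstrate the framing.
	raw := []byte{2, 1, 0xAA, 0xBB, 0, 0, 0, 1, 1, 0xCC}
	parts, err := decodeAggrChunk(raw)
	fmt.Println(parts, err)
	// Output: [[1 170 187] [] [] [] [1 204]] <nil>
}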
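The counter-reset rule that CounterSeriesIterator applies is easiest to see on plain values: whenever a sample is lower than its predecessor, the drop is treated as a counter reset and the running total keeps growing. A toy sketch of just that rule (accumulate is illustrative only; it ignores the chunk handling, staleness, and the duplicated last-value samples the real iterator also deals with):

package main

import "fmt"

// accumulate applies the counter-reset rule: a decrease in the raw value is
// treated as a reset, so the emitted total only ever moves up.
func accumulate(vals []float64) []float64 {
	out := make([]float64, 0, len(vals))
	var total, last float64
	for i, v := range vals {
		if i == 0 {
			total = v
		} else if v >= last {
			total += v - last // normal monotonic increase
		} else {
			total += v // value dropped: counter reset, count from zero again
		}
		last = v
		out = append(out, total)
	}
	return out
}

func main() {
	fmt.Println(accumulate([]float64{1, 3, 8, 2, 5}))
	// Output: [1 3 8 10 13]
}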
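The retuned targetChunkSize can also be checked in isolation. The function below is copied verbatim from the patch; the driver around it is illustrative and shows that series are now split into chunks of roughly 70 to 141 samples, matching the updated comment:

package main

import "fmt"

// targetChunkSize as in the patch: the smallest chunk count c for which the
// average length l/c does not exceed 140, plus the resulting per-chunk size.
func targetChunkSize(l int) (c, s int) {
	for c = 1; l/c > 140; c++ {
	}
	return c, (l / c) + 1
}

func main() {
	for _, l := range []int{100, 141, 280, 1000} {
		c, s := targetChunkSize(l)
		fmt.Printf("l=%d -> chunks=%d size=%d\n", l, c, s)
	}
	// l=100  -> chunks=1 size=101
	// l=141  -> chunks=2 size=71
	// l=280  -> chunks=2 size=141
	// l=1000 -> chunks=8 size=126
}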