downsample: fix various issues
fabxc committed Jan 29, 2018
1 parent 6f8bfbc commit 9359c8c
Showing 2 changed files with 81 additions and 90 deletions.
169 changes: 80 additions & 89 deletions pkg/compact/downsample/downsample.go
@@ -7,11 +7,9 @@ import (
     "os"
     "path/filepath"
     "sort"
-    "strings"
 
-    "github.com/prometheus/prometheus/pkg/value"
-
     "github.com/improbable-eng/thanos/pkg/block"
+    "github.com/prometheus/prometheus/pkg/value"
     "github.com/prometheus/tsdb/chunkenc"
 
     "github.com/go-kit/kit/log"
@@ -38,6 +36,10 @@ func Downsample(
     dir string,
     resolution int64,
 ) (id ulid.ULID, err error) {
+    if origMeta.Thanos.Downsample.Resolution >= resolution {
+        return id, errors.New("target resolution not lower than existing one")
+    }
+
     indexr, err := b.Index()
     if err != nil {
         return id, errors.Wrap(err, "open index reader")
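Note: the new guard makes downsampling strictly monotonic, so a block can only move to a coarser resolution than it already has. A minimal runnable sketch of that invariant; the millisecond constants are illustrative assumptions, not values from this commit:

package main

import "fmt"

// Resolutions in milliseconds; 0 denotes raw data. (Illustrative values.)
const (
    resRaw int64 = 0
    res5m  int64 = 5 * 60 * 1000
    res1h  int64 = 60 * 60 * 1000
)

// canDownsample mirrors the guard: the target must be strictly coarser.
func canDownsample(existing, target int64) bool {
    return target > existing
}

func main() {
    fmt.Println(canDownsample(resRaw, res5m)) // true: raw to 5m
    fmt.Println(canDownsample(res5m, res1h))  // true: 5m to 1h
    fmt.Println(canDownsample(res1h, res5m))  // false: cannot refine
    fmt.Println(canDownsample(res5m, res5m))  // false: same resolution
}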
@@ -78,26 +80,30 @@ func Downsample(
         if err := indexr.Series(pall.At(), &lset, &chks); err != nil {
             return id, errors.Wrapf(err, "get series %d", pall.At())
         }
+        // While #183 exists, we sanitize the chunks we retrieved from the block
+        // before retrieving their samples.
+        for i, c := range chks {
+            chk, err := chunkr.Chunk(c.Ref)
+            if err != nil {
+                return id, errors.Wrapf(err, "get chunk %d", c.Ref)
+            }
+            chks[i].Chunk = chk
+        }
+
         // Raw and already downsampled data need different processing.
         if origMeta.Thanos.Downsample.Resolution == 0 {
             for _, c := range chks {
-                chk, err := chunkr.Chunk(c.Ref)
-                if err != nil {
-                    return id, errors.Wrapf(err, "get chunk %d", c.Ref)
-                }
-                if err := expandChunkIterator(chk.Iterator(), &all); err != nil {
+                if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
                     return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
                 }
             }
-            newb.addSeries(downsampleRaw(lset, all, resolution))
+            newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
             continue
         }
+
+        // Downsample a block that contains aggregate chunks already.
         for _, c := range chks {
-            chk, err := chunkr.Chunk(c.Ref)
-            if err != nil {
-                return id, errors.Wrapf(err, "get chunk %d", c.Ref)
-            }
-            aggrChunks = append(aggrChunks, chk.(AggrChunk))
+            aggrChunks = append(aggrChunks, c.Chunk.(AggrChunk))
         }
         res, err := downsampleAggr(aggrChunks, &all, chks[0].MinTime, chks[len(chks)-1].MaxTime, resolution)
         if err != nil {
@@ -308,26 +314,22 @@ type aggrChunkBuilder struct {
     apps [5]chunkenc.Appender
 }
 
-func newAggrChunkBuilder(isCounter bool) *aggrChunkBuilder {
+func newAggrChunkBuilder() *aggrChunkBuilder {
     b := &aggrChunkBuilder{
-        mint:      math.MaxInt64,
-        maxt:      math.MinInt64,
-        isCounter: isCounter,
-    }
-    if !isCounter {
-        b.chunks[AggrSum] = chunkenc.NewXORChunk()
-        b.chunks[AggrMin] = chunkenc.NewXORChunk()
-        b.chunks[AggrMax] = chunkenc.NewXORChunk()
+        mint: math.MaxInt64,
+        maxt: math.MinInt64,
     }
     b.chunks[AggrCount] = chunkenc.NewXORChunk()
+    b.chunks[AggrSum] = chunkenc.NewXORChunk()
+    b.chunks[AggrMin] = chunkenc.NewXORChunk()
+    b.chunks[AggrMax] = chunkenc.NewXORChunk()
     b.chunks[AggrCounter] = chunkenc.NewXORChunk()
 
     for i, c := range b.chunks {
         if c != nil {
             b.apps[i], _ = c.Appender()
         }
     }
-
     return b
 }
 
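Note: with the isCounter flag gone from the builder (and the isCounter heuristic removed further down), every series now gets all five aggregate chunks. A sketch of the slot layout the [5]-sized arrays are indexed by; the iota order is an assumption consistent with the indexing in this file:

package main

import "fmt"

// AggrType indexes the fixed five-slot aggregate arrays (assumed order).
type AggrType uint8

const (
    AggrCount AggrType = iota
    AggrSum
    AggrMin
    AggrMax
    AggrCounter
)

func main() {
    fmt.Println(AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter) // 0 1 2 3 4
}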
@@ -338,11 +340,9 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
     if t > b.maxt {
         b.maxt = t
     }
-    if !b.isCounter {
-        b.apps[AggrSum].Append(t, aggr.sum)
-        b.apps[AggrMin].Append(t, aggr.min)
-        b.apps[AggrMax].Append(t, aggr.max)
-    }
+    b.apps[AggrSum].Append(t, aggr.sum)
+    b.apps[AggrMin].Append(t, aggr.min)
+    b.apps[AggrMax].Append(t, aggr.max)
     b.apps[AggrCount].Append(t, float64(aggr.count))
     b.apps[AggrCounter].Append(t, aggr.counter)
 
@@ -362,7 +362,7 @@ func currentWindow(t, r int64) int64 {
 }
 
 // downsampleRaw create a series of aggregation chunks for the given sample data.
-func downsampleRaw(lset labels.Labels, data []sample, resolution int64) *series {
+func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
     if len(data) == 0 {
         return nil
     }
@@ -385,7 +385,7 @@ func downsampleRaw(lset labels.Labels, data []sample, resolution int64) *series
         for ; j < len(data) && data[j].t <= curW; j++ {
         }
 
-        ab := newAggrChunkBuilder(isCounter(lset))
+        ab := newAggrChunkBuilder()
         batch := data[:j]
         data = data[j:]
 
@@ -400,7 +400,7 @@ func downsampleRaw(lset labels.Labels, data []sample, resolution int64) *series
             Chunk: ab.encode(),
         })
     }
-    return &series{lset: lset, chunks: chks}
+    return chks
 }
 
 // downsampleBatch aggregates the data over the given resolution and calls add each time
@@ -464,9 +464,16 @@ func downsampleAggr(chks []AggrChunk, buf *[]sample, mint, maxt, resolution int6
 }
 
 func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
+    // For safety reasons, we check for each sample that it does not go back in time.
+    // If it does, we skip it.
+    lastT := int64(0)
+
     for it.Next() {
         t, v := it.At()
-        *buf = append(*buf, sample{t, v})
+        if t >= lastT {
+            *buf = append(*buf, sample{t, v})
+            lastT = t
+        }
     }
     return it.Err()
 }
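Note: the guard keeps sample expansion ordered even if a chunk yields timestamps that go back in time. The same skip rule as a standalone runnable sketch; the sample struct mirrors the package's unexported type:

package main

import "fmt"

type sample struct {
    t int64
    v float64
}

// dropBackwards keeps only samples whose timestamps never decrease,
// matching the lastT guard added to expandChunkIterator above.
func dropBackwards(in []sample) []sample {
    var out []sample
    lastT := int64(0)
    for _, s := range in {
        if s.t >= lastT {
            out = append(out, s)
            lastT = s.t
        }
    }
    return out
}

func main() {
    in := []sample{{10, 1}, {20, 2}, {15, 99}, {30, 3}} // 15 goes back in time
    fmt.Println(dropBackwards(in))                      // [{10 1} {20 2} {30 3}]
}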
Expand Down Expand Up @@ -564,19 +571,11 @@ func downsampleAggrBatch(chks []AggrChunk, buf *[]sample, resolution int64) (min
return mint, maxt, ab.encode(), nil
}

// isCounter guesses whether a series is a counter based on its label set.
func isCounter(lset labels.Labels) bool {
metric := lset.Get("__name__")
return strings.HasSuffix(metric, "_total") ||
strings.HasSuffix(metric, "_bucket") ||
strings.HasSuffix(metric, "_sum")
}

// targetChunkSize computes an intended sample count per chunk given a fixed length
// of samples in the series.
// It aims to split the series into chunks of length 100 or higher.
// It aims to split the series into chunks of length 70 or higher.
func targetChunkSize(l int) (c, s int) {
for c = 1; l/c > 200; c++ {
for c = 1; l/c > 140; c++ {
}
return c, (l / c) + 1
}
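Note: the split thresholds move from 200/100 to 140/70. A quick worked check: for l = 1000 the loop stops at c = 8 (since 1000/7 = 142 > 140), giving s = 126 samples per chunk; the smallest chunks come out around 70 (l = 141 splits into two chunks of target size 71). Runnable with the function exactly as committed:

package main

import "fmt"

// targetChunkSize as committed: find the chunk count c such that each chunk
// holds at most 140 samples, then derive the per-chunk sample count s.
func targetChunkSize(l int) (c, s int) {
    for c = 1; l/c > 140; c++ {
    }
    return c, (l / c) + 1
}

func main() {
    for _, l := range []int{50, 141, 1000, 10000} {
        c, s := targetChunkSize(l)
        fmt.Printf("l=%d -> %d chunks of ~%d samples\n", l, c, s)
    }
}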
@@ -598,21 +597,16 @@ type AggrChunk []byte
 // EncodeAggrChunk encodes a new aggregate chunk from the array of chunks for each aggregate.
 // Each array entry corresponds to the respective AggrType number.
 func EncodeAggrChunk(chks [5]chunkenc.Chunk) AggrChunk {
-    var mask byte
-    var all []chunkenc.Chunk
-
-    for i, c := range chks {
-        if c != nil {
-            mask |= 1 << (7 - uint(i))
-            all = append(all, c)
-        }
-    }
-
     var b []byte
-    b = append(b, mask)
     buf := [8]byte{}
 
-    for _, c := range all {
+    for _, c := range chks {
+        // Unset aggregates are marked with a zero length entry.
+        if c == nil {
+            n := binary.PutUvarint(buf[:], 0)
+            b = append(b, buf[:n]...)
+            continue
+        }
         l := len(c.Bytes())
         n := binary.PutUvarint(buf[:], uint64(l))
         b = append(b, buf[:n]...)
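Note: the encoding drops the leading presence bitmask; every aggregate now gets a uvarint length prefix, with length zero marking an unset slot, which is exactly the layout the rewritten Get below walks over. A simplified runnable sketch of that format, using raw byte payloads in place of chunkenc chunks:

package main

import (
    "encoding/binary"
    "fmt"
)

// encode writes each entry as <uvarint length><payload>; nil marks an unset slot.
func encode(entries [5][]byte) []byte {
    var b []byte
    buf := [8]byte{}
    for _, e := range entries {
        n := binary.PutUvarint(buf[:], uint64(len(e)))
        b = append(b, buf[:n]...)
        b = append(b, e...)
    }
    return b
}

// decode walks the buffer and returns the nth entry, or nil if it is unset.
func decode(b []byte, nth int) []byte {
    for i := 0; i <= nth; i++ {
        l, n := binary.Uvarint(b)
        b = b[n:]
        if i == nth {
            if l == 0 {
                return nil // unset aggregate
            }
            return b[:l]
        }
        b = b[l:]
    }
    return nil
}

func main() {
    b := encode([5][]byte{[]byte("count"), nil, []byte("sum"), nil, nil})
    fmt.Printf("%q %v %q\n", decode(b, 0), decode(b, 1), decode(b, 2))
}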
@@ -635,7 +629,7 @@ func (c AggrChunk) Iterator() chunkenc.Iterator {
 }
 
 func (c AggrChunk) NumSamples() int {
-    x, err := c.nth(0)
+    x, err := c.Get(AggrCount)
     if err != nil {
         return 0
     }
@@ -653,46 +647,30 @@ func (c AggrChunk) Encoding() chunkenc.Encoding {
     return ChunkEncAggr
 }
 
-// nth returns the nth chunk present in the aggregated chunk.
-func (c AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
-    b := c[1:]
+// Get returns the sub-chunk for the given aggregate type if it exists.
+func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
+    b := c[:]
     var x []byte
 
-    for i := uint8(0); i <= n; i++ {
+    for i := AggrType(0); i <= t; i++ {
         l, n := binary.Uvarint(b)
         if n < 1 || len(b[n:]) < int(l)+1 {
             return nil, errors.New("invalid size")
         }
-        x = b[n : n+int(l)+1]
-        b = b[n+int(l)+1:]
+        b = b[n:]
+        // If length is set to zero explicitly, that means the aggregate is unset.
+        if l == 0 {
+            if i == t {
+                return nil, ErrAggrNotExist
+            }
+            continue
+        }
+        x = b[:int(l)+1]
+        b = b[int(l)+1:]
     }
     return chunkenc.FromData(chunkenc.Encoding(x[0]), x[1:])
 }
 
-// position returns at which position the chunk for the type is located.
-func (c AggrChunk) position(t AggrType) (ok bool, p uint8) {
-    mask := uint8(c[0])
-
-    if mask&(1<<(7-t)) == 0 {
-        return false, 0
-    }
-    // In the bit mask we count how many chunk types have their bit set before
-    // the type's own position.
-    for i := int(t) - 1; i >= 0; i-- {
-        p += (mask << uint(i)) >> 7
-    }
-    return true, p
-}
-
-// Get returns the sub-chunk for the given aggregate type if it exists.
-func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
-    ok, p := c.position(t)
-    if !ok {
-        return nil, ErrAggrNotExist
-    }
-    return c.nth(p)
-}
-
 // CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
 // values as counter reset.
 // Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
@@ -717,7 +695,10 @@ func (it *CounterSeriesIterator) Next() bool {
     }
     if ok := it.chks[it.i].Next(); !ok {
         it.i++
-        return it.Next()
+        // While iterators are ordered, they are not generally guaranteed to be
+        // non-overlapping. Ensure that the series does not go back in time by seeking at least
+        // to the next timestamp.
+        return it.Seek(it.lastT + 1)
     }
     t, v := it.chks[it.i].At()
 
@@ -742,9 +723,12 @@ func (it *CounterSeriesIterator) Next() bool {
         it.total++
         return true
     }
-    // We hit the last sample that indicates what the true last value was. For the
+    // We hit a sample that indicates what the true last value was. For the
     // next chunk we use it to determine whether there was a counter reset between them.
-    it.lastT, it.lastV = t, v
+    if t == it.lastT {
+        it.lastV = v
+    }
+    // Otherwise the series went back in time and we just keep moving forward.
 
     return it.Next()
 }
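Note: two fixes interact here. On chunk transitions the iterator now seeks past the last emitted timestamp because ordered chunks may still overlap, and a repeated timestamp only updates the remembered last value used for reset detection across chunks. The underlying counter-reset accumulation rule, as a standalone sketch over plain floats (an assumption about logic that sits in the unexpanded part of this diff):

package main

import "fmt"

// accumulate applies the counter-reset rule: a decrease is treated as a reset,
// so the new value is added in full; otherwise only the delta is added.
func accumulate(vals []float64) float64 {
    var total, last float64
    for _, v := range vals {
        if v >= last {
            total += v - last
        } else {
            total += v // counter reset
        }
        last = v
    }
    return total
}

func main() {
    // The counter resets between 30 and 4; the true increase is 30 + 9 = 39.
    fmt.Println(accumulate([]float64{10, 30, 4, 9})) // 39
}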
@@ -800,6 +784,7 @@ func (it *AverageChunkIterator) Next() bool {
     sumT, sumV := it.sumIt.At()
     if cntT != sumT {
         it.err = errors.New("sum and count timestamps not aligned")
+        return false
     }
     it.t, it.v = cntT, sumV/cntV
     return true
@@ -810,6 +795,12 @@ func (it *AverageChunkIterator) At() (int64, float64) {
 }
 
 func (it *AverageChunkIterator) Err() error {
+    if it.cntIt.Err() != nil {
+        return it.cntIt.Err()
+    }
+    if it.sumIt.Err() != nil {
+        return it.sumIt.Err()
+    }
     return it.err
 }
 
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample_test.go
@@ -125,7 +125,7 @@ type testSeries struct {
 }
 
 func encodeTestAggrSeries(v map[AggrType][]sample) (AggrChunk, int64, int64) {
-    b := newAggrChunkBuilder(false)
+    b := newAggrChunkBuilder()
 
     for at, d := range v {
         for _, s := range d {