Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

During 1h downsampling skip XOR chunks that may erroneously be present in 5m resolution blocks #5453

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased
- [#5453](https://github.com/thanos-io/thanos/pull/5453) Compact: Skip erroneous empty non `*AggrChunk` chunks during 1h downsampling of 5m resolution blocks.

### Fixed

Expand Down
10 changes: 9 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
package downsample

import (
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -153,7 +155,13 @@ func Downsample(
for _, c := range chks {
ac, ok := c.Chunk.(*AggrChunk)
if !ok {
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got %T instead for series: %d", c.Chunk, postings.At())
if c.Chunk.NumSamples() == 0 {
// Downsampled block can erroneously contain empty XOR chunks, skip those
// https://github.com/thanos-io/thanos/issues/5272
level.Warn(logger).Log("msg", fmt.Sprintf("expected downsampled chunk (*downsample.AggrChunk) got an empty %T instead for series: %d", c.Chunk, postings.At()))
continue
}
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got a non-empty %T instead for series: %d", c.Chunk, postings.At())
}
aggrChunks = append(aggrChunks, ac)
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,71 @@ func TestDownsample(t *testing.T) {
}
}

func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "downsample-mixed")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ser := &series{lset: labels.FromStrings("__name__", "a")}
aggr := map[AggrType][]sample{
AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}, {t: 1587691199999, v: 20}, {t: 1587691499999, v: 20}, {t: 1587691799999, v: 20}, {t: 1587692099999, v: 20}, {t: 1587692399999, v: 20}, {t: 1587692699999, v: 16}, {t: 1587692999999, v: 20}, {t: 1587693299999, v: 20}, {t: 1587693590791, v: 20}},
AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587690899999, v: 9.447457e+06}, {t: 1587691199999, v: 9.542732e+06}, {t: 1587691499999, v: 9.630379e+06}, {t: 1587691799999, v: 9.715631e+06}, {t: 1587692099999, v: 9.799808e+06}, {t: 1587692399999, v: 9.888117e+06}, {t: 1587692699999, v: 2.98928e+06}, {t: 1587692999999, v: 81592}, {t: 1587693299999, v: 163711}, {t: 1587693590791, v: 255746}},
AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}, {t: 1587691499999, v: 479625}, {t: 1587691799999, v: 483709}, {t: 1587692099999, v: 488036}, {t: 1587692399999, v: 492223}, {t: 1587692699999, v: 75}, {t: 1587692999999, v: 2261}, {t: 1587693299999, v: 6210}, {t: 1587693590791, v: 10464}},
AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 496544}, {t: 1587692999999, v: 6010}, {t: 1587693299999, v: 10242}, {t: 1587693590791, v: 14956}},
AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 498647}, {t: 1587692999999, v: 502554}, {t: 1587693299999, v: 506786}, {t: 1587693590791, v: 511500}, {t: 1587693590791, v: 14956}},
}
raw := chunkenc.NewXORChunk()
ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{
MinTime: math.MaxInt64,
MaxTime: math.MinInt64,
Chunk: raw,
})

mb := newMemBlock()
mb.addSeries(ser)

fakeMeta := &metadata.Meta{}
fakeMeta.Thanos.Downsample.Resolution = 300_000
id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000)
_ = id
testutil.Ok(t, err)
}

func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "downsample-mixed")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ser := &series{lset: labels.FromStrings("__name__", "a")}
aggr := map[AggrType][]sample{
AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}, {t: 1587691199999, v: 20}, {t: 1587691499999, v: 20}, {t: 1587691799999, v: 20}, {t: 1587692099999, v: 20}, {t: 1587692399999, v: 20}, {t: 1587692699999, v: 16}, {t: 1587692999999, v: 20}, {t: 1587693299999, v: 20}, {t: 1587693590791, v: 20}},
AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587690899999, v: 9.447457e+06}, {t: 1587691199999, v: 9.542732e+06}, {t: 1587691499999, v: 9.630379e+06}, {t: 1587691799999, v: 9.715631e+06}, {t: 1587692099999, v: 9.799808e+06}, {t: 1587692399999, v: 9.888117e+06}, {t: 1587692699999, v: 2.98928e+06}, {t: 1587692999999, v: 81592}, {t: 1587693299999, v: 163711}, {t: 1587693590791, v: 255746}},
AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}, {t: 1587691499999, v: 479625}, {t: 1587691799999, v: 483709}, {t: 1587692099999, v: 488036}, {t: 1587692399999, v: 492223}, {t: 1587692699999, v: 75}, {t: 1587692999999, v: 2261}, {t: 1587693299999, v: 6210}, {t: 1587693590791, v: 10464}},
AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 496544}, {t: 1587692999999, v: 6010}, {t: 1587693299999, v: 10242}, {t: 1587693590791, v: 14956}},
AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 498647}, {t: 1587692999999, v: 502554}, {t: 1587693299999, v: 506786}, {t: 1587693590791, v: 511500}, {t: 1587693590791, v: 14956}},
}
raw := chunkenc.NewXORChunk()
app, err := raw.Appender()
testutil.Ok(t, err)
app.Append(1587690005794, 42.5)
ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{
MinTime: math.MaxInt64,
MaxTime: math.MinInt64,
Chunk: raw,
})

mb := newMemBlock()
mb.addSeries(ser)

fakeMeta := &metadata.Meta{}
fakeMeta.Thanos.Downsample.Resolution = 300_000
id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000)
_ = id
testutil.NotOk(t, err)
}

func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series {
if len(inRaw) > 0 && len(inAggr) > 0 {
t.Fatalf("test must not have raw and aggregate input data at once")
Expand Down