
bucket verify: repair out of order labels #964

Merged
5 changes: 3 additions & 2 deletions cmd/thanos/bucket.go
@@ -85,10 +85,11 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
var backupBkt objstore.Bucket
if len(backupconfContentYaml) == 0 {
if *repair {
return errors.Wrap(err, "repair is specified, so backup client is required")
return errors.New("repair is specified, so backup client is required")
}
} else {
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name)
// nil Prometheus registerer: don't create conflicting metrics
Member:
This is not a good solution. It's essentially as easy as `prometheus.WrapRegistererWithPrefix("backup_..., reg)` (:

Member:
But also, not sure it matters, as these are only batch jobs; no one looks at the metrics ;p

Contributor Author:
Yeah, otherwise we register the same metrics twice.

Member:
No, you don't: it's essentially as easy as `prometheus.WrapRegistererWithPrefix("backup_..., reg)` (:

backupBkt, err = client.NewBucket(logger, backupconfContentYaml, nil, name)
if err != nil {
return err
}
62 changes: 45 additions & 17 deletions pkg/block/index.go
@@ -531,7 +531,7 @@ func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chun
// the current one.
if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime {
return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]",
last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime)
last.MinTime, last.MaxTime, curr.MinTime, curr.MaxTime)
Member:
Wow! That was super confusing indeed, thanks for spotting!

}
ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli)
cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli)
@@ -559,9 +559,14 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk
var last *chunks.Meta

OUTER:
for _, c := range chks {
// This compares the current chunk to the chunk from the last iteration
// by pointer. If we used "for i, c := range chks", the variable c would be
// a new variable whose address doesn't change through the entire loop;
// the current element of the chks slice is copied into it. We must take
// the address of the indexed slice element instead.
for i := range chks {
Member:
Why the change?

Contributor Author:
Very subtle. We are remembering a pointer to the chunk c from the last iteration to compare it against the chunk in the current iteration. However, when we use `for index, value := range slice`, the value is not a pointer into the slice. In fact, it's a new variable that the current item of the slice is copied into. Which means our pointer-based comparisons are broken: they always compare the current chunk to itself, since the address of the variable c doesn't change throughout the loop.

Using just a slice index here allows us to correctly store a pointer to the item of the slice from the last iteration and compare that to the chunk in the current iteration. Otherwise, this code was removing all chunks in each series other than the first one.

Member:
Let's document this :)

Contributor Author:
Where is the right place to do so? Glad to do it.

Contributor Author:
Added comments in the code. If that's not the best place, let me know.

Member:
Amazing, nice catch!

Contributor Author:
More like, why did the repair just lose all the data in my blocks?

Member:
Very nice catch; the comment makes it clear when we read this in 3 months again :) 👍

for _, ignoreChkFn := range ignoreChkFns {
ignore, err := ignoreChkFn(mint, maxt, last, &c)
ignore, err := ignoreChkFn(mint, maxt, last, &chks[i])
if err != nil {
return nil, errors.Wrap(err, "ignore function")
}
@@ -571,13 +576,18 @@ OUTER:
}
}

last = &c
repl = append(repl, c)
last = &chks[i]
repl = append(repl, chks[i])
}

return repl, nil
}

type seriesRepair struct {
Member:
Why not just use the series type, or name it series?

lset labels.Labels
chks []chunks.Meta
}

// rewrite writes all data from the readers back into the writers while cleaning
// up mis-ordered and duplicated chunks.
func rewrite(
@@ -605,17 +615,20 @@
postings = index.NewMemPostings()
values = map[string]stringset{}
i = uint64(0)
series = []seriesRepair{}
)

var lset labels.Labels
var chks []chunks.Meta

for all.Next() {
var lset labels.Labels
var chks []chunks.Meta
id := all.At()

if err := indexr.Series(id, &lset, &chks); err != nil {
return err
}
// Make sure labels are in sorted order.
sort.Sort(lset)

for i, c := range chks {
chks[i].Chunk, err = chunkr.Chunk(c.Ref)
if err != nil {
@@ -632,34 +645,49 @@
continue
}

if err := chunkw.WriteChunks(chks...); err != nil {
series = append(series, seriesRepair{
lset: lset,
chks: chks,
})
}

if all.Err() != nil {
return errors.Wrap(all.Err(), "iterate series")
}

// Sort the series, if labels are re-ordered then the ordering of series
// will be different.
sort.Slice(series, func(i, j int) bool {
return labels.Compare(series[i].lset, series[j].lset) < 0
})

// Build a new TSDB block.
for _, s := range series {
if err := chunkw.WriteChunks(s.chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, chks...); err != nil {
if err := indexw.AddSeries(i, s.lset, s.chks...); err != nil {
return errors.Wrap(err, "add series")
}

meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumChunks += uint64(len(s.chks))
meta.Stats.NumSeries++

for _, chk := range chks {
for _, chk := range s.chks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}

for _, l := range lset {
for _, l := range s.lset {
valset, ok := values[l.Name]
if !ok {
valset = stringset{}
values[l.Name] = valset
}
valset.set(l.Value)
}
postings.Add(i, lset)
postings.Add(i, s.lset)
i++
}
if all.Err() != nil {
return errors.Wrap(all.Err(), "iterate series")
}

s := make([]string, 0, 256)
for n, v := range values {
3 changes: 2 additions & 1 deletion pkg/verifier/overlapped_blocks.go
@@ -2,6 +2,8 @@ package verifier

import (
"context"
"sort"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -10,7 +12,6 @@
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"sort"
)

const OverlappedBlocksIssueID = "overlapped_blocks"
13 changes: 9 additions & 4 deletions pkg/verifier/safe_delete.go
@@ -2,9 +2,9 @@ package verifier

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -31,13 +31,18 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

dir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id))
tempdir, err := ioutil.TempDir("", "safe-delete")
if err != nil {
return err
}
dir := filepath.Join(tempdir, id.String())
err = os.Mkdir(dir, 0755)
if err != nil {
return err
}
defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Warn(logger).Log("msg", "failed to delete dir", "dir", dir, "err", err)
if err := os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err)
}
}()
