diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bbcacddbd..d560b51218 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ## Unreleased +### Changed + +- [#3705](https://github.com/thanos-io/thanos/pull/3705) Store: Fix race condition leading to failing queries or possibly incorrect query results. + ## [v0.18.0](https://github.com/thanos-io/thanos/releases) - Release in progress ### Added diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index dee6788187..ccbf2f1b49 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "sort" + "sync" "time" "unsafe" @@ -440,7 +441,8 @@ type BinaryReader struct { nameSymbols map[uint32]string // Direct cache of values. This is much faster than an LRU cache and still provides // a reasonable cache hit ratio. - valueSymbols [valueSymbolsCacheSize]struct { + valueSymbolsMx sync.Mutex + valueSymbols [valueSymbolsCacheSize]struct { index uint32 symbol string } @@ -646,12 +648,12 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { }, nil } -func (r BinaryReader) IndexVersion() (int, error) { +func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } // TODO(bwplotka): Get advantage of multi value offset fetch. -func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, error) { +func (r *BinaryReader) PostingsOffset(name string, value string) (index.Range, error) { rngs, err := r.postingsOffset(name, value) if err != nil { return index.Range{}, err @@ -674,7 +676,7 @@ func skipNAndName(d *encoding.Decbuf, buf *int) { } d.Skip(*buf) } -func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) { +func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) { rngs := make([]index.Range, 0, len(values)) if r.indexVersion == index.FormatV1 { e, ok := r.postingsV1[name] @@ -812,9 +814,13 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran func (r *BinaryReader) LookupSymbol(o uint32) (string, error) { cacheIndex := o % valueSymbolsCacheSize + r.valueSymbolsMx.Lock() if cached := r.valueSymbols[cacheIndex]; cached.index == o && cached.symbol != "" { - return cached.symbol, nil + v := cached.symbol + r.valueSymbolsMx.Unlock() + return v, nil } + r.valueSymbolsMx.Unlock() if s, ok := r.nameSymbols[o]; ok { return s, nil @@ -830,12 +836,16 @@ func (r *BinaryReader) LookupSymbol(o uint32) (string, error) { if err != nil { return s, err } + + r.valueSymbolsMx.Lock() r.valueSymbols[cacheIndex].index = o r.valueSymbols[cacheIndex].symbol = s + r.valueSymbolsMx.Unlock() + return s, nil } -func (r BinaryReader) LabelValues(name string) ([]string, error) { +func (r *BinaryReader) LabelValues(name string) ([]string, error) { if r.indexVersion == index.FormatV1 { e, ok := r.postingsV1[name] if !ok { @@ -891,7 +901,7 @@ func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } -func (r BinaryReader) LabelNames() ([]string, error) { +func (r *BinaryReader) LabelNames() ([]string, error) { allPostingsKeyName, _ := index.AllPostingsKey() labelNames := make([]string, 0, len(r.postings)) for name := range r.postings { diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 365cf452ac..56730d309f 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -5,10 +5,12 @@ package indexheader import ( "context" + "fmt" "io/ioutil" "math" "os" "path/filepath" + "strconv" "testing" "github.com/go-kit/kit/log" @@ -18,6 +20,7 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" @@ -368,6 +371,63 @@ func BenchmarkBinaryReader(t *testing.B) { } } +func BenchmarkBinaryReader_LookupSymbol(b *testing.B) { + for _, numSeries := range []int{valueSymbolsCacheSize, valueSymbolsCacheSize * 10} { + b.Run(fmt.Sprintf("num series = %d", numSeries), func(b *testing.B) { + benchmarkBinaryReaderLookupSymbol(b, numSeries) + }) + } +} + +func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) { + const postingOffsetsInMemSampling = 32 + + ctx := context.Background() + logger := log.NewNopLogger() + + tmpDir, err := ioutil.TempDir("", "benchmark-lookupsymbol") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(b, err) + defer func() { testutil.Ok(b, bkt.Close()) }() + + // Generate series labels. + seriesLabels := make([]labels.Labels, 0, numSeries) + for i := 0; i < numSeries; i++ { + seriesLabels = append(seriesLabels, labels.Labels{{Name: "a", Value: strconv.Itoa(i)}}) + } + + // Create a block. + id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(b, err) + testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()))) + + // Create an index reader. + reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling) + testutil.Ok(b, err) + + // Get the offset of each label value symbol. + symbolsOffsets := make([]uint32, numSeries) + for i := 0; i < numSeries; i++ { + o, err := reader.symbols.ReverseLookup(strconv.Itoa(i)) + testutil.Ok(b, err) + + symbolsOffsets[i] = o + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for i := 0; i < len(symbolsOffsets); i++ { + if _, err := reader.LookupSymbol(symbolsOffsets[i]); err != nil { + b.Fail() + } + } + } +} + func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { version := int(b.Range(4, 5)[0])