Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in BinaryReader.LookupSymbol() #3705

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

## Unreleased

### Changed

- [#3705](https://github.com/thanos-io/thanos/pull/3705) Store: Fix race condition leading to failing queries or possibly incorrect query results.

## [v0.18.0](https://github.com/thanos-io/thanos/releases) - Release in progress

### Added
Expand Down
24 changes: 17 additions & 7 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -440,7 +441,8 @@ type BinaryReader struct {
nameSymbols map[uint32]string
// Direct cache of values. This is much faster than an LRU cache and still provides
// a reasonable cache hit ratio.
valueSymbols [valueSymbolsCacheSize]struct {
valueSymbolsMx sync.Mutex
valueSymbols [valueSymbolsCacheSize]struct {
index uint32
symbol string
}
Expand Down Expand Up @@ -646,12 +648,12 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) {
}, nil
}

func (r BinaryReader) IndexVersion() (int, error) {
func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// TODO(bwplotka): Get advantage of multi value offset fetch.
func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, error) {
func (r *BinaryReader) PostingsOffset(name string, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
Expand All @@ -674,7 +676,7 @@ func skipNAndName(d *encoding.Decbuf, buf *int) {
}
d.Skip(*buf)
}
func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
rngs := make([]index.Range, 0, len(values))
if r.indexVersion == index.FormatV1 {
e, ok := r.postingsV1[name]
Expand Down Expand Up @@ -812,9 +814,13 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran

func (r *BinaryReader) LookupSymbol(o uint32) (string, error) {
cacheIndex := o % valueSymbolsCacheSize
r.valueSymbolsMx.Lock()
if cached := r.valueSymbols[cacheIndex]; cached.index == o && cached.symbol != "" {
return cached.symbol, nil
v := cached.symbol
r.valueSymbolsMx.Unlock()
return v, nil
}
r.valueSymbolsMx.Unlock()

if s, ok := r.nameSymbols[o]; ok {
return s, nil
Expand All @@ -830,12 +836,16 @@ func (r *BinaryReader) LookupSymbol(o uint32) (string, error) {
if err != nil {
return s, err
}

r.valueSymbolsMx.Lock()
r.valueSymbols[cacheIndex].index = o
r.valueSymbols[cacheIndex].symbol = s
r.valueSymbolsMx.Unlock()

return s, nil
}

func (r BinaryReader) LabelValues(name string) ([]string, error) {
func (r *BinaryReader) LabelValues(name string) ([]string, error) {
if r.indexVersion == index.FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
Expand Down Expand Up @@ -891,7 +901,7 @@ func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}

func (r BinaryReader) LabelNames() ([]string, error) {
func (r *BinaryReader) LabelNames() ([]string, error) {
allPostingsKeyName, _ := index.AllPostingsKey()
labelNames := make([]string, 0, len(r.postings))
for name := range r.postings {
Expand Down
60 changes: 60 additions & 0 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package indexheader

import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/go-kit/kit/log"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -368,6 +371,63 @@ func BenchmarkBinaryReader(t *testing.B) {
}
}

func BenchmarkBinaryReader_LookupSymbol(b *testing.B) {
for _, numSeries := range []int{valueSymbolsCacheSize, valueSymbolsCacheSize * 10} {
b.Run(fmt.Sprintf("num series = %d", numSeries), func(b *testing.B) {
benchmarkBinaryReaderLookupSymbol(b, numSeries)
})
}
}

func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) {
const postingOffsetsInMemSampling = 32

ctx := context.Background()
logger := log.NewNopLogger()

tmpDir, err := ioutil.TempDir("", "benchmark-lookupsymbol")
testutil.Ok(b, err)
defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(b, err)
defer func() { testutil.Ok(b, bkt.Close()) }()

// Generate series labels.
seriesLabels := make([]labels.Labels, 0, numSeries)
for i := 0; i < numSeries; i++ {
seriesLabels = append(seriesLabels, labels.Labels{{Name: "a", Value: strconv.Itoa(i)}})
}

// Create a block.
id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(b, err)
testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String())))

// Create an index reader.
reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling)
testutil.Ok(b, err)

// Get the offset of each label value symbol.
symbolsOffsets := make([]uint32, numSeries)
for i := 0; i < numSeries; i++ {
o, err := reader.symbols.ReverseLookup(strconv.Itoa(i))
testutil.Ok(b, err)

symbolsOffsets[i] = o
}

b.ResetTimer()

for n := 0; n < b.N; n++ {
for i := 0; i < len(symbolsOffsets); i++ {
if _, err := reader.LookupSymbol(symbolsOffsets[i]); err != nil {
b.Fail()
}
}
}
}

func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
version := int(b.Range(4, 5)[0])

Expand Down