Skip to content

Commit

Permalink
Optimized chunk comparison for overlaps.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed May 15, 2020
1 parent f6004ab commit 34d9706
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 25 deletions.
46 changes: 32 additions & 14 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
Expand Down Expand Up @@ -190,27 +191,18 @@ Outer:
break Outer
}

if chksA[a].MinTime < chksB[b].MinTime {
cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}

if chksA[a].MinTime > chksB[b].MinTime {
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// TODO(bwplotka): This is expensive.
//fmt.Println("check strings")
if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 {
// Exact duplicated chunks, discard one from b.
b++
continue
}

// Same min Time, but not duplicate, so it does not matter. Take b (since lower for loop).
s.chunks = append(s.chunks, chksB[b])
// Exact duplicated chunks, discard one from b.
b++
}
}
Expand Down Expand Up @@ -267,7 +259,6 @@ func (s *uniqueSeriesSet) Next() bool {
// We assume non-overlapping, sorted chunks. This is best effort only: if that does not hold,
// chunks will just be duplicated, which is handled well by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)

}

if s.peek == nil {
Expand All @@ -279,6 +270,33 @@ func (s *uniqueSeriesSet) Next() bool {
return true
}

// Compare orders m against b by MinTime first and by MaxTime second.
//
// It returns 1 when m sorts before b (smaller MinTime, or equal MinTime and
// smaller MaxTime) and -1 when m sorts after b. Note that this sign is the
// opposite of the usual Go comparator convention.
//
// It returns 0 only if both chunks are exactly the same according to
// proto.Equal. Chunks that share both timestamps but are not equal yield 1.
func (m AggrChunk) Compare(b AggrChunk) int {
	switch {
	case m.MinTime < b.MinTime:
		return 1
	case m.MinTime > b.MinTime:
		return -1
	// From here on the min times are equal, so max time decides.
	case m.MaxTime < b.MaxTime:
		return 1
	case m.MaxTime > b.MaxTime:
		return -1
	}

	// Same min and max time: fall back to a full message comparison.
	// TODO(bwplotka): Use cmp.Diff(tc.expected, actual, protocmp.Transform()); in newer protobuf API (https://blog.golang.org/protobuf-apiv2),
	// once we upgrade protobuf.
	if !proto.Equal(&m, &b) {
		return 1
	}
	return 0
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
func LabelsToPromLabels(lset []Label) labels.Labels {
ret := make(labels.Labels, len(lset))
Expand Down
39 changes: 28 additions & 11 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,17 +381,25 @@ func expandSeriesSet(t *testing.T, gotSS SeriesSet) (ret []rawSeries) {
}

// Test the cost of merging series sets for different number of merged sets and their size.
// This tests cases with large number of series, with same chunks. Since the subset are unique, this does not capture
// merging of partial or non-overlapping sets well.
func BenchmarkMergedSeriesSet(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
b.Run("overlapping chunks", func(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b), true)
})
b.Run("non-overlapping chunks", func(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b), false)
})
}

func TestMergedSeriesSet_Labels(b *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
func TestMergedSeriesSet_Labels(t *testing.T) {
t.Run("overlapping chunks", func(t *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(t), true)
})
t.Run("non-overlapping chunks", func(t *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(t), false)
})
}

func benchmarkMergedSeriesSet(b testutil.TB) {
func benchmarkMergedSeriesSet(b testutil.TB, overlappingChunks bool) {
var sel func(sets []SeriesSet) SeriesSet
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
Expand All @@ -418,19 +426,28 @@ func benchmarkMergedSeriesSet(b testutil.TB) {

sort.Sort(labels.Slice(lbls))

in := make([][]rawSeries, j)

blocks := make([][]rawSeries, j)
for _, l := range lbls {
for j := range in {
in[j] = append(in[j], rawSeries{lset: l, chunks: chunks})
for j := range blocks {
if overlappingChunks {
blocks[j] = append(blocks[j], rawSeries{lset: l, chunks: chunks})
continue
}
blocks[j] = append(blocks[j], rawSeries{
lset: l,
chunks: [][]sample{
{{int64(4*j) + 1, 1}, {int64(4*j) + 2, 2}},
{{int64(4*j) + 3, 3}, {int64(4*j) + 4, 4}},
},
})
}
}

b.ResetTimer()

for i := 0; i < b.N(); i++ {
var sets []SeriesSet
for _, s := range in {
for _, s := range blocks {
sets = append(sets, newListSeriesSet(b, s))
}
ms := sel(sets)
Expand Down

0 comments on commit 34d9706

Please sign in to comment.