store/proxy: Deduplicate chunks on StoreAPI level. Recommend chunk sorting for StoreAPI. #2603

Merged: 3 commits, May 18, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also
moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag, not argument
anymore.
- [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exact overlapping chunks (e.g. Store GW and sidecar, or brute-force Store Gateway HA).

## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30

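To make the changelog entry concrete: when two StoreAPIs (e.g. Store GW and a sidecar over the same data) return byte-identical chunks for a series, the proxy now drops the copies at merge time. Below is a minimal, self-contained sketch of that idea; `chunkRef` and `dedupSorted` are hypothetical stand-ins for illustration, not code from this PR.

```go
package main

import "fmt"

// chunkRef is a hypothetical stand-in for storepb.AggrChunk, keeping only
// the fields that matter for exact-duplicate detection.
type chunkRef struct {
	MinTime, MaxTime int64
	Data             string
}

// dedupSorted keeps one copy of each exact duplicate in a slice that is
// already sorted by (MinTime, MaxTime, Data).
func dedupSorted(chks []chunkRef) []chunkRef {
	out := make([]chunkRef, 0, len(chks))
	for _, c := range chks {
		if len(out) > 0 && c == out[len(out)-1] {
			continue // Exact duplicate, e.g. the same chunk from Store GW and sidecar.
		}
		out = append(out, c)
	}
	return out
}

func main() {
	merged := []chunkRef{{0, 100, "abc"}, {0, 100, "abc"}, {100, 200, "def"}}
	fmt.Println(dedupSorted(merged)) // [{0 100 abc} {100 200 def}]
}
```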
5 changes: 2 additions & 3 deletions pkg/store/bucket.go
@@ -981,9 +981,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()

// Merge series set into a union of all block sets. This exposes all blocks as a single seriesSet.
// Chunks of returned series might be out of order w.r.t. their time range.
// This must be accounted for later by clients.
// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
// the blockSeries method. In the worst case, the deduplication logic won't deduplicate correctly, which will be accounted for later.
set := storepb.MergeSeriesSets(res...)
for set.Next() {
var series storepb.Series
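The NOTE above leans on `blockSeries` (and StoreAPI implementations in general) returning chunks sorted by min time, which is what this PR recommends. As a hedged sketch of that ordering, not code from this PR, a StoreAPI implementation could sort like this (`sortChunksByRange` is a hypothetical helper):

```go
package store

import (
	"sort"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// sortChunksByRange shows the ordering the deduplication logic relies on:
// chunks sorted by min time, then by max time.
func sortChunksByRange(chks []storepb.AggrChunk) {
	sort.Slice(chks, func(i, j int) bool {
		if chks[i].MinTime != chks[j].MinTime {
			return chks[i].MinTime < chks[j].MinTime
		}
		return chks[i].MaxTime < chks[j].MaxTime
	})
}
```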
1 change: 1 addition & 0 deletions pkg/store/proxy.go
@@ -473,6 +473,7 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
}
return s.currSeries.Labels, s.currSeries.Chunks
}

func (s *streamSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
12 changes: 4 additions & 8 deletions pkg/store/proxy_test.go
@@ -294,15 +294,11 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{5, 4}}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}, {{5, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge.
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{2, 2}, {3, 3}, {4, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "c"}},
@@ -343,7 +339,7 @@
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}},
chunks: [][]sample{{{1, 11}, {2, 22}, {3, 33}}, {{1, 1}, {2, 2}, {3, 3}}},
},
},
},
@@ -1220,7 +1216,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got: %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)

for i, series := range got {
testutil.Equals(t, expected[i].lset, series.Labels)
201 changes: 176 additions & 25 deletions pkg/store/storepb/custom.go
@@ -4,6 +4,7 @@
package storepb

import (
"bytes"
"strings"
"unsafe"

@@ -45,6 +46,7 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}

// CompareLabels compares two sets of labels.
// If all compared labels are equal, the set with fewer labels comes first.
func CompareLabels(a, b []Label) int {
l := len(a)
if len(b) < l {
@@ -58,7 +60,7 @@
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.

return len(a) - len(b)
}

@@ -73,13 +75,25 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}

// MergeSeriesSets returns a new series set that is the union of the input sets.
// MergeSeriesSets takes all series sets and returns them as a single, unioned series set.
// It assumes series are sorted by labels within a single SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such a case, if a single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes, in a "best effort" way, that chunks are sorted by min time. This is done as an optimization only, so if input
// series' chunks are NOT sorted, the only consequence is that duplicates might not be correctly removed. Deduplication is double-checked
// just before the PromQL level as well, so the only consequence is increased network bandwidth.
// If all chunks were sorted, MergeSeriesSets ALSO returns chunks sorted by min time.
//
// Chunks within the same series can also overlap (within all SeriesSets
// as well as within a single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlaps will be appended into one chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
return emptySeriesSet{}
case 1:
return all[0]
return newUniqueSeriesSet(all[0])
}
h := len(all) / 2

@@ -106,11 +120,6 @@ type mergedSeriesSet struct {
adone, bdone bool
}

// newMergedSeriesSet takes two series sets as a single series set.
// Series that occur in both sets should have disjoint time ranges.
// If the ranges overlap b samples are appended to a samples.
// If the single SeriesSet returns same series within many iterations,
// merge series set will not try to merge those.
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
@@ -150,33 +159,175 @@ func (s *mergedSeriesSet) Next() bool {
}

d := s.compare()

// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
return true
}
if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
} else {
// Concatenate chunks from both series sets. They may be out of order
// w.r.t. their time range. This must be accounted for later.
lset, chksA := s.a.At()
_, chksB := s.b.At()

s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side and create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
s.chunks = append(s.chunks, chksA...)
s.chunks = append(s.chunks, chksB...)
return true
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
// Both a and b contain the same series. Go through all chunks, remove duplicates, and concatenate chunks from both
// series sets. We best-effort assume chunks are sorted by min time. If not, we will not detect all duplicates, which will
// be accounted for on the select layer anyway. We still do it as an early optimization.
lset, chksA := s.a.At()
_, chksB := s.b.At()
s.lset = lset

// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side and create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))

b := 0
Outer:
for a := range chksA {
for {
if b >= len(chksB) {
// No more b chunks.
s.chunks = append(s.chunks, chksA[a:]...)
break Outer
}

cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// Exact duplicate chunks, discard one from b.
b++
}
}

if b < len(chksB) {
s.chunks = append(s.chunks, chksB[b:]...)
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return true
}
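A minimal usage sketch of the merge above, assuming `storepb.SeriesSet` exposes the `Next`/`At`/`Err` methods seen elsewhere in this diff; `sliceSeriesSet` and `exampleMerge` are hypothetical illustration-only names, not part of this PR:

```go
package store

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// sliceSeriesSet is a hypothetical in-memory SeriesSet used only for illustration.
type sliceSeriesSet struct {
	series []storepb.Series
	i      int
}

func (s *sliceSeriesSet) Next() bool { s.i++; return s.i <= len(s.series) }
func (s *sliceSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
	return s.series[s.i-1].Labels, s.series[s.i-1].Chunks
}
func (s *sliceSeriesSet) Err() error { return nil }

func exampleMerge() {
	same := storepb.Series{
		Labels: []storepb.Label{{Name: "a", Value: "1"}},
		Chunks: []storepb.AggrChunk{{MinTime: 0, MaxTime: 10}},
	}
	// Two StoreAPIs (e.g. Store GW and sidecar) return the exact same series.
	set := storepb.MergeSeriesSets(
		&sliceSeriesSet{series: []storepb.Series{same}},
		&sliceSeriesSet{series: []storepb.Series{same}},
	)
	for set.Next() {
		lset, chks := set.At()
		_ = lset               // {a="1"}, yielded once.
		fmt.Println(len(chks)) // 1 -- the exact duplicate chunk was dropped.
	}
}
```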

// uniqueSeriesSet takes one series set and ensures each iteration contains a single, full series.
type uniqueSeriesSet struct {
SeriesSet
done bool

peek *Series

lset []Label
chunks []AggrChunk
}

func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet {
return &uniqueSeriesSet{SeriesSet: wrapped}
}

func (s *uniqueSeriesSet) At() ([]Label, []AggrChunk) {
return s.lset, s.chunks
}

func (s *uniqueSeriesSet) Next() bool {
if s.Err() != nil {
return false
}

for !s.done {
if s.done = !s.SeriesSet.Next(); s.done {
break
}
lset, chks := s.SeriesSet.At()
if s.peek == nil {
s.peek = &Series{Labels: lset, Chunks: chks}
continue
}

if CompareLabels(lset, s.peek.Labels) != 0 {
s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
s.peek = &Series{Labels: lset, Chunks: chks}
return true
}

// We assume non-overlapping, sorted chunks. This is best effort only; if it's otherwise, chunks
// will just be duplicated, which is handled well by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)
}

if s.peek == nil {
return false
}

s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
s.peek = nil
return true
}
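Continuing the hypothetical `sliceSeriesSet` sketch from above, this shows how a *partial* response, one series split across two iterations of a single SeriesSet, comes out unified (`examplePartial` is an illustration-only name):

```go
func examplePartial() {
	lset := []storepb.Label{{Name: "a", Value: "1"}}
	in := &sliceSeriesSet{series: []storepb.Series{
		{Labels: lset, Chunks: []storepb.AggrChunk{{MinTime: 0, MaxTime: 10}}},
		{Labels: lset, Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 20}}},
	}}
	set := storepb.MergeSeriesSets(in) // A single input is wrapped in newUniqueSeriesSet.
	for set.Next() {
		_, chks := set.At()
		fmt.Println(len(chks)) // 2 -- one series, chunks from both iterations appended.
	}
}
```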

// Compare returns 1 if chunk m is smaller than b, and -1 if it is larger, ordered by min time, then max time.
// It returns 0 if chunks are exactly the same.
func (m AggrChunk) Compare(b AggrChunk) int {
if m.MinTime < b.MinTime {
return 1
}
if m.MinTime > b.MinTime {
return -1
}

// Same min time.
if m.MaxTime < b.MaxTime {
return 1
}
if m.MaxTime > b.MaxTime {
return -1
}

// We could use proto.Equal, but we need ordering as well.
for _, cmp := range []func() int{
func() int { return m.Raw.Compare(b.Raw) },
func() int { return m.Count.Compare(b.Count) },
func() int { return m.Sum.Compare(b.Sum) },
func() int { return m.Min.Compare(b.Min) },
func() int { return m.Max.Compare(b.Max) },
func() int { return m.Counter.Compare(b.Counter) },
} {
if c := cmp(); c != 0 {
return c
}
}
return 0
}
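Note the sign convention is inverted relative to, e.g., `bytes.Compare`: 1 means the receiver sorts first. A short sketch using only fields visible in this diff (`exampleCompare` is a hypothetical name, not part of this PR):

```go
package store

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// exampleCompare illustrates the inverted sign convention:
// 1 means the receiver is "smaller" and sorts first.
func exampleCompare() {
	a := storepb.AggrChunk{MinTime: 0, MaxTime: 10}
	b := storepb.AggrChunk{MinTime: 0, MaxTime: 10}
	c := storepb.AggrChunk{MinTime: 5, MaxTime: 10}

	fmt.Println(a.Compare(b)) // 0 -- exact duplicate; mergedSeriesSet keeps only one.
	fmt.Println(a.Compare(c)) // 1 -- a has the smaller min time, so a sorts first.
	fmt.Println(c.Compare(a)) // -1 -- c is larger, so it sorts after a.
}
```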

// Compare returns 1 if chunk m is smaller than b, and -1 if it is larger.
// It returns 0 if chunks are exactly the same.
func (m *Chunk) Compare(b *Chunk) int {
if m == nil && b == nil {
return 0
}
if b == nil {
return 1
}
if m == nil {
return -1
}

if m.Type < b.Type {
return 1
}
if m.Type > b.Type {
return -1
}
return bytes.Compare(m.Data, b.Data)
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in a type-safe manner.
func LabelsToPromLabels(lset []Label) labels.Labels {
ret := make(labels.Labels, len(lset))