Commit c1cd67d
Use sync pool to reuse sharding buffers
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Jul 19, 2022
1 parent db07a4e commit c1cd67d
Showing 7 changed files with 67 additions and 27 deletions.
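
In short: each store now owns a sync.Pool of byte buffers, ShardInfo.Matcher borrows a buffer from it, and the new ShardMatcher.Close returns it, replacing a fresh 10 KB allocation per matcher. A minimal sketch of the pattern, not the actual Thanos code: the store type is hypothetical, and initialBufSize stands in for the package constant the diff references (its value is assumed here).

	package main

	import (
		"fmt"
		"sync"
	)

	// initialBufSize mirrors the package-level constant the diff uses;
	// the value here is an assumption.
	const initialBufSize = 10 * 1024

	// store is a hypothetical stand-in for BucketStore, ProxyStore, etc.
	type store struct {
		buffers sync.Pool // one pool per store, shared by all requests
	}

	func newStore() *store {
		return &store{buffers: sync.Pool{New: func() interface{} {
			b := make([]byte, 0, initialBufSize)
			return &b // pool *[]byte, not []byte (see the shard_info.go note below)
		}}}
	}

	func (s *store) handleRequest() {
		buf := s.buffers.Get().(*[]byte) // what ShardInfo.Matcher now does
		defer s.buffers.Put(buf)         // what ShardMatcher.Close now does
		*buf = append((*buf)[:0], "labels"...) // reset length, reuse capacity
		fmt.Println(len(*buf), cap(*buf))
	}

	func main() {
		newStore().handleRequest()
	}
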
2 changes: 1 addition & 1 deletion pkg/queryfrontend/shard_query.go

@@ -8,8 +8,8 @@ package queryfrontend

 import (
 	"context"
+	"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"

-	"github.com/cortexproject/cortex/pkg/querier/queryrange"
 	"github.com/thanos-io/thanos/pkg/querysharding"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 )
19 changes: 13 additions & 6 deletions pkg/store/bucket.go

@@ -279,6 +279,7 @@ type BucketStore struct {
 	dir             string
 	indexCache      storecache.IndexCache
 	indexReaderPool *indexheader.ReaderPool
+	buffers         sync.Pool
 	chunkPool       pool.Bytes

 	// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
@@ -401,11 +402,15 @@ func NewBucketStore(
 	options ...BucketStoreOption,
 ) (*BucketStore, error) {
 	s := &BucketStore{
-		logger:          log.NewNopLogger(),
-		bkt:             bkt,
-		fetcher:         fetcher,
-		dir:             dir,
-		indexCache:      noopCache{},
+		logger:     log.NewNopLogger(),
+		bkt:        bkt,
+		fetcher:    fetcher,
+		dir:        dir,
+		indexCache: noopCache{},
+		buffers: sync.Pool{New: func() interface{} {
+			b := make([]byte, 0, initialBufSize)
+			return &b
+		}},
 		chunkPool:       pool.NoopBytes{},
 		blocks:          map[ulid.ULID]*bucketBlock{},
 		blockSets:       map[uint64]*bucketBlockSet{},
@@ -1067,6 +1072,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	})
 	defer span.Finish()

+	shardMatcher := req.ShardInfo.Matcher(&s.buffers)
+	defer shardMatcher.Close()
 	part, pstats, err := blockSeries(
 		newCtx,
 		b.extLset,
@@ -1078,7 +1085,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		req.SkipChunks,
 		req.MinTime, req.MaxTime,
 		req.Aggregates,
-		req.ShardInfo.Matcher(),
+		shardMatcher,
 	)
 	if err != nil {
 		return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
4 changes: 3 additions & 1 deletion pkg/store/prometheus.go

@@ -195,7 +195,9 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
 		return nil
 	}

-	shardMatcher := r.ShardInfo.Matcher()
+	shardMatcher := r.ShardInfo.Matcher(&p.buffers)
+	defer shardMatcher.Close()
+
 	if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() {
 		return p.queryPrometheus(s, r)
 	}
18 changes: 13 additions & 5 deletions pkg/store/proxy.go

@@ -62,6 +62,7 @@ type ProxyStore struct {
 	stores         func() []Client
 	component      component.StoreAPI
 	selectorLabels labels.Labels
+	buffers        sync.Pool

 	responseTimeout time.Duration
 	metrics         *proxyStoreMetrics
@@ -104,10 +105,14 @@ func NewProxyStore(

 	metrics := newProxyStoreMetrics(reg)
 	s := &ProxyStore{
-		logger:          logger,
-		stores:          stores,
-		component:       component,
-		selectorLabels:  selectorLabels,
+		logger:         logger,
+		stores:         stores,
+		component:      component,
+		selectorLabels: selectorLabels,
+		buffers: sync.Pool{New: func() interface{} {
+			b := make([]byte, 0, initialBufSize)
+			return &b
+		}},
 		responseTimeout: responseTimeout,
 		metrics:         metrics,
 	}
@@ -344,6 +349,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 				s.responseTimeout,
 				s.metrics.emptyStreamResponses,
 				st.SupportsSharding(),
+				&s.buffers,
 				r.ShardInfo,
 			))
 		}
@@ -441,6 +447,7 @@ func startStreamSeriesSet(
 	responseTimeout time.Duration,
 	emptyStreamResponses prometheus.Counter,
 	storeSupportsSharding bool,
+	buffers *sync.Pool,
 	shardInfo *storepb.ShardInfo,
 ) *streamSeriesSet {
 	s := &streamSeriesSet{
@@ -491,7 +498,8 @@ func startStreamSeriesSet(
 		}
 	}()

-	shardMatcher := shardInfo.Matcher()
+	shardMatcher := shardInfo.Matcher(buffers)
+	defer shardMatcher.Close()
 	applySharding := shardInfo != nil && !storeSupportsSharding
 	if applySharding {
 		msg := "Applying series sharding in the proxy since there is not support in the underlying store"
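
Note that startStreamSeriesSet receives *sync.Pool rather than sync.Pool: a Pool must not be copied after first use (go vet's copylocks check flags it), so passing a pointer keeps every stream sharing the one pool on the ProxyStore. A sketch of the same shape, with a hypothetical helper in place of startStreamSeriesSet:

	package main

	import "sync"

	// borrow mimics how the new parameter is threaded through:
	// the pool travels by pointer, so the Pool value itself is never copied.
	func borrow(buffers *sync.Pool) *[]byte {
		return buffers.Get().(*[]byte)
	}

	func main() {
		pool := sync.Pool{New: func() interface{} {
			b := make([]byte, 0, 64)
			return &b
		}}
		buf := borrow(&pool)
		*buf = append((*buf)[:0], "shard-key"...)
		pool.Put(buf) // return the buffer once the stream is done with it
	}
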
32 changes: 20 additions & 12 deletions pkg/store/storepb/shard_info.go

@@ -4,7 +4,8 @@
 package storepb

 import (
-	"github.com/alecthomas/units"
+	"sync"
+
 	"github.com/cespare/xxhash/v2"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -13,11 +14,11 @@ import (
 var sep = []byte{'\xff'}

 type ShardMatcher struct {
-	buf              []byte
+	buf     *[]byte
+	buffers *sync.Pool
 	shardingLabelset map[string]struct{}

-	isSharded bool
-
+	isSharded   bool
 	by          bool
 	totalShards int64
 	shardIndex  int64
@@ -27,28 +28,34 @@ func (s *ShardMatcher) IsSharded() bool {
 	return s.isSharded
 }

+func (s *ShardMatcher) Close() {
+	if s.buffers != nil {
+		s.buffers.Put(s.buf)
+	}
+}
+
 func (s *ShardMatcher) MatchesZLabels(zLabels []labelpb.ZLabel) bool {
 	// Match all series when query is not sharded
 	if s == nil || !s.isSharded {
 		return true
 	}

-	s.buf = s.buf[:0]
+	*s.buf = (*s.buf)[:0]
 	for _, lbl := range zLabels {
 		// Exclude metric name and le label from sharding
 		if lbl.Name == "__name__" || lbl.Name == "le" {
 			continue
 		}

 		if shardByLabel(s.shardingLabelset, lbl, s.by) {
-			s.buf = append(s.buf, lbl.Name...)
-			s.buf = append(s.buf, sep[0])
-			s.buf = append(s.buf, lbl.Value...)
-			s.buf = append(s.buf, sep[0])
+			*s.buf = append(*s.buf, lbl.Name...)
+			*s.buf = append(*s.buf, sep[0])
+			*s.buf = append(*s.buf, lbl.Value...)
+			*s.buf = append(*s.buf, sep[0])
		}
	}

-	hash := xxhash.Sum64(s.buf)
+	hash := xxhash.Sum64(*s.buf)
 	return hash%uint64(s.totalShards) == uint64(s.shardIndex)
 }

@@ -70,7 +77,7 @@ func shardByLabel(labelSet map[string]struct{}, zlabel labelpb.ZLabel, groupingB
 	return false
 }

-func (m *ShardInfo) Matcher() *ShardMatcher {
+func (m *ShardInfo) Matcher(buffers *sync.Pool) *ShardMatcher {
 	if m == nil || m.TotalShards < 1 {
 		return &ShardMatcher{
 			isSharded: false,
@@ -79,7 +86,8 @@ func (m *ShardInfo) Matcher() *ShardMatcher {

 	return &ShardMatcher{
 		isSharded:        true,
-		buf:              make([]byte, 10*units.Kilobyte),
+		buf:              buffers.Get().(*[]byte),
+		buffers:          buffers,
 		shardingLabelset: m.labelSet(),
 		by:               m.By,
 		totalShards:      m.TotalShards,
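
The switch from buf []byte to buf *[]byte is the usual sync.Pool idiom: Put takes an interface{}, and boxing a slice header into an interface allocates on every call, which would undercut the pool (staticcheck reports this as SA6002), while boxing a pointer is allocation-free once the pool is warm. A small check, assuming nothing beyond the standard library:

	package main

	import (
		"fmt"
		"sync"
		"testing"
	)

	func main() {
		pool := sync.Pool{New: func() interface{} {
			b := make([]byte, 0, 1024)
			return &b
		}}

		// With *[]byte, a warm Get/Put round trip should average ~0 allocations.
		allocs := testing.AllocsPerRun(1000, func() {
			buf := pool.Get().(*[]byte)
			*buf = append((*buf)[:0], 'x')
			pool.Put(buf)
		})
		fmt.Println("allocs per round trip:", allocs)
	}
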
10 changes: 9 additions & 1 deletion pkg/store/storepb/shard_info_test.go

@@ -4,8 +4,11 @@
 package storepb

 import (
+	"sync"
 	"testing"

+	"github.com/alecthomas/units"
+
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
 )
@@ -107,9 +110,14 @@ func TestShardInfo_MatchesSeries(t *testing.T) {
 		},
 	}

+	buffers := sync.Pool{New: func() interface{} {
+		b := make([]byte, 0, 10*units.Kilobyte)
+		return &b
+	}}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			matcher := test.shardInfo.Matcher()
+			matcher := test.shardInfo.Matcher(&buffers)
+			defer matcher.Close()
 			isMatch := matcher.MatchesZLabels(test.series)
 			if isMatch != test.matches {
 				t.Fatalf("invalid result, got %t, want %t", isMatch, test.matches)
9 changes: 8 additions & 1 deletion pkg/store/tsdb.go

@@ -8,6 +8,7 @@ import (
 	"io"
 	"math"
 	"sort"
+	"sync"

 	"github.com/go-kit/log"
 	"github.com/pkg/errors"
@@ -38,6 +39,7 @@ type TSDBStore struct {
 	db               TSDBReader
 	component        component.StoreAPI
 	extLset          labels.Labels
+	buffers          sync.Pool
 	maxBytesPerFrame int
 }

@@ -65,6 +67,10 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
 		component:        component,
 		extLset:          extLset,
 		maxBytesPerFrame: RemoteReadFrameLimit,
+		buffers: sync.Pool{New: func() interface{} {
+			b := make([]byte, 0, initialBufSize)
+			return &b
+		}},
 	}
 }

@@ -152,7 +158,8 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer

 	set := q.Select(false, nil, matchers...)

-	shardMatcher := r.ShardInfo.Matcher()
+	shardMatcher := r.ShardInfo.Matcher(&s.buffers)
+	defer shardMatcher.Close()
 	// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
 	for set.Next() {
 		series := set.At()

0 comments on commit c1cd67d