Regex set optimization when querying for chunks in index (#2446)
* Moved seriesStore lookupSeriesByMetricNameMatcher to store.lookupChunkIdsByMetricNameMatcher.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Implement optimization for set regex.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added test for regex set optimization.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added CHANGELOG.md

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix race errors.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix compilation errors after rebase.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Rename method to lookupIdsByMetricNameMatcher

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pstibrany committed Apr 15, 2020
1 parent 4207e78 commit 9051641
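
The optimization itself is straightforward: when a regex matcher's value is a plain alternation of literals (for example bar=~"beep|baz"), the store can issue one cheap equality index query per literal instead of reading every value of the label and filtering afterwards. Below is a minimal, self-contained sketch of that expansion; the helper setMatches and its simplifications are illustrative only, while the commit's actual implementation relies on findSetMatches and the schema's GetReadQueriesForMetricLabelValue, shown in the diff of pkg/chunk/chunk_store.go further down.

package main

import (
	"fmt"
	"strings"
)

// setMatches returns the literal alternatives of a regex value such as
// "beep|baz", or nil when the value is not a simple alternation of
// literals. (Assumption: anchors, escapes and empty alternatives are
// ignored for brevity.)
func setMatches(regex string) []string {
	if regex == "" || strings.ContainsAny(regex, `.+*?()[]{}^$\`) {
		return nil
	}
	return strings.Split(regex, "|")
}

func main() {
	labelName, value := "bar", "beep|baz"

	if vals := setMatches(value); vals != nil {
		// Optimized path: one equality index query per literal value.
		for _, v := range vals {
			fmt.Printf("equality index query: %s=%q\n", labelName, v)
		}
		return
	}
	// Fallback path: query all values of the label and filter by regex afterwards.
	fmt.Printf("label-wide index query: %s=~%q\n", labelName, value)
}

With the mock schema used in the new test below, the selector foo{bar=~"beep|baz"} therefore records the equality queries foo{bar="baz"} and foo{bar="beep"} rather than the label-wide query foo{bar}.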
Showing 7 changed files with 368 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@
* [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-<prefix>.fifocache.size` CLI flag has been renamed to `-<prefix>.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-<prefix>.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319
* [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437
* [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454
* [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
* [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
85 changes: 58 additions & 27 deletions pkg/chunk/chunk_store.go
@@ -396,36 +396,12 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
incomingErrors := make(chan error)
for _, matcher := range matchers {
go func(matcher *labels.Matcher) {
// Lookup IndexQuery's
var queries []IndexQuery
var err error
if matcher.Type != labels.MatchEqual {
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name)
} else {
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value)
}
chunkIDs, err := c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, nil)
if err != nil {
incomingErrors <- err
return
}
level.Debug(log).Log("matcher", matcher, "queries", len(queries))

// Lookup IndexEntry's
entries, err := c.lookupEntriesByQueries(ctx, queries)
if err != nil {
incomingErrors <- err
return
}
level.Debug(log).Log("matcher", matcher, "entries", len(entries))

// Convert IndexEntry's to chunk IDs, filter out non-matchers at the same time.
chunkIDs, err := c.parseIndexEntries(ctx, entries, matcher)
if err != nil {
incomingErrors <- err
return
} else {
incomingChunkIDs <- chunkIDs
}
level.Debug(log).Log("matcher", matcher, "chunkIDs", len(chunkIDs))
incomingChunkIDs <- chunkIDs
}(matcher)
}

@@ -453,6 +429,61 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
return c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
}

func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]IndexQuery) []IndexQuery) ([]string, error) {
log, ctx := spanlogger.New(ctx, "Store.lookupIdsByMetricNameMatcher", "metricName", metricName, "matcher", matcher)
defer log.Span.Finish()

var err error
var queries []IndexQuery
var labelName string
if matcher == nil {
queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, metricName)
} else if matcher.Type == labels.MatchEqual {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value)
} else if matcher.Type == labels.MatchRegexp && len(findSetMatches(matcher.Value)) > 0 {
set := findSetMatches(matcher.Value)
for _, v := range set {
var qs []IndexQuery
qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v)
if err != nil {
break
}
queries = append(queries, qs...)
}
} else {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name)
}
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "queries", len(queries))

if filter != nil {
queries = filter(queries)
level.Debug(log).Log("matcher", matcher, "filteredQueries", len(queries))
}

entries, err := c.lookupEntriesByQueries(ctx, queries)
if e, ok := err.(CardinalityExceededError); ok {
e.MetricName = metricName
e.LabelName = labelName
return nil, e
} else if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "entries", len(entries))

ids, err := c.parseIndexEntries(ctx, entries, matcher)
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", matcher, "ids", len(ids))

return ids, nil
}

func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()
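One detail of the new lookupIdsByMetricNameMatcher signature above is the optional filter func([]IndexQuery) []IndexQuery parameter: after the index queries are generated (including the per-value queries for a set regex), the caller may prune them before they are executed. The chunk-store path shown above passes nil; callers in the files whose diffs are not loaded here can supply a real filter, for example to drop queries that do not belong to a requested shard. A hedged sketch of that control flow, using a stand-in IndexQuery type rather than the real one:

package main

import "fmt"

// IndexQuery is a stand-in for the chunk store's real IndexQuery type;
// only the fields needed for the illustration are included.
type IndexQuery struct {
	TableName string
	HashValue string
}

// lookupWithFilter mirrors the relevant control flow of
// lookupIdsByMetricNameMatcher: build the index queries, let an optional
// callback prune them, then execute (here: simply return) what remains.
func lookupWithFilter(queries []IndexQuery, filter func([]IndexQuery) []IndexQuery) []IndexQuery {
	if filter != nil {
		queries = filter(queries)
	}
	return queries
}

func main() {
	queries := []IndexQuery{
		{TableName: "index_1", HashValue: "foo:bar:beep"},
		{TableName: "index_1", HashValue: "foo:bar:baz"},
	}

	// Illustrative filter: keep only the queries for one hash value.
	onlyBeep := func(qs []IndexQuery) []IndexQuery {
		var out []IndexQuery
		for _, q := range qs {
			if q.HashValue == "foo:bar:beep" {
				out = append(out, q)
			}
		}
		return out
	}

	fmt.Println(lookupWithFilter(queries, onlyBeep)) // only the "beep" query survives
	fmt.Println(lookupWithFilter(queries, nil))      // a nil filter leaves the queries untouched
}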
196 changes: 186 additions & 10 deletions pkg/chunk/chunk_store_test.go
@@ -5,6 +5,8 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
"testing"
"time"

@@ -19,6 +21,7 @@

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)
@@ -62,10 +65,16 @@ func newTestChunkStore(t require.TestingT, schemaName string) Store {
}

func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg StoreConfig) Store {
var (
tbmConfig TableManagerConfig
schemaCfg = DefaultSchemaConfig("", schemaName, 0)
)
schemaCfg := DefaultSchemaConfig("", schemaName, 0)

schema, err := schemaCfg.Configs[0].CreateSchema()
require.NoError(t, err)

return newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schema, storeCfg)
}

func newTestChunkStoreConfigWithMockStorage(t require.TestingT, schemaCfg SchemaConfig, schema BaseSchema, storeCfg StoreConfig) Store {
var tbmConfig TableManagerConfig
err := schemaCfg.Validate()
require.NoError(t, err)
flagext.DefaultValues(&tbmConfig)
@@ -88,7 +97,7 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
require.NoError(t, err)

store := NewCompositeStore()
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache)
err = store.addSchema(storeCfg, schema, schemaCfg.Configs[0].From.Time, storage, storage, overrides, chunksCache, writeDedupeCache)
require.NoError(t, err)
return store
}
@@ -529,12 +538,179 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
}
}

func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher {
matcher, err := labels.NewMatcher(matchType, name, value)
if err != nil {
panic(err)
// TestChunkStore_verifyRegexSetOptimizations verifies which index queries the store issues for selectors that use set-style regex matchers
func TestChunkStore_verifyRegexSetOptimizations(t *testing.T) {
ctx := context.Background()
now := model.Now()

testCases := []struct {
query string
expect []string
}{
{
`foo`,
[]string{"foo"},
},
{
`foo{bar="baz"}`,
[]string{"foo{bar=\"baz\"}"},
},
{
`foo{bar!="baz"}`,
[]string{"foo"},
},
{
`foo{toms="code", bar="beep"}`,
[]string{"foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~"beep"}`,
[]string{"foo{bar=\"beep\"}"},
},
{
`foo{bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}"},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~".+"}`,
[]string{"foo{bar}"},
},
}

for _, schema := range schemas {
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)

schemaCfg := DefaultSchemaConfig("", schema, 0)
schemaObj, err := schemaCfg.Configs[0].CreateSchema()
require.NoError(t, err)

var mockSchema = &mockBaseSchema{schema: schemaObj}

switch s := schemaObj.(type) {
case StoreSchema:
schemaObj = mockStoreSchema{mockBaseSchema: mockSchema, schema: s}
case SeriesStoreSchema:
schemaObj = mockSeriesStoreSchema{mockBaseSchema: mockSchema, schema: s}
}

store := newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schemaObj, storeCfg)
defer store.Stop()

from := now.Add(-time.Hour)
through := now

for _, tc := range testCases {
t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) {
// reset queries for test
mockSchema.resetQueries()

t.Log("========= Running query", tc.query, "with schema", schema)
matchers, err := promql.ParseMetricSelector(tc.query)
if err != nil {
t.Fatal(err)
}

_, err = store.Get(ctx, userID, from, through, matchers...)
require.NoError(t, err)

qs := mockSchema.getQueries()
sort.Strings(qs)

if !reflect.DeepEqual(tc.expect, qs) {
t.Fatalf("%s: wrong queries - %s", tc.query, test.Diff(tc.expect, qs))
}
})
}
}
return matcher
}

type mockBaseSchema struct {
schema BaseSchema

mu sync.Mutex
queries []string
}

func (m *mockBaseSchema) getQueries() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.queries
}

func (m *mockBaseSchema) resetQueries() {
m.mu.Lock()
defer m.mu.Unlock()
m.queries = nil
}

func (m *mockBaseSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, metricName)
m.mu.Unlock()

return m.schema.GetReadQueriesForMetric(from, through, userID, metricName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s}", metricName, labelName))
m.mu.Unlock()

return m.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s=%q}", metricName, labelName, labelValue))
m.mu.Unlock()
return m.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
}

func (m *mockBaseSchema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return m.schema.FilterReadQueries(queries, shard)
}

type mockStoreSchema struct {
*mockBaseSchema
schema StoreSchema
}

func (m mockStoreSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID)
}

type mockSeriesStoreSchema struct {
*mockBaseSchema
schema SeriesStoreSchema
}

func (m mockSeriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
return m.schema.GetCacheKeysAndLabelWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetChunkWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetChunksForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetLabelNamesForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) {
return m.schema.GetSeriesDeleteEntries(from, through, userID, metric, hasChunksForIntervalFunc)
}

func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher {
return labels.MustNewMatcher(matchType, name, value)
}

func TestChunkStoreRandom(t *testing.T) {
12 changes: 10 additions & 2 deletions pkg/chunk/composite_store.go
@@ -65,7 +65,15 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index
return err
}

var store Store
return c.addSchema(storeCfg, schema, cfg.From.Time, index, chunks, limits, chunksCache, writeDedupeCache)
}

func (c *CompositeStore) addSchema(storeCfg StoreConfig, schema BaseSchema, start model.Time, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
var (
err error
store Store
)

switch s := schema.(type) {
case SeriesStoreSchema:
store, err = newSeriesStore(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
@@ -77,7 +85,7 @@ func (c *CompositeStore) addSchema(storeCfg StoreConfig, schema BaseSchema, star
if err != nil {
return err
}
c.stores = append(c.stores, compositeStoreEntry{start: model.TimeFromUnixNano(cfg.From.UnixNano()), Store: store})
c.stores = append(c.stores, compositeStoreEntry{start: start, Store: store})
return nil
}

(Diffs for the remaining 3 changed files are not shown.)
