diff --git a/CHANGELOG.md b/CHANGELOG.md index 036cdd1368..e7fa95dee4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-.fifocache.size` CLI flag has been renamed to `-.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319 * [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454 +* [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400 * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. 
#2109 diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 111493969d..02cc7da879 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -396,36 +396,12 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro incomingErrors := make(chan error) for _, matcher := range matchers { go func(matcher *labels.Matcher) { - // Lookup IndexQuery's - var queries []IndexQuery - var err error - if matcher.Type != labels.MatchEqual { - queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name) - } else { - queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) - } + chunkIDs, err := c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, nil) if err != nil { incomingErrors <- err - return - } - level.Debug(log).Log("matcher", matcher, "queries", len(queries)) - - // Lookup IndexEntry's - entries, err := c.lookupEntriesByQueries(ctx, queries) - if err != nil { - incomingErrors <- err - return - } - level.Debug(log).Log("matcher", matcher, "entries", len(entries)) - - // Convert IndexEntry's to chunk IDs, filter out non-matchers at the same time. 
- chunkIDs, err := c.parseIndexEntries(ctx, entries, matcher) - if err != nil { - incomingErrors <- err - return + } else { + incomingChunkIDs <- chunkIDs } - level.Debug(log).Log("matcher", matcher, "chunkIDs", len(chunkIDs)) - incomingChunkIDs <- chunkIDs }(matcher) } @@ -453,6 +429,61 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro return c.convertChunkIDsToChunks(ctx, userID, chunkIDs) } +func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]IndexQuery) []IndexQuery) ([]string, error) { + log, ctx := spanlogger.New(ctx, "Store.lookupIdsByMetricNameMatcher", "metricName", metricName, "matcher", matcher) + defer log.Span.Finish() + + var err error + var queries []IndexQuery + var labelName string + if matcher == nil { + queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, metricName) + } else if matcher.Type == labels.MatchEqual { + labelName = matcher.Name + queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) + } else if matcher.Type == labels.MatchRegexp && len(findSetMatches(matcher.Value)) > 0 { + set := findSetMatches(matcher.Value) + for _, v := range set { + var qs []IndexQuery + qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v) + if err != nil { + break + } + queries = append(queries, qs...) 
+ } + } else { + labelName = matcher.Name + queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name) + } + if err != nil { + return nil, err + } + level.Debug(log).Log("matcher", matcher, "queries", len(queries)) + + if filter != nil { + queries = filter(queries) + level.Debug(log).Log("matcher", matcher, "filteredQueries", len(queries)) + } + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if e, ok := err.(CardinalityExceededError); ok { + e.MetricName = metricName + e.LabelName = labelName + return nil, e + } else if err != nil { + return nil, err + } + level.Debug(log).Log("matcher", matcher, "entries", len(entries)) + + ids, err := c.parseIndexEntries(ctx, entries, matcher) + if err != nil { + return nil, err + } + level.Debug(log).Log("matcher", matcher, "ids", len(ids)) + + return ids, nil +} + func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries") defer log.Span.Finish() diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index deaea2ad38..34aa72852a 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -5,6 +5,8 @@ import ( "fmt" "math/rand" "reflect" + "sort" + "sync" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -62,10 +65,16 @@ func newTestChunkStore(t require.TestingT, schemaName string) Store { } func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg StoreConfig) Store { - var ( - tbmConfig TableManagerConfig - schemaCfg = DefaultSchemaConfig("", schemaName, 0) - ) + schemaCfg := DefaultSchemaConfig("", schemaName, 0) + + schema, err := 
schemaCfg.Configs[0].CreateSchema() + require.NoError(t, err) + + return newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schema, storeCfg) +} + +func newTestChunkStoreConfigWithMockStorage(t require.TestingT, schemaCfg SchemaConfig, schema BaseSchema, storeCfg StoreConfig) Store { + var tbmConfig TableManagerConfig err := schemaCfg.Validate() require.NoError(t, err) flagext.DefaultValues(&tbmConfig) @@ -88,7 +97,7 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto require.NoError(t, err) store := NewCompositeStore() - err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache) + err = store.addSchema(storeCfg, schema, schemaCfg.Configs[0].From.Time, storage, storage, overrides, chunksCache, writeDedupeCache) require.NoError(t, err) return store } @@ -529,12 +538,179 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { } } -func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher { - matcher, err := labels.NewMatcher(matchType, name, value) - if err != nil { - panic(err) +// TestChunkStore_verifyRegexSetOptimizations tests if chunks are fetched correctly when we have the metric name +func TestChunkStore_verifyRegexSetOptimizations(t *testing.T) { + ctx := context.Background() + now := model.Now() + + testCases := []struct { + query string + expect []string + }{ + { + `foo`, + []string{"foo"}, + }, + { + `foo{bar="baz"}`, + []string{"foo{bar=\"baz\"}"}, + }, + { + `foo{bar!="baz"}`, + []string{"foo"}, + }, + { + `foo{toms="code", bar="beep"}`, + []string{"foo{bar=\"beep\"}", "foo{toms=\"code\"}"}, + }, + { + `foo{bar=~"beep"}`, + []string{"foo{bar=\"beep\"}"}, + }, + { + `foo{bar=~"beep|baz"}`, + []string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}"}, + }, + { + `foo{toms="code", bar=~"beep|baz"}`, + []string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}", "foo{toms=\"code\"}"}, + }, + { + `foo{bar=~".+"}`, + []string{"foo{bar}"}, + }, + } + + 
for _, schema := range schemas { + var storeCfg StoreConfig + flagext.DefaultValues(&storeCfg) + + schemaCfg := DefaultSchemaConfig("", schema, 0) + schemaObj, err := schemaCfg.Configs[0].CreateSchema() + require.NoError(t, err) + + var mockSchema = &mockBaseSchema{schema: schemaObj} + + switch s := schemaObj.(type) { + case StoreSchema: + schemaObj = mockStoreSchema{mockBaseSchema: mockSchema, schema: s} + case SeriesStoreSchema: + schemaObj = mockSeriesStoreSchema{mockBaseSchema: mockSchema, schema: s} + } + + store := newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schemaObj, storeCfg) + defer store.Stop() + + from := now.Add(-time.Hour) + through := now + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) { + // reset queries for test + mockSchema.resetQueries() + + t.Log("========= Running query", tc.query, "with schema", schema) + matchers, err := promql.ParseMetricSelector(tc.query) + if err != nil { + t.Fatal(err) + } + + _, err = store.Get(ctx, userID, from, through, matchers...) 
+ require.NoError(t, err) + + qs := mockSchema.getQueries() + sort.Strings(qs) + + if !reflect.DeepEqual(tc.expect, qs) { + t.Fatalf("%s: wrong queries - %s", tc.query, test.Diff(tc.expect, qs)) + } + }) + } } - return matcher +} + +type mockBaseSchema struct { + schema BaseSchema + + mu sync.Mutex + queries []string +} + +func (m *mockBaseSchema) getQueries() []string { + m.mu.Lock() + defer m.mu.Unlock() + return m.queries +} + +func (m *mockBaseSchema) resetQueries() { + m.mu.Lock() + defer m.mu.Unlock() + m.queries = nil +} + +func (m *mockBaseSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) { + m.mu.Lock() + m.queries = append(m.queries, metricName) + m.mu.Unlock() + + return m.schema.GetReadQueriesForMetric(from, through, userID, metricName) +} + +func (m *mockBaseSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) { + m.mu.Lock() + m.queries = append(m.queries, fmt.Sprintf("%s{%s}", metricName, labelName)) + m.mu.Unlock() + + return m.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) +} + +func (m *mockBaseSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) { + m.mu.Lock() + m.queries = append(m.queries, fmt.Sprintf("%s{%s=%q}", metricName, labelName, labelValue)) + m.mu.Unlock() + return m.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue) +} + +func (m *mockBaseSchema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { + return m.schema.FilterReadQueries(queries, shard) +} + +type mockStoreSchema struct { + *mockBaseSchema + schema StoreSchema +} + +func (m mockStoreSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) 
// mockSeriesStoreSchema pairs the recording mockBaseSchema (read-query side)
// with a real SeriesStoreSchema; every method defined here is a plain
// pass-through to the wrapped schema.
type mockSeriesStoreSchema struct {
	*mockBaseSchema
	schema SeriesStoreSchema
}

// GetCacheKeysAndLabelWriteEntries delegates to the wrapped SeriesStoreSchema.
func (m mockSeriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
	return m.schema.GetCacheKeysAndLabelWriteEntries(from, through, userID, metricName, labels, chunkID)
}

// GetChunkWriteEntries delegates to the wrapped SeriesStoreSchema.
func (m mockSeriesStoreSchema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
	return m.schema.GetChunkWriteEntries(from, through, userID, metricName, labels, chunkID)
}

// GetChunksForSeries delegates to the wrapped SeriesStoreSchema.
func (m mockSeriesStoreSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
	return m.schema.GetChunksForSeries(from, through, userID, seriesID)
}

// GetLabelNamesForSeries delegates to the wrapped SeriesStoreSchema.
func (m mockSeriesStoreSchema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
	return m.schema.GetLabelNamesForSeries(from, through, userID, seriesID)
}

// GetSeriesDeleteEntries delegates to the wrapped SeriesStoreSchema.
func (m mockSeriesStoreSchema) GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) {
	return m.schema.GetSeriesDeleteEntries(from, through, userID, metric, hasChunksForIntervalFunc)
}
// regexMetaCharacterBytes is a 128-bit bitmap, filled in by init, that
// isRegexMetaCharacter consults: byte b is a regex meta character when bit
// b/16 of element b%16 is set.
var regexMetaCharacterBytes [16]byte

// isRegexMetaCharacter reports whether byte b is one of the regex meta
// characters `.+*?()|[]{}^$`, i.e. whether it must be escaped to be matched
// literally. Bytes >= utf8.RuneSelf are never meta characters.
func isRegexMetaCharacter(b byte) bool {
	return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
}

func init() {
	for _, b := range []byte(`.+*?()|[]{}^$`) {
		regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
	}
}

// findSetMatches returns the literal values matched by pattern when pattern
// is a plain alternation of literals such as "a|b|c" (escaped meta characters
// and escaped backslashes are allowed inside a literal). It returns nil when
// the pattern cannot be expressed as such a set, so callers can test
// len(result) > 0 and otherwise fall back to evaluating the regex.
//
// Adapted from Prometheus' querier.go, without the check for the "^(?:...)$"
// wrapper Prometheus adds around matchers.
//
// nil is returned for:
//   - any unescaped meta character other than '|';
//   - an escape of a non-meta character (e.g. `\d`);
//   - an empty alternative ("a|", "|a", "a||b", ""), because that regex also
//     matches the empty string, which a list of non-empty values cannot
//     represent;
//   - a dangling trailing backslash.
func findSetMatches(pattern string) []string {
	var (
		matches []string
		current strings.Builder
		escaped bool
	)
	for i := 0; i < len(pattern); i++ {
		b := pattern[i]
		switch {
		case escaped:
			// Only meta characters and the backslash itself may be escaped;
			// anything else (\d, \w, ...) is a character class.
			if !isRegexMetaCharacter(b) && b != '\\' {
				return nil
			}
			current.WriteByte(b)
			escaped = false
		case b == '\\':
			escaped = true
		case b == '|':
			if current.Len() == 0 {
				return nil
			}
			matches = append(matches, current.String())
			current.Reset()
		case isRegexMetaCharacter(b):
			return nil
		default:
			current.WriteByte(b)
		}
	}
	if escaped || current.Len() == 0 {
		return nil
	}
	return append(matches, current.String())
}
+ { + pattern: "fo.o|bar?|^baz", + exp: nil, + }, + { + pattern: "foo\\|bar\\|baz", + exp: []string{ + "foo|bar|baz", + }, + }, + } + + for _, c := range cases { + matches := findSetMatches(c.pattern) + require.Equal(t, c.exp, matches) + } +} diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index d4966ceb5c..3c94143c07 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -334,47 +334,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from } func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) { - log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher) - defer log.Span.Finish() - - var err error - var queries []IndexQuery - var labelName string - if matcher == nil { - queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, metricName) - } else if matcher.Type != labels.MatchEqual { - labelName = matcher.Name - queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name) - } else { - labelName = matcher.Name - queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) - } - if err != nil { - return nil, err - } - level.Debug(log).Log("queries", len(queries)) - - queries = c.schema.FilterReadQueries(queries, shard) - - level.Debug(log).Log("filteredQueries", len(queries)) - - entries, err := c.lookupEntriesByQueries(ctx, queries) - if e, ok := err.(CardinalityExceededError); ok { - e.MetricName = metricName - e.LabelName = labelName - return nil, e - } else if err != nil { - return nil, err - } - level.Debug(log).Log("entries", len(entries)) - - ids, err := c.parseIndexEntries(ctx, entries, matcher) - if err != nil { - return nil, err - } - 
level.Debug(log).Log("ids", len(ids)) - - return ids, nil + return c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, func(queries []IndexQuery) []IndexQuery { + return c.schema.FilterReadQueries(queries, shard) + }) } func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {