From 18e9d5b2b569cdec5345bee8554c1362158baece Mon Sep 17 00:00:00 2001 From: Christian Rohmann Date: Thu, 3 Feb 2022 02:41:17 +0100 Subject: [PATCH] Add support for ES index aliases / rollover to the dependency store (Resolves #2143) (#2144) * Add support for ES index aliases / rollover to the dependency store * Give DependencyStore a params struct like the SpanStore to carry its configuration parameters * Adapt and extend the tests accordingly Signed-off-by: Christian Rohmann * Extend es-rollover and es-index-cleaner to support rolling dependencies indices Signed-off-by: Christian Rohmann Co-authored-by: Christian Rohmann Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com> --- cmd/es-index-cleaner/app/index_filter.go | 7 +- cmd/es-rollover/app/index_options.go | 5 ++ cmd/es-rollover/app/index_options_test.go | 14 ++++ .../app/renderer/render.go | 5 +- plugin/storage/es/dependencystore/storage.go | 75 +++++++++++------ .../es/dependencystore/storage_test.go | 83 +++++++++++++++---- plugin/storage/es/factory.go | 20 ++++- .../fixtures/jaeger-dependencies-7.json | 7 ++ .../es/mappings/jaeger-dependencies-7.json | 11 +++ .../storage/integration/elasticsearch_test.go | 9 +- .../integration/es_index_cleaner_test.go | 6 +- .../integration/es_index_rollover_test.go | 4 +- 12 files changed, 193 insertions(+), 53 deletions(-) diff --git a/cmd/es-index-cleaner/app/index_filter.go b/cmd/es-index-cleaner/app/index_filter.go index e7695984a6e..52079e27748 100644 --- a/cmd/es-index-cleaner/app/index_filter.go +++ b/cmd/es-index-cleaner/app/index_filter.go @@ -49,7 +49,7 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index { // archive works only for rollover reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-span-archive-\\d{6}", i.IndexPrefix)) } else if i.Rollover { - reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service)-\\d{6}", i.IndexPrefix)) + reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{6}", i.IndexPrefix)) } else { reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator)) } @@ -58,7 +58,10 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index { for _, in := range indices { if reg.MatchString(in.Index) { // index in write alias cannot be removed - if in.Aliases[i.IndexPrefix+"jaeger-span-write"] || in.Aliases[i.IndexPrefix+"jaeger-service-write"] || in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] { + if in.Aliases[i.IndexPrefix+"jaeger-span-write"] || + in.Aliases[i.IndexPrefix+"jaeger-service-write"] || + in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] || + in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] { continue } filtered = append(filtered, in) diff --git a/cmd/es-rollover/app/index_options.go b/cmd/es-rollover/app/index_options.go index b5e6f9a628d..8a446f36123 100644 --- a/cmd/es-rollover/app/index_options.go +++ b/cmd/es-rollover/app/index_options.go @@ -52,6 +52,11 @@ func RolloverIndices(archive bool, prefix string) []IndexOption { Mapping: "jaeger-service", indexType: "jaeger-service", }, + { + prefix: prefix, + Mapping: "jaeger-dependencies", + indexType: "jaeger-dependencies", + }, } } diff --git a/cmd/es-rollover/app/index_options_test.go b/cmd/es-rollover/app/index_options_test.go index d0e257e0042..6161e603d44 100644 --- a/cmd/es-rollover/app/index_options_test.go +++ b/cmd/es-rollover/app/index_options_test.go @@ -52,6 +52,13 @@ func TestRolloverIndices(t *testing.T) { writeAliasName: "jaeger-service-write", initialRolloverIndex: "jaeger-service-000001", }, + { + templateName: "jaeger-dependencies", + mapping: "jaeger-dependencies", + readAliasName: "jaeger-dependencies-read", + writeAliasName: "jaeger-dependencies-write", + initialRolloverIndex: "jaeger-dependencies-000001", + }, }, }, { @@ -99,6 +106,13 @@ func TestRolloverIndices(t *testing.T) { writeAliasName: "mytenant-jaeger-service-write", initialRolloverIndex: "mytenant-jaeger-service-000001", }, + { + mapping: "jaeger-dependencies", + templateName: "mytenant-jaeger-dependencies", + readAliasName: "mytenant-jaeger-dependencies-read", + writeAliasName: "mytenant-jaeger-dependencies-write", + initialRolloverIndex: "mytenant-jaeger-dependencies-000001", + }, }, }, } diff --git a/cmd/esmapping-generator/app/renderer/render.go b/cmd/esmapping-generator/app/renderer/render.go index 993ab5bc12e..31cee8ecd23 100644 --- a/cmd/esmapping-generator/app/renderer/render.go +++ b/cmd/esmapping-generator/app/renderer/render.go @@ -23,8 +23,9 @@ import ( ) var supportedMappings = map[string]struct{}{ - "jaeger-span": {}, - "jaeger-service": {}, + "jaeger-span": {}, + "jaeger-service": {}, + "jaeger-dependencies": {}, } // GetMappingAsString returns rendered index templates as string diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 9a1f818de20..290a0daec6d 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "time" "github.com/olivere/elastic" @@ -32,38 +31,54 @@ import ( ) const ( - dependencyType = "dependencies" - dependencyIndex = "jaeger-dependencies-" + dependencyType = "dependencies" + dependencyIndex = "jaeger-dependencies-" + indexPrefixSeparator = "-" ) // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - client es.Client - logger *zap.Logger - indexPrefix string - indexDateLayout string - maxDocCount int + client es.Client + logger *zap.Logger + dependencyIndexPrefix string + indexDateLayout string + maxDocCount int + useReadWriteAliases bool +} + +// DependencyStoreParams holds constructor parameters for NewDependencyStore +type DependencyStoreParams struct { + Client es.Client + Logger *zap.Logger + IndexPrefix string + IndexDateLayout string + MaxDocCount int + UseReadWriteAliases bool } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore { - var prefix string - if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") { - prefix = indexPrefix + "-" - } +func NewDependencyStore(p DependencyStoreParams) *DependencyStore { return &DependencyStore{ - client: client, - logger: logger, - indexPrefix: prefix + dependencyIndex, - indexDateLayout: indexDateLayout, - maxDocCount: maxDocCount, + client: p.Client, + logger: p.Logger, + dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex), + indexDateLayout: p.IndexDateLayout, + maxDocCount: p.MaxDocCount, + useReadWriteAliases: p.UseReadWriteAliases, + } +} + +func prefixIndexName(prefix, index string) string { + if prefix != "" { + return prefix + indexPrefixSeparator + index } + return index } // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - indexName := indexWithDate(s.indexPrefix, s.indexDateLayout, ts) - s.writeDependencies(indexName, ts, dependencies) + writeIndexName := s.getWriteIndex(ts) + s.writeDependencies(writeIndexName, ts, dependencies) return nil } @@ -85,7 +100,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - indices := getIndices(s.indexPrefix, s.indexDateLayout, endTs, lookback) + indices := s.getReadIndices(endTs, lookback) searchResult, err := s.client.Search(indices...). Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). @@ -112,14 +127,17 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query { return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs) } -func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) []string { +func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string { + if s.useReadWriteAliases { + return []string{s.dependencyIndexPrefix + "read"} + } var indices []string - firstIndex := indexWithDate(prefix, dateLayout, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, dateLayout, ts) + firstIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts.Add(-lookback)) + currentIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, dateLayout, ts) + currentIndex = indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) } return append(indices, firstIndex) } @@ -127,3 +145,10 @@ func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { return indexNamePrefix + date.UTC().Format(indexDateLayout) } + +func (s *DependencyStore) getWriteIndex(ts time.Time) string { + if s.useReadWriteAliases { + return s.dependencyIndexPrefix + "write" + } + return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts) +} diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 3465752d2e1..c01760d73c3 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -50,7 +50,13 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix, indexDateLayout, maxDocCount), + storage: NewDependencyStore(DependencyStoreParams{ + Client: client, + Logger: logger, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, + MaxDocCount: maxDocCount, + }), } fn(r) } @@ -69,8 +75,15 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, "2006-01-02", defaultMaxDocCount) - assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) + r := NewDependencyStore(DependencyStoreParams{ + Client: client, + Logger: zap.NewNop(), + IndexPrefix: testCase.prefix, + IndexDateLayout: "2006-01-02", + MaxDocCount: defaultMaxDocCount, + }) + + assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix) } } @@ -200,36 +213,76 @@ func createSearchResult(dependencyLink string) *elastic.SearchResult { return searchResult } -func TestGetIndices(t *testing.T) { +func TestGetReadIndices(t *testing.T) { fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) testCases := []struct { - expected []string + indices []string lookback time.Duration - prefix string + params DependencyStoreParams }{ { - expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, + params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true}, + lookback: 23 * time.Hour, + indices: []string{ + dependencyIndex + "read", + }, + }, + { + params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"}, lookback: 23 * time.Hour, - prefix: "", + indices: []string{ + dependencyIndex + fixedTime.Format("2006-01-02"), + dependencyIndex + fixedTime.Add(-23*time.Hour).Format("2006-01-02"), + }, }, { - expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, + params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"}, lookback: 13 * time.Hour, - prefix: "", + indices: []string{ + dependencyIndex + fixedTime.UTC().Format("2006-01-02"), + dependencyIndex + fixedTime.Add(-13*time.Hour).Format("2006-01-02"), + }, }, { - expected: []string{indexWithDate("foo:", "2006-01-02", fixedTime)}, + params: DependencyStoreParams{IndexPrefix: "foo:", IndexDateLayout: "2006-01-02"}, lookback: 1 * time.Hour, - prefix: "foo:", + indices: []string{ + "foo:" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"), + }, }, { - expected: []string{indexWithDate("foo-", "2006-01-02", fixedTime)}, + params: DependencyStoreParams{IndexPrefix: "foo-", IndexDateLayout: "2006-01-02"}, lookback: 0, - prefix: "foo-", + indices: []string{ + "foo-" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"), + }, + }, + } + for _, testCase := range testCases { + s := NewDependencyStore(testCase.params) + assert.EqualValues(t, testCase.indices, s.getReadIndices(fixedTime, testCase.lookback)) + } +} + +func TestGetWriteIndex(t *testing.T) { + fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) + testCases := []struct { + writeIndex string + lookback time.Duration + params DependencyStoreParams + }{ + { + params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true}, + writeIndex: dependencyIndex + "write", + }, + { + params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: false}, + writeIndex: dependencyIndex + fixedTime.Format("2006-01-02"), }, } for _, testCase := range testCases { - assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, "2006-01-02", fixedTime, testCase.lookback)) + s := NewDependencyStore(testCase.params) + assert.EqualValues(t, testCase.writeIndex, s.getWriteIndex(fixedTime)) } } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 6ce1f25d34f..99ee9b19cd5 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -109,9 +109,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), - f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount()) - return reader, nil + return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig) } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -210,6 +208,22 @@ func createSpanWriter( return writer, nil } +func createDependencyReader( + logger *zap.Logger, + client es.Client, + cfg config.ClientBuilder, +) (dependencystore.Reader, error) { + + reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ + Client: client, + Logger: logger, + IndexPrefix: cfg.GetIndexPrefix(), + MaxDocCount: cfg.GetMaxDocCount(), + UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + }) + return reader, nil +} + var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory diff --git a/plugin/storage/es/mappings/fixtures/jaeger-dependencies-7.json b/plugin/storage/es/mappings/fixtures/jaeger-dependencies-7.json index 6c4f46fe96d..c0cabb74ea9 100644 --- a/plugin/storage/es/mappings/fixtures/jaeger-dependencies-7.json +++ b/plugin/storage/es/mappings/fixtures/jaeger-dependencies-7.json @@ -1,10 +1,17 @@ { "index_patterns": "*jaeger-dependencies-*", + "aliases": { + "test-jaeger-dependencies-read" : {} + }, "settings":{ "index.number_of_shards": 3, "index.number_of_replicas": 3, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true + ,"lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-dependencies-write" + } }, "mappings":{} } diff --git a/plugin/storage/es/mappings/jaeger-dependencies-7.json b/plugin/storage/es/mappings/jaeger-dependencies-7.json index 770f94a680d..18afe1b056e 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-7.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-7.json @@ -1,10 +1,21 @@ { "index_patterns": "*jaeger-dependencies-*", + {{- if .UseILM }} + "aliases": { + "{{ .IndexPrefix }}jaeger-dependencies-read" : {} + }, + {{- end }} "settings":{ "index.number_of_shards": {{ .Shards }}, "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true + {{- if .UseILM }} + ,"lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + } + {{- end }} }, "mappings":{} } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index f9eac7ca64c..30ae0520e3d 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -152,7 +152,14 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro Archive: archive, MaxDocCount: defaultMaxDocCount, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount) + dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ + Client: client, + Logger: s.logger, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, + MaxDocCount: defaultMaxDocCount, + }) + depMapping, err := mappingBuilder.GetDependenciesMappings() if err != nil { return err diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go index f30c05a993e..f284671b597 100644 --- a/plugin/storage/integration/es_index_cleaner_test.go +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -91,7 +91,7 @@ func TestIndexCleaner(t *testing.T) { envVars: []string{}, expectedIndices: []string{ archiveIndexName, - "jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001", "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000001", "jaeger-span-archive-000002", }, }, @@ -100,7 +100,7 @@ func TestIndexCleaner(t *testing.T) { envVars: []string{"ROLLOVER=true"}, expectedIndices: []string{ archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, - "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000001", "jaeger-span-archive-000002", }, }, @@ -109,7 +109,7 @@ func TestIndexCleaner(t *testing.T) { envVars: []string{"ARCHIVE=true"}, expectedIndices: []string{ archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, - "jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001", "jaeger-span-000002", "jaeger-service-000002", "jaeger-dependencies-000002", "jaeger-span-archive-000002", }, }, diff --git a/plugin/storage/integration/es_index_rollover_test.go b/plugin/storage/integration/es_index_rollover_test.go index 7a78813da52..a3452b329e0 100644 --- a/plugin/storage/integration/es_index_rollover_test.go +++ b/plugin/storage/integration/es_index_rollover_test.go @@ -90,7 +90,7 @@ func runCreateIndicesWithILM(t *testing.T, ilmPolicyName string) { assert.Empty(t, indices) } else { - expectedIndices := []string{"jaeger-span-000001", "jaeger-service-000001"} + expectedIndices := []string{"jaeger-span-000001", "jaeger-service-000001", "jaeger-dependencies-000001"} t.Run(fmt.Sprintf("NoPrefix"), func(t *testing.T) { runIndexRolloverWithILMTest(t, client, "", expectedIndices, envVars, ilmPolicyName) }) @@ -101,7 +101,7 @@ func runCreateIndicesWithILM(t *testing.T, ilmPolicyName string) { } func runIndexRolloverWithILMTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string, ilmPolicyName string) { - writeAliases := []string{"jaeger-service-write", "jaeger-span-write"} + writeAliases := []string{"jaeger-service-write", "jaeger-span-write", "jaeger-dependencies-write"} // make sure ES is cleaned before test cleanES(t, client, ilmPolicyName)