From d3b7485cb69a89498842ad039046a1912bf269f6 Mon Sep 17 00:00:00 2001 From: haanhvu Date: Tue, 18 Apr 2023 19:24:39 +0700 Subject: [PATCH] Yuri's approach for swapping client Signed-off-by: haanhvu --- pkg/es/config/config.go | 241 ++++++++---------- plugin/storage/es/dependencystore/storage.go | 10 +- .../es/dependencystore/storage_test.go | 5 +- plugin/storage/es/factory.go | 119 ++++++--- plugin/storage/es/factory_test.go | 17 +- plugin/storage/es/spanstore/reader.go | 8 +- plugin/storage/es/spanstore/reader_test.go | 27 +- .../storage/es/spanstore/service_operation.go | 10 +- plugin/storage/es/spanstore/writer.go | 12 +- plugin/storage/es/spanstore/writer_test.go | 25 +- .../storage/integration/elasticsearch_test.go | 6 +- 11 files changed, 245 insertions(+), 235 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 6b08bc5574a..8c5d112c10e 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -38,7 +38,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" - "github.com/jaegertracing/jaeger/pkg/fswatcher" "github.com/jaegertracing/jaeger/pkg/metrics" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) @@ -78,8 +77,6 @@ type Configuration struct { Version uint `mapstructure:"version"` LogLevel string `mapstructure:"log_level"` SendGetBodyAs string `mapstructure:"send_get_body_as"` - optionsChan chan []elastic.ClientOptionFunc - passwordFileWatcher *fswatcher.FSWatcher } // TagsAsFields holds configuration for tag schema. @@ -98,7 +95,8 @@ type TagsAsFields struct { // ClientBuilder creates new es.Client type ClientBuilder interface { - ChannelNewClient(clientChan chan<- es.Client, logger *zap.Logger, metricsFactory metrics.Factory) error + NewClient(logger *zap.Logger, options []elastic.ClientOptionFunc, metricsFactory metrics.Factory) (es.Client, error) + GetConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) GetRemoteReadClusters() []string GetNumShards() int64 GetNumReplicas() int64 @@ -115,6 +113,8 @@ type ClientBuilder interface { GetTagDotReplacement() string GetUseReadWriteAliases() bool GetTokenFilePath() string + GetPasswordFilePath() string + GetUsername() string IsStorageEnabled() bool IsCreateIndexTemplates() bool GetVersion() uint @@ -124,115 +124,94 @@ type ClientBuilder interface { GetSendGetBodyAs() string } -// ChannelNewClient channel new ElasticSearch client. -// We need a channel because the password (if set from file) of client can change. -func (c *Configuration) ChannelNewClient(clientChan chan<- es.Client, logger *zap.Logger, metricsFactory metrics.Factory) error { +// NewClient creates a new ElasticSearch client +func (c *Configuration) NewClient(logger *zap.Logger, options []elastic.ClientOptionFunc, metricsFactory metrics.Factory) (es.Client, error) { if len(c.Servers) < 1 { - return errors.New("no servers specified") + return nil, errors.New("no servers specified") } - err := c.channelConfigOptions(logger) + rawClient, err := elastic.NewClient(options...) if err != nil { - close(clientChan) - return err + return nil, err } - go func() { - for options := range c.optionsChan { - rawClient, err := elastic.NewClient(options...) - if err != nil { - close(c.optionsChan) - close(clientChan) - logger.Error("Error creating ES client", zap.Error(err)) - } - - sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index") - m := sync.Map{} - - service, err := rawClient.BulkProcessor(). - Before(func(id int64, requests []elastic.BulkableRequest) { - m.Store(id, time.Now()) - }). - After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { - start, ok := m.Load(id) - if !ok { - return - } - m.Delete(id) - - // log individual errors, note that err might be false and these errors still present - if response != nil && response.Errors { - for _, it := range response.Items { - for key, val := range it { - if val.Error != nil { - logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key), - zap.Reflect("response", val)) - } - } - } - } + sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index") + m := sync.Map{} - sm.Emit(err, time.Since(start.(time.Time))) - if err != nil { - var failed int - if response == nil { - failed = 0 - } else { - failed = len(response.Failed()) + service, err := rawClient.BulkProcessor(). + Before(func(id int64, requests []elastic.BulkableRequest) { + m.Store(id, time.Now()) + }). + After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + start, ok := m.Load(id) + if !ok { + return + } + m.Delete(id) + + // log individual errors, note that err might be false and these errors still present + if response != nil && response.Errors { + for _, it := range response.Items { + for key, val := range it { + if val.Error != nil { + logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key), + zap.Reflect("response", val)) } - total := len(requests) - logger.Error("Elasticsearch could not process bulk request", - zap.Int("request_count", total), - zap.Int("failed_count", failed), - zap.Error(err), - zap.Any("response", response)) } - }). - BulkSize(c.BulkSize). - Workers(c.BulkWorkers). - BulkActions(c.BulkActions). - FlushInterval(c.BulkFlushInterval). - Do(context.Background()) - if err != nil { - close(c.optionsChan) - close(clientChan) - logger.Error("Error setting up concurrent processor of bulk requests", zap.Error(err)) + } } - if c.Version == 0 { - // Determine ElasticSearch Version - pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background()) - if err != nil { - close(c.optionsChan) - close(clientChan) - logger.Error("Error checking node exists", zap.Error(err)) - } - esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) - if err != nil { - close(c.optionsChan) - close(clientChan) - logger.Error("Error parsing ES version", zap.Error(err)) - } - // OpenSearch is based on ES 7.x - if strings.Contains(pingResult.TagLine, "OpenSearch") { - if pingResult.Version.Number[0] == '1' { - logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings") - esVersion = 7 - } - if pingResult.Version.Number[0] == '2' { - logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings") - esVersion = 7 - } + sm.Emit(err, time.Since(start.(time.Time))) + if err != nil { + var failed int + if response == nil { + failed = 0 + } else { + failed = len(response.Failed()) } - logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) - c.Version = uint(esVersion) + total := len(requests) + logger.Error("Elasticsearch could not process bulk request", + zap.Int("request_count", total), + zap.Int("failed_count", failed), + zap.Error(err), + zap.Any("response", response)) } + }). + BulkSize(c.BulkSize). + Workers(c.BulkWorkers). + BulkActions(c.BulkActions). + FlushInterval(c.BulkFlushInterval). + Do(context.Background()) + if err != nil { + return nil, err + } - clientChan <- eswrapper.WrapESClient(rawClient, service, c.Version) + if c.Version == 0 { + // Determine ElasticSearch Version + pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background()) + if err != nil { + return nil, err + } + esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) + if err != nil { + return nil, err } - }() + // OpenSearch is based on ES 7.x + if strings.Contains(pingResult.TagLine, "OpenSearch") { + if pingResult.Version.Number[0] == '1' { + logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings") + esVersion = 7 + } + if pingResult.Version.Number[0] == '2' { + logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings") + esVersion = 7 + } + } + logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) + c.Version = uint(esVersion) + } - return nil + return eswrapper.WrapESClient(rawClient, service, c.Version), nil } // ApplyDefaults copies settings from source unless its own value is non-zero. @@ -403,6 +382,16 @@ func (c *Configuration) GetTokenFilePath() string { return c.TokenFilePath } +// GetPasswordFilePath returns file path containing the password +func (c *Configuration) GetPasswordFilePath() string { + return c.PasswordFilePath +} + +// GetUsername returns the username of Elasticsearch client +func (c *Configuration) GetUsername() string { + return c.Username +} + // IsStorageEnabled determines whether storage is enabled func (c *Configuration) IsStorageEnabled() bool { return c.Enabled @@ -444,15 +433,12 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) { return tags, nil } -// channelConfigOptions wraps the configs to feed to the ElasticSearch client init. -// We need a channel because the password from PasswordFilePath can change. -func (c *Configuration) channelConfigOptions(logger *zap.Logger) error { +// GetConfigOptions wraps the configs to feed to the ElasticSearch client init +func (c *Configuration) GetConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) { if c.Password != "" && c.PasswordFilePath != "" { - return fmt.Errorf("both Password and PasswordFilePath are set") + return nil, fmt.Errorf("both Password and PasswordFilePath are set") } - c.optionsChan = make(chan []elastic.ClientOptionFunc, 1) - options := []elastic.ClientOptionFunc{ elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer), // Disable health check when token from context is allowed, this is because at this time @@ -468,55 +454,32 @@ func (c *Configuration) channelConfigOptions(logger *zap.Logger) error { } options = append(options, elastic.SetHttpClient(httpClient)) - if c.SendGetBodyAs != "" { - options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs)) - } - - options, err := addLoggerOptions(options, c.LogLevel) - if err != nil { - close(c.optionsChan) - return err - } - - transport, err := GetHTTPRoundTripper(c, logger) - if err != nil { - close(c.optionsChan) - return err - } - httpClient.Transport = transport - if c.Password != "" { options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) - c.optionsChan <- options - close(c.optionsChan) } - if c.PasswordFilePath != "" { passwordFromFile, err := loadFileContent(c.PasswordFilePath) if err != nil { - close(c.optionsChan) - return err + return nil, fmt.Errorf("failed to load password from file: %w", err) } options = append(options, elastic.SetBasicAuth(c.Username, passwordFromFile)) - c.optionsChan <- options + } - onPasswordChange := func() { c.onPasswordChange(c.optionsChan, options) } - c.passwordFileWatcher, err = fswatcher.New([]string{c.PasswordFilePath}, onPasswordChange, logger) - if err != nil { - close(c.optionsChan) - return err - } + if c.SendGetBodyAs != "" { + options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs)) } - return nil -} + options, err := addLoggerOptions(options, c.LogLevel) + if err != nil { + return options, err + } -func (c *Configuration) onPasswordChange(optionsChan chan<- []elastic.ClientOptionFunc, options []elastic.ClientOptionFunc) { - passwordFromFile, _ := loadFileContent(c.PasswordFilePath) - if passwordFromFile != "" { - options = append(options, elastic.SetBasicAuth(c.Username, passwordFromFile)) - optionsChan <- options + transport, err := GetHTTPRoundTripper(c, logger) + if err != nil { + return nil, err } + httpClient.Transport = transport + return options, nil } func addLoggerOptions(options []elastic.ClientOptionFunc, logLevel string) ([]elastic.ClientOptionFunc, error) { diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 8a4203ae4f9..521f2e7b799 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -38,7 +38,7 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - client es.Client + client func() es.Client logger *zap.Logger dependencyIndexPrefix string indexDateLayout string @@ -48,7 +48,7 @@ type DependencyStore struct { // DependencyStoreParams holds constructor parameters for NewDependencyStore type DependencyStoreParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger IndexPrefix string IndexDateLayout string @@ -84,7 +84,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D // CreateTemplates creates index templates. func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error { - _, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background()) + _, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background()) if err != nil { return err } @@ -92,7 +92,7 @@ func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error { } func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) { - s.client.Index().Index(indexName).Type(dependencyType). + s.client().Index().Index(indexName).Type(dependencyType). BodyJson(&dbmodel.TimeDependencies{ Timestamp: ts, Dependencies: dbmodel.FromDomainDependencies(dependencies), @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := s.getReadIndices(endTs, lookback) - searchResult, err := s.client.Search(indices...). + searchResult, err := s.client().Search(indices...). Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 6b27f1cc5d5..86ad2177f44 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun logger: logger, logBuffer: logBuffer, storage: NewDependencyStore(DependencyStoreParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, IndexPrefix: indexPrefix, IndexDateLayout: indexDateLayout, @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { for _, testCase := range testCases { client := &mocks.Client{} r := NewDependencyStore(DependencyStoreParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), IndexPrefix: testCase.prefix, IndexDateLayout: "2006-01-02", diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 51f0abe7e57..c891be9857d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -16,15 +16,22 @@ package es import ( + "errors" "flag" "fmt" "io" + "os" + "path/filepath" + "strings" + "sync/atomic" + "github.com/olivere/elastic" "github.com/spf13/viper" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/pkg/fswatcher" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" @@ -51,12 +58,17 @@ type Factory struct { metricsFactory metrics.Factory logger *zap.Logger - primaryConfig config.ClientBuilder - primaryClientChan chan es.Client - primaryClient es.Client - archiveConfig config.ClientBuilder - archiveClientChan chan es.Client - archiveClient es.Client + primaryConfig config.ClientBuilder + primaryOptions []elastic.ClientOptionFunc + primaryClient es.Client + atomicPrimaryClient atomic.Value + + archiveConfig config.ClientBuilder + archiveOptions []elastic.ClientOptionFunc + archiveClient es.Client + atomicArchiveClient atomic.Value + + watchers []*fswatcher.FSWatcher } // NewFactory creates a new Factory. @@ -90,47 +102,27 @@ func (f *Factory) InitFromOptions(o Options) { // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger + f.watchers = make([]*fswatcher.FSWatcher, 0) + f.primaryOptions = make([]elastic.ClientOptionFunc, 0) + f.primaryClient, _ = f.primaryConfig.NewClient(nil, nil, nil) - f.primaryClientChan = make(chan es.Client, 1) - err := f.primaryConfig.ChannelNewClient(f.primaryClientChan, logger, metricsFactory) + err := f.initializeClient(f.primaryConfig, f.primaryOptions, f.primaryClient, f.atomicPrimaryClient, "primary") if err != nil { - close(f.primaryClientChan) - return fmt.Errorf("failed to channel primary Elasticsearch client: %w", err) + return err } - go func() { - f.primaryClient = <-f.primaryClientChan - }() if f.archiveConfig.IsStorageEnabled() { - f.archiveClientChan = make(chan es.Client, 1) - err := f.archiveConfig.ChannelNewClient(f.archiveClientChan, logger, metricsFactory) + f.archiveOptions = make([]elastic.ClientOptionFunc, 0) + f.archiveClient, _ = f.archiveConfig.NewClient(nil, nil, nil) + err = f.initializeClient(f.archiveConfig, f.archiveOptions, f.archiveClient, f.atomicArchiveClient, "archive") if err != nil { - close(f.archiveClientChan) - return fmt.Errorf("failed to channel archive Elasticsearch client: %w", err) + return err } - go func() { - f.archiveClient = <-f.archiveClientChan - }() } return nil } -/*func (f *Factory) assignClientFromChannel(config config.ClientBuilder, clientChan chan es.Client, esClient es.Client, metricsFactory metrics.Factory, logger *zap.Logger) error { - err := config.ChannelNewClient(clientChan, logger, metricsFactory) - if err != nil { - return err - } - - go func() { - for client := range clientChan { - esClient = <-clientChan - } - }() - - return nil -}*/ - // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) @@ -173,7 +165,7 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: mFactory, MaxDocCount: cfg.GetMaxDocCount(), @@ -221,7 +213,7 @@ func createSpanWriter( return nil, err } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: mFactory, IndexPrefix: cfg.GetIndexPrefix(), @@ -250,7 +242,7 @@ func createDependencyReader( cfg config.ClientBuilder, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, IndexPrefix: cfg.GetIndexPrefix(), IndexDateLayout: cfg.GetIndexDateLayoutDependencies(), @@ -264,8 +256,57 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { + var errs []error + for _, w := range f.watchers { + errs = append(errs, w.Close()) + } if cfg := f.Options.Get(archiveNamespace); cfg != nil { cfg.TLS.Close() } - return f.Options.GetPrimary().TLS.Close() + errs = append(errs, f.Options.GetPrimary().TLS.Close()) + return errors.Join(errs...) +} + +// initializeClient initializes either primary or archive ES client +func (f *Factory) initializeClient(config config.ClientBuilder, options []elastic.ClientOptionFunc, + client es.Client, atomic atomic.Value, clientType string) error { + initialOptions, err := config.GetConfigOptions(f.logger) + if err != nil { + return fmt.Errorf("failed to create config options for "+clientType+" Elasticsearch client: %w", err) + } + options = initialOptions + + initialClient, err := config.NewClient(f.logger, options, f.metricsFactory) + if err != nil { + return fmt.Errorf("failed to create "+clientType+" Elasticsearch client: %w", err) + } + client = initialClient + atomic.Store(client) + + onPasswordChange := func() { + f.onPasswordChange(config, options, client) + } + watcher, err := fswatcher.New([]string{config.GetPasswordFilePath()}, onPasswordChange, f.logger) + if err != nil { + return fmt.Errorf("failed to create watcher for "+clientType+" client's password: %w", err) + } + f.watchers = append(f.watchers, watcher) + + return nil +} + +func (f *Factory) onPasswordChange(config config.ClientBuilder, options []elastic.ClientOptionFunc, esClient es.Client) { + b, err := os.ReadFile(filepath.Clean(config.GetPasswordFilePath())) + if err == nil { + newPassword := strings.TrimRight(string(b), "\r\n") + options = append(options, elastic.SetBasicAuth(config.GetUsername(), newPassword)) + client, err := config.NewClient(f.logger, options, f.metricsFactory) + if err != nil { + f.logger.Error("failed to recreate Elasticsearch client from new password", zap.Error(err)) + } else { + esClient = client + } + } else { + f.logger.Error("failed to read password file", zap.Error(err)) + } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 1d70e06dbd3..bf630528b9c 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -20,6 +20,7 @@ import ( "errors" "testing" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -41,7 +42,11 @@ type mockClientBuilder struct { createTemplateError error } -func (m *mockClientBuilder) ChannelNewClient(clientChan chan<- es.Client, logger *zap.Logger, metricsFactory metrics.Factory) error { +func (m *mockClientBuilder) GetConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) { + return nil, nil +} + +func (m *mockClientBuilder) NewClient(logger *zap.Logger, options []elastic.ClientOptionFunc, metricsFactory metrics.Factory) (es.Client, error) { if m.err == nil { c := &mocks.Client{} tService := &mocks.TemplateCreateService{} @@ -49,9 +54,9 @@ func (m *mockClientBuilder) ChannelNewClient(clientChan chan<- es.Client, logger tService.On("Do", context.Background()).Return(nil, m.createTemplateError) c.On("CreateTemplate", mock.Anything).Return(tService) c.On("GetVersion").Return(uint(6)) - clientChan <- c + return c, nil } - return m.err + return nil, m.err } func TestElasticsearchFactory(t *testing.T) { @@ -63,11 +68,11 @@ func TestElasticsearchFactory(t *testing.T) { // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. f.primaryConfig = &mockClientBuilder{err: errors.New("made-up error")} - assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to channel primary Elasticsearch client: made-up error") + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error") f.primaryConfig = &mockClientBuilder{} f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration: escfg.Configuration{Enabled: true}} - assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to channel archive Elasticsearch client: made-up error2") + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2") f.archiveConfig = &mockClientBuilder{} assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -96,8 +101,6 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { f.primaryConfig = mockConf f.archiveConfig = mockConf assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - assert.NotNil(t, <-f.primaryClientChan) - assert.NotNil(t, f.primaryClient) r, err := f.CreateSpanWriter() require.Error(t, err) assert.Nil(t, r) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 0d6ccaa7c29..7396d8e479b 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -92,7 +92,7 @@ var ( // SpanReader can query for and load traces from ElasticSearch type SpanReader struct { - client es.Client + client func() es.Client logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. @@ -113,7 +113,7 @@ type SpanReader struct { // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger MaxSpanAge time.Duration MaxDocCount int @@ -401,7 +401,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st } // set traceIDs to empty traceIDs = nil - results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx) + results, err := s.client().MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx) if err != nil { logErrorToSpan(childSpan, err) return nil, err @@ -563,7 +563,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra boolQuery := s.buildFindTraceIDsQuery(traceQuery) jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency) - searchService := s.client.Search(jaegerIndices...). + searchService := s.client().Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. Aggregation(traceIDAggregation, aggregation). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index c581a423d1e..1ab2cd12ed6 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -97,7 +98,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, reader: NewSpanReader(SpanReaderParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), MaxSpanAge: 0, IndexPrefix: "", @@ -116,7 +117,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, reader: NewSpanReader(SpanReaderParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), MaxSpanAge: 0, IndexPrefix: "", @@ -177,56 +178,56 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true, }, indices: []string{spanIndex + "read", serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{ @@ -240,7 +241,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -254,7 +255,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -268,7 +269,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 71ad0c2ee39..d2303196e6c 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -39,14 +39,14 @@ const ( // ServiceOperationStorage stores service to operation pairs. type ServiceOperationStorage struct { - client es.Client + client func() es.Client logger *zap.Logger serviceCache cache.Cache } // NewServiceOperationStorage returns a new ServiceOperationStorage. func NewServiceOperationStorage( - client es.Client, + client func() es.Client, logger *zap.Logger, cacheTTL time.Duration, ) *ServiceOperationStorage { @@ -72,7 +72,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span cacheKey := hashCode(service) if !keyInCache(cacheKey, s.serviceCache) { - s.client.Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() + s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() writeCache(cacheKey, s.serviceCache) } } @@ -80,7 +80,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, maxDocCount int) ([]string, error) { serviceAggregation := getServicesAggregation(maxDocCount) - searchService := s.client.Search(indices...). + searchService := s.client().Search(indices...). Size(0). // set to 0 because we don't want actual documents. IgnoreUnavailable(true). Aggregation(servicesAggregation, serviceAggregation) @@ -110,7 +110,7 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices serviceQuery := elastic.NewTermQuery(serviceName, service) serviceFilter := getOperationsAggregation(maxDocCount) - searchService := s.client.Search(indices...). + searchService := s.client().Search(indices...). Size(0). Query(serviceQuery). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 589e3464d86..c9c584a6615 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -45,7 +45,7 @@ type serviceWriter func(string, *dbmodel.Span) // SpanWriter is a wrapper around elastic.Client type SpanWriter struct { - client es.Client + client func() es.Client logger *zap.Logger writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn indexCache cache.Cache @@ -56,7 +56,7 @@ type SpanWriter struct { // SpanWriterParams holds constructor parameters for NewSpanWriter type SpanWriterParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger MetricsFactory metrics.Factory IndexPrefix string @@ -107,11 +107,11 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") { indexPrefix += "-" } - _, err := s.client.CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) + _, err := s.client().CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) if err != nil { return err } - _, err = s.client.CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) + _, err = s.client().CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) if err != nil { return err } @@ -159,7 +159,7 @@ func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { // Close closes SpanWriter func (s *SpanWriter) Close() error { - return s.client.Close() + return s.client().Close() } func keyInCache(key string, c cache.Cache) bool { @@ -175,5 +175,5 @@ func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span) { } func (s *SpanWriter) writeSpan(indexName string, jsonSpan *dbmodel.Span) { - s.client.Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add() + s.client().Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add() } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 69f1dea89fe..a270704454d 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -50,7 +51,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), + writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), } fn(w) } @@ -72,49 +73,49 @@ func TestSpanWriterIndices(t *testing.T) { }{ { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, indices: []string{spanIndex + "write", serviceIndex + "write"}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}, @@ -364,7 +365,7 @@ func TestNewSpanTags(t *testing.T) { }{ { writer: NewSpanWriter(SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, AllTagsAsFields: true, }), expected: dbmodel.Span{ @@ -375,7 +376,7 @@ func TestNewSpanTags(t *testing.T) { }, { writer: NewSpanWriter(SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, TagKeysAsFields: []string{"foo", "bar", "rere"}, }), expected: dbmodel.Span{ @@ -385,7 +386,7 @@ func TestNewSpanTags(t *testing.T) { name: "definedTagNames", }, { - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory}), + writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory}), expected: dbmodel.Span{ Tags: []dbmodel.KeyValue{{ Key: "foo", @@ -444,7 +445,7 @@ func TestSpanWriterParamsTTL(t *testing.T) { t.Run(test.name, func(t *testing.T) { client := &mocks.Client{} params := SpanWriterParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, ServiceCacheTTL: test.serviceTTL, diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 9d2ec031b1f..6e1684d77be 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -129,7 +129,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro } w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ - Client: client, + Client: func() estemplate.Client { return client }, Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, @@ -143,7 +143,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro } s.SpanWriter = w s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: client, + Client: func() estemplate.Client { return client }, Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, @@ -153,7 +153,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxDocCount: defaultMaxDocCount, }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ - Client: client, + Client: func() estemplate.Client { return client }, Logger: s.logger, IndexPrefix: indexPrefix, IndexDateLayout: indexDateLayout,