Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] add elasticsearch & opensearch e2e integration test #5345

Merged
merged 21 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ extensions:
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-main"
use_aliases: true
use_aliases: false
create_mappings: true
es_archive:
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-archive"
use_aliases: true
use_aliases: false
create_mappings: true

receivers:
otlp:
protocols:
Expand Down
48 changes: 48 additions & 0 deletions cmd/jaeger/internal/integration/es_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2024 The Jaeger Authors.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

const (
host = "0.0.0.0"
queryPort = "9200"
queryHostPort = host + ":" + queryPort
queryURL = "http://" + queryHostPort
)

type ESStorageIntegration struct {
E2EStorageIntegration
esClient *integration.EsClient
}

func (s *ESStorageIntegration) initializeES(t *testing.T) {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
s.esClient = integration.StartEsClient(t, queryURL)
s.CleanUp = func(t *testing.T) {
s.esClient.DeleteAllIndixes(t)
}
s.esClient.DeleteAllIndixes(t)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// TODO: remove this flag after ES supports returning spanKind
// Issue https://github.com/jaegertracing/jaeger/issues/1923
s.GetOperationsMissingSpanKind = true
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func TestESStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "elasticsearch", "opensearch")

s := &ESStorageIntegration{}
s.initializeES(t)
s.Fixtures = integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json")
s.ConfigFile = "cmd/jaeger/config-elasticsearch.yaml"
s.SkipBinaryAttrs = true
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
s.e2eInitialize(t)
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
4 changes: 2 additions & 2 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func NewFactoryWithConfig(
return nil, err
}

cfg.MaxDocCount = defaultMaxDocCount
cfg.Enabled = true
defaultConfig := getDefaultConfig()
cfg.ApplyDefaults(&defaultConfig)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

archive := make(map[string]*namespaceConfig)
archive[archiveNamespace] = &namespaceConfig{
Expand Down
60 changes: 33 additions & 27 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,33 +98,7 @@ type namespaceConfig struct {
// NewOptions creates a new Options struct.
func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
// TODO all default values should be defined via cobra flags
defaultConfig := config.Configuration{
Username: "",
Password: "",
Sniffer: false,
MaxSpanAge: 72 * time.Hour,
AdaptiveSamplingLookback: 72 * time.Hour,
NumShards: 5,
NumReplicas: 1,
PrioritySpanTemplate: 0,
PriorityServiceTemplate: 0,
PriorityDependenciesTemplate: 0,
BulkSize: 5 * 1000 * 1000,
BulkWorkers: 1,
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
Tags: config.TagsAsFields{
DotReplacement: "@",
},
Enabled: true,
CreateIndexTemplates: true,
Version: 0,
Servers: []string{defaultServerURL},
RemoteReadClusters: []string{},
MaxDocCount: defaultMaxDocCount,
LogLevel: "error",
SendGetBodyAs: defaultSendGetBodyAs,
}
defaultConfig := getDefaultConfig()
options := &Options{
Primary: namespaceConfig{
Configuration: defaultConfig,
Expand Down Expand Up @@ -426,3 +400,35 @@ func initDateLayout(rolloverFreq, sep string) string {
}
return indexLayout
}

func getDefaultConfig() config.Configuration {
return config.Configuration{
Username: "",
Password: "",
Sniffer: false,
MaxSpanAge: 72 * time.Hour,
AdaptiveSamplingLookback: 72 * time.Hour,
NumShards: 5,
NumReplicas: 1,
PrioritySpanTemplate: 0,
PriorityServiceTemplate: 0,
PriorityDependenciesTemplate: 0,
BulkSize: 5 * 1000 * 1000,
BulkWorkers: 1,
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
Tags: config.TagsAsFields{
DotReplacement: "@",
},
Enabled: true,
CreateIndexTemplates: true,
Version: 0,
UseReadWriteAliases: false,
UseILM: false,
Servers: []string{defaultServerURL},
RemoteReadClusters: []string{},
MaxDocCount: defaultMaxDocCount,
LogLevel: "error",
SendGetBodyAs: defaultSendGetBodyAs,
}
}
40 changes: 40 additions & 0 deletions plugin/storage/integration/elasticsearch_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"context"
"testing"

elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"github.com/stretchr/testify/require"
)

type EsClient struct {
client *elastic.Client
v8Client *elasticsearch8.Client
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
}

func StartEsClient(t *testing.T, queryURL string) *EsClient {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
require.NoError(t, err)

rawV8Client, err := elasticsearch8.NewClient(elasticsearch8.Config{
Addresses: []string{queryURL},
DiscoverNodesOnStart: false,
})
require.NoError(t, err)
return &EsClient{
client: rawClient,
v8Client: rawV8Client,
}
}

func (s *EsClient) DeleteAllIndixes(t *testing.T) {
_, err := s.client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
}
40 changes: 13 additions & 27 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
"testing"
"time"

elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / unit-tests

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / cassandra 3.x

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / opensearch 1.x

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / kafka

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint

"github.com/olivere/elastic" imported and not used (typecheck)

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / grpc (v1)

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / elasticsearch 5.x

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / badger (v1)

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / opensearch 2.x

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / elasticsearch 6.x

"github.com/olivere/elastic" imported and not used

Check failure on line 28 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / elasticsearch 8.x

"github.com/olivere/elastic" imported and not used
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -58,14 +57,12 @@

type ESStorageIntegration struct {
StorageIntegration

client *elastic.Client
v8Client *elasticsearch8.Client
logger *zap.Logger
logger *zap.Logger
client *EsClient
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
pingResult, _, err := s.client.Ping(queryURL).Do(context.Background())
pingResult, _, err := s.client.client.Ping(queryURL).Do(context.Background())
if err != nil {
return 0, err
}
Expand All @@ -83,19 +80,9 @@
}

func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
require.NoError(t, err)
s.client = StartEsClient(t, queryURL)
s.logger, _ = testutils.NewLogger()

s.client = rawClient
s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{
Addresses: []string{queryURL},
DiscoverNodesOnStart: false,
})
require.NoError(t, err)

s.initSpanstore(t, allTagsAsFields)

s.CleanUp = func(t *testing.T) {
Expand All @@ -109,8 +96,7 @@
}

func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
_, err := s.client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
s.client.DeleteAllIndixes(t)
s.initSpanstore(t, allTagsAsFields)
}

Expand Down Expand Up @@ -204,17 +190,17 @@
require.NoError(t, err)
// TODO abstract this into pkg/es/client.IndexManagementLifecycleAPI
if esVersion <= 7 {
serviceTemplateExists, err := s.client.IndexTemplateExists(indexPrefix + "-jaeger-service").Do(context.Background())
serviceTemplateExists, err := s.client.client.IndexTemplateExists(indexPrefix + "-jaeger-service").Do(context.Background())
require.NoError(t, err)
assert.True(t, serviceTemplateExists)
spanTemplateExists, err := s.client.IndexTemplateExists(indexPrefix + "-jaeger-span").Do(context.Background())
spanTemplateExists, err := s.client.client.IndexTemplateExists(indexPrefix + "-jaeger-span").Do(context.Background())
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
assert.True(t, spanTemplateExists)
} else {
serviceTemplateExistsResponse, err := s.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-service")
serviceTemplateExistsResponse, err := s.client.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-service")
require.NoError(t, err)
assert.Equal(t, 200, serviceTemplateExistsResponse.StatusCode)
spanTemplateExistsResponse, err := s.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-span")
spanTemplateExistsResponse, err := s.client.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-span")
require.NoError(t, err)
assert.Equal(t, 200, spanTemplateExistsResponse.StatusCode)
}
Expand All @@ -229,14 +215,14 @@
if prefix != "" {
prefixWithSeparator += "-"
}
_, err := s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + spanTemplateName)
_, err := s.client.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + spanTemplateName)
require.NoError(t, err)
_, err = s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + serviceTemplateName)
_, err = s.client.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + serviceTemplateName)
require.NoError(t, err)
_, err = s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + dependenciesTemplateName)
_, err = s.client.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + dependenciesTemplateName)
require.NoError(t, err)
} else {
_, err := s.client.IndexDeleteTemplate("*").Do(context.Background())
_, err := s.client.client.IndexDeleteTemplate("*").Do(context.Background())
require.NoError(t, err)
}
return nil
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/integration/es_index_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,10 @@ func createESV8Client() (*elasticsearch8.Client, error) {

func cleanESIndexTemplates(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string) {
s := &ESStorageIntegration{
client: client,
v8Client: v8Client,
client: &EsClient{
client: client,
v8Client: v8Client,
},
}
s.logger, _ = testutils.NewLogger()
s.cleanESIndexTemplates(t, prefix)
Expand Down
1 change: 1 addition & 0 deletions scripts/es-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ main() {

bring_up_storage "${distro}" "${version}"
STORAGE=${distro} make storage-integration-test
STORAGE=${distro} SPAN_STORAGE_TYPE=${distro} make jaeger-v2-storage-integration-test
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
make index-cleaner-integration-test
make index-rollover-integration-test
}
Expand Down
Loading