Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configurable date separator for Elasticsearch index names #2637

Merged
merged 9 commits into from
Dec 3, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestLoadConfigAndFlags(t *testing.T) {
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--config-file=./testdata/jaeger-config.yaml"})
err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--es.index-date-separator=-", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
Expand All @@ -74,6 +74,7 @@ func TestLoadConfigAndFlags(t *testing.T) {
assert.Equal(t, []string{"someUrl"}, esCfg.Servers)
assert.Equal(t, true, esCfg.CreateIndexTemplates)
assert.Equal(t, "staging", esCfg.IndexPrefix)
assert.Equal(t, "2006-01-02", esCfg.IndexDateLayout)
assert.Equal(t, int64(100), esCfg.NumShards)
assert.Equal(t, "user", esCfg.Username)
assert.Equal(t, "pass", esCfg.Password)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
indexDateLayout = "2006-01-02"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
Expand Down Expand Up @@ -91,8 +92,9 @@ func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error {

func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
cfg := config.Configuration{
Servers: []string{esURL},
IndexPrefix: indexPrefix,
Servers: []string{esURL},
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Tags: config.TagsAsFields{
AllAsFields: allTagsAsFields,
},
Expand All @@ -118,14 +120,15 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
}
reader := esspanreader.NewEsSpanReader(elasticsearchClient, s.logger, esspanreader.Config{
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
TagDotReplacement: tagKeyDeDotChar,
MaxSpanAge: maxSpanAge,
MaxDocCount: defaultMaxDocCount,
})
s.SpanReader = reader

depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount)
if err := depStore.CreateTemplates(depMapping); err != nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger, archive bo
logger: logger,
nameTag: tag.Insert(storagemetrics.TagExporterName(), name),
client: client,
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, alias, archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, alias, archive),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive),
translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()),
isArchive: archive,
serviceCache: cache.NewLRUWithOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func TestWriteSpans(t *testing.T) {
w := esSpanWriter{
logger: zap.NewNop(),
client: esClient,
spanIndexName: esutil.NewIndexNameProvider("span", "", esutil.AliasNone, false),
serviceIndexName: esutil.NewIndexNameProvider("service", "", esutil.AliasNone, false),
spanIndexName: esutil.NewIndexNameProvider("span", "", "2006-01-02", esutil.AliasNone, false),
serviceIndexName: esutil.NewIndexNameProvider("service", "", "2006-01-02", esutil.AliasNone, false),
serviceCache: cache.NewLRU(1),
nameTag: tag.Insert(storagemetrics.TagExporterName(), "name"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) {
Archive: false,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
Expand All @@ -101,7 +102,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error
if err != nil {
return nil, err
}
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetIndexDateLayout(), cfg.GetMaxDocCount()), nil
}

// CreateArchiveSpanReader creates archive spanstore.Reader
Expand All @@ -115,6 +116,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
Archive: true,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
Expand Down
14 changes: 7 additions & 7 deletions cmd/opentelemetry/app/internal/esutil/index_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package esutil

import "time"

const indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20

// Alias is used to configure the kind of index alias
type Alias string

Expand All @@ -33,11 +31,12 @@ const (
// IndexNameProvider creates standard index names from dates
type IndexNameProvider struct {
index string
dateLayout string
useSingleIndex bool
}

// NewIndexNameProvider constructs a new IndexNameProvider
func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) IndexNameProvider {
func NewIndexNameProvider(index, prefix, layout string, alias Alias, archive bool) IndexNameProvider {
if prefix != "" {
index = prefix + "-" + index
}
Expand All @@ -53,6 +52,7 @@ func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) Index
}
return IndexNameProvider{
index: index,
dateLayout: layout,
useSingleIndex: archive || (alias != AliasNone),
}
}
Expand All @@ -63,12 +63,12 @@ func (n IndexNameProvider) IndexNameRange(start, end time.Time) []string {
return []string{n.index}
}
var indices []string
firstIndex := n.index + start.UTC().Format(indexDateFormat)
currentIndex := n.index + end.UTC().Format(indexDateFormat)
firstIndex := n.index + start.UTC().Format(n.dateLayout)
currentIndex := n.index + end.UTC().Format(n.dateLayout)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
end = end.Add(-24 * time.Hour)
currentIndex = n.index + end.UTC().Format(indexDateFormat)
currentIndex = n.index + end.UTC().Format(n.dateLayout)
}
indices = append(indices, firstIndex)
return indices
Expand All @@ -79,6 +79,6 @@ func (n IndexNameProvider) IndexName(date time.Time) string {
if n.useSingleIndex {
return n.index
}
spanDate := date.UTC().Format(indexDateFormat)
spanDate := date.UTC().Format(n.dateLayout)
return n.index + spanDate
}
20 changes: 10 additions & 10 deletions cmd/opentelemetry/app/internal/esutil/index_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,33 @@ func TestIndexNames(t *testing.T) {
}{
{
name: "index prefix",
nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false),
indices: []string{"production-myindex-0001-01-01"},
},
{
name: "multiple dates",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false),
indices: []string{"myindex-2020-08-30", "myindex-2020-08-29", "myindex-2020-08-28"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use aliases",
nameProvider: NewIndexNameProvider("myindex", "", AliasRead, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, false),
indices: []string{"myindex-read"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true),
indices: []string{"myindex-archive"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive alias",
nameProvider: NewIndexNameProvider("myindex", "", AliasRead, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, true),
indices: []string{"myindex-archive-read"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
Expand All @@ -81,30 +81,30 @@ func TestIndexName(t *testing.T) {
}{
{
name: "index prefix",
nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false),
index: "production-myindex-0001-01-01",
},
{
name: "no prefix",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false),
index: "myindex-2020-08-28",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use aliases",
nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, false),
index: "myindex-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true),
index: "myindex-archive",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive alias",
nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, true),
index: "myindex-archive-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ const (
dependencyIndexBaseName = "jaeger-dependencies"

timestampField = "timestamp"

indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20
)

// DependencyStore defines Elasticsearch dependency store.
type DependencyStore struct {
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
maxDocCount int
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
indexDateLayout string
maxDocCount int
}

var _ dependencystore.Reader = (*DependencyStore)(nil)
var _ dependencystore.Writer = (*DependencyStore)(nil)

// NewDependencyStore creates dependency store.
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore {
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore {
if indexPrefix != "" {
indexPrefix += "-"
}
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
maxDocCount: maxDocCount,
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
indexDateLayout: indexDateLayout,
maxDocCount: maxDocCount,
}
}

Expand All @@ -78,14 +78,14 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
if err != nil {
return err
}
return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, ts), dependencyType)
return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, r.indexDateLayout, ts), dependencyType)
}

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback, r.maxDocCount)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
indices := dailyIndices(r.indexPrefix, r.indexDateLayout, endTs, lookback)
response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -114,18 +114,18 @@ func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esc
}
}

func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateFormat)
func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateLayout)
}

func dailyIndices(prefix string, ts time.Time, lookback time.Duration) []string {
func dailyIndices(prefix, format string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
firstIndex := indexWithDate(prefix, format, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, format, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, ts)
currentIndex = indexWithDate(prefix, format, ts)
}
return append(indices, firstIndex)
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const defaultMaxDocCount = 10_000

func TestCreateTemplates(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
template := "template"
err := store.CreateTemplates(template)
require.NoError(t, err)
Expand All @@ -48,7 +48,7 @@ func TestCreateTemplates(t *testing.T) {

func TestWriteDependencies(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}}
tsNow := time.Now()
err := store.WriteDependencies(tsNow, dependencies)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGetDependencies(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Expand All @@ -109,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid character")
Expand All @@ -121,7 +121,7 @@ func TestGetDependencies_err_client(t *testing.T) {
client := &mockClient{
searchErr: searchErr,
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
tsNow := time.Now()
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
Expand Down Expand Up @@ -151,11 +151,12 @@ func TestSearchBody(t *testing.T) {
}

func TestIndexWithDate(t *testing.T) {
assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC)))
assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", "2006-01-02",
time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC)))
}

func TestDailyIndices(t *testing.T) {
indices := dailyIndices("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour)
indices := dailyIndices("foo-", "2006-01-02", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour)
assert.Equal(t, []string{"foo-2020-09-30", "foo-2020-09-29"}, indices)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
Archive bool
UseReadWriteAliases bool
IndexPrefix string
IndexDateLayout string
MaxSpanAge time.Duration
MaxDocCount int
TagDotReplacement string
Expand All @@ -89,8 +90,8 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co
maxSpanAge: config.MaxSpanAge,
maxDocCount: config.MaxDocCount,
converter: dbmodel.NewToDomain(config.TagDotReplacement),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, alias, config.Archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, alias, config.Archive),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive),
}
}

Expand Down
Loading