Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix elasticsearch create index race condition error #641

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/converter/json"
Expand Down Expand Up @@ -139,13 +140,15 @@ func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *jMo
start := time.Now()
exists, _ := s.client.IndexExists(indexName).Do(s.ctx) // don't need to check the error because the exists variable will be false anyway if there is an error
if !exists {
// if there are multiple collectors writing to the same elasticsearch host, if the collectors pass
// the exists check above and try to create the same index all at once, this might fail and
// drop a couple spans (~1 per collector). Creating indices ahead of time alleviates this issue.
// if there are multiple collectors writing to the same elasticsearch host a race condition can occur - create the index multiple times
// we check for the error type to minimize errors
_, err := s.client.CreateIndex(indexName).Body(s.fixMapping(mapping)).Do(s.ctx)
s.writerMetrics.indexCreate.Emit(err, time.Since(start))
if err != nil {
return s.logError(jsonSpan, err, "Failed to create index", s.logger)
eErr, ok := err.(*elastic.Error)
if !ok || eErr.Details != nil && eErr.Details.Type != "index_already_exists_exception" {
return s.logError(jsonSpan, err, "Failed to create index", s.logger)
}
}
}
writeCache(indexName, s.indexCache)
Expand Down
8 changes: 6 additions & 2 deletions plugin/storage/es/spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,13 @@ func TestCheckAndCreateIndex(t *testing.T) {
`"error":"index creation error"`,
},
},
{
indexExists: false,
createError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
indexExistsError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
},
}
for _, tc := range testCases {
testCase := tc
for _, testCase := range testCases {
withSpanWriter(func(w *spanWriterTest) {
existsService := &mocks.IndicesExistsService{}
existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError)
Expand Down