From 4c680354124d24b9be02b9bcbbf5349618a1648e Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 10 Jan 2018 15:45:27 +0100 Subject: [PATCH] Fix elasticsearch create index race condition error --- plugin/storage/es/spanstore/writer.go | 10 +++++++--- plugin/storage/es/spanstore/writer_test.go | 8 ++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 24aaf666198..feae75ee840 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/converter/json" @@ -140,12 +141,15 @@ func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *jMo exists, _ := s.client.IndexExists(indexName).Do(s.ctx) // don't need to check the error because the exists variable will be false anyway if there is an error if !exists { // if there are multiple collectors writing to the same elasticsearch host, if the collectors pass - // the exists check above and try to create the same index all at once, this might fail and - // drop a couple spans (~1 per collector). Creating indices ahead of time alleviates this issue. + // the exists check above and try to create the same index all at once, this might fail + // we check the error type and return _, err := s.client.CreateIndex(indexName).Body(s.fixMapping(mapping)).Do(s.ctx) s.writerMetrics.indexCreate.Emit(err, time.Since(start)) if err != nil { - return s.logError(jsonSpan, err, "Failed to create index", s.logger) + eErr, ok := err.(*elastic.Error) + if !ok || eErr.Details.Type != "index_already_exists_exception" { + return s.logError(jsonSpan, err, "Failed to create index", s.logger) + } } } writeCache(indexName, s.indexCache) diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 0d29ca7e6fb..ccefa514273 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -258,9 +258,13 @@ func TestCheckAndCreateIndex(t *testing.T) { `"error":"index creation error"`, }, }, + { + indexExists: false, + createError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}}, + indexExistsError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}}, + }, } - for _, tc := range testCases { - testCase := tc + for _, testCase := range testCases { withSpanWriter(func(w *spanWriterTest) { existsService := &mocks.IndicesExistsService{} existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError)