From 7fb01389fadc239545f35ded4c67e9d80a00fddf Mon Sep 17 00:00:00 2001
From: bryan
Date: Sun, 21 Nov 2021 20:56:36 -0800
Subject: [PATCH] keep trucking on ES availability errors; more tests to come

---
 internal/pkg/coordinator/monitor.go      | 20 +++++++++++++
 internal/pkg/coordinator/monitor_test.go | 37 ++++++++++++++++++++++++
 2 files changed, 57 insertions(+)
 create mode 100644 internal/pkg/coordinator/monitor_test.go

diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go
index 66c934154..55250ae12 100644
--- a/internal/pkg/coordinator/monitor.go
+++ b/internal/pkg/coordinator/monitor.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"net"
 	"os"
+	"regexp"
 	"runtime"
 	"sync"
 	"time"
@@ -188,10 +189,29 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error {
 	return nil
 }
 
+// availabilityErrRe is compiled once; it matches the tail of a connection-refused or header-timeout message.
+var availabilityErrRe = regexp.MustCompile("connect: connection refused$|net/http: timeout awaiting response headers$")
+
+// isAvailabilityError reports whether e's message ends with one of the
+// transport failures matched by availabilityErrRe. A nil error never matches.
+func isAvailabilityError(e error) bool {
+	if e == nil {
+		return false
+	}
+	return availabilityErrRe.MatchString(e.Error())
+}
+
 // ensureLeadership ensures leadership is held or needs to be taken over.
 func (m *monitorT) ensureLeadership(ctx context.Context) error {
 	m.log.Debug().Msg("ensuring leadership of policies")
 	err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex))
+
+	// If we are doing a read on an unavailable ES instance, ignore it; we will check back later.
+	if isAvailabilityError(err) {
+		m.log.Info().Err(err).Msg("Encountered availability error while attempting ES read; continuing to retry.")
+		return nil
+	}
+
 	if err != nil {
 		return err
 	}
diff --git a/internal/pkg/coordinator/monitor_test.go b/internal/pkg/coordinator/monitor_test.go
new file mode 100644
index 000000000..8c8fa6ba8
--- /dev/null
+++ b/internal/pkg/coordinator/monitor_test.go
@@ -0,0 +1,37 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package coordinator
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIsAvailabilityErrorNil(t *testing.T) {
+	matched := isAvailabilityError(nil)
+	assert.False(t, matched)
+}
+
+func TestIsAvailabilityErrorTimeout(t *testing.T) {
+	matched := isAvailabilityError(errors.New("net/http: timeout awaiting response headers"))
+	assert.True(t, matched)
+}
+
+func TestIsAvailabilityErrorConnectRefused(t *testing.T) {
+	matched := isAvailabilityError(errors.New("dial tcp 127.0.0.1:9200: connect: connection refused"))
+	assert.True(t, matched)
+}
+
+func TestIsAvailabilityErrorConnectRefusedRemote(t *testing.T) {
+	matched := isAvailabilityError(errors.New("dial tcp 65.234.123:9200: connect: connection refused"))
+	assert.True(t, matched)
+}
+
+func TestIsAvailabilityErrorUnhandledError(t *testing.T) {
+	matched := isAvailabilityError(errors.New("novel error"))
+	assert.False(t, matched)
+}