Skip to content

Commit

Permalink
keep trucking on ES availability errors; more tests to come
Browse files Browse the repository at this point in the history
  • Loading branch information
lykkin committed Nov 22, 2021
1 parent bc91269 commit 7fb0138
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
20 changes: 20 additions & 0 deletions internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"net"
"os"
"regexp"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -188,10 +189,29 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error {
return nil
}

// availabilityErrorRe matches error messages that end with one of the texts
// seen when Elasticsearch cannot be reached: a refused TCP connection, or an
// HTTP timeout while awaiting response headers. It is compiled once at package
// initialization rather than on every call.
var availabilityErrorRe = regexp.MustCompile("connect: connection refused$|net/http: timeout awaiting response headers$")

// isAvailabilityError reports whether e looks like a transient availability
// failure. The decision is based purely on the error text: the message must
// END with one of the phrases in availabilityErrorRe, so wrapped errors that
// add a prefix still match while errors that merely mention the phrase
// mid-message do not. A nil error is never an availability error.
func isAvailabilityError(e error) bool {
	if e == nil {
		return false
	}
	return availabilityErrorRe.MatchString(e.Error())
}

// ensureLeadership ensures leadership is held or needs to be taken over.
func (m *monitorT) ensureLeadership(ctx context.Context) error {
m.log.Debug().Msg("ensuring leadership of policies")
err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex))

// If we are doing a read on an unavailable ES instance ignore it and we will check back later.
if isAvailabilityError(err) {
log.Info().Err(err).Msgf("Encountered availability error while attempting ES read; continuing to retry.")
return nil
}

if err != nil {
return err
}
Expand Down
37 changes: 37 additions & 0 deletions internal/pkg/coordinator/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package coordinator

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsAvailabilityErrorNil(t *testing.T) {
matched := isAvailabilityError(nil)
assert.Equal(t, matched, false)
}

func TestIsAvailabilityErrorTimeout(t *testing.T) {
matched := isAvailabilityError(errors.New("net/http: timeout awaiting response headers"))
assert.Equal(t, matched, true)
}

func TestIsAvailabilityErrorConnectRefused(t *testing.T) {
matched := isAvailabilityError(errors.New("dial tcp 127.0.0.1:9200: connect: connection refused"))
assert.Equal(t, matched, true)
}

func TestIsAvailabilityErrorConnectRefusedRemote(t *testing.T) {
matched := isAvailabilityError(errors.New("dial tcp 65.234.123:9200: connect: connection refused"))
assert.Equal(t, matched, true)
}

func TestIsAvailabilityErrorUnhandledError(t *testing.T) {
matched := isAvailabilityError(errors.New("novel error"))
assert.Equal(t, matched, false)
}

0 comments on commit 7fb0138

Please sign in to comment.