Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester health check with zone awareness #256

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ func (i *Lifecycler) checkRingHealthForReadiness(ctx context.Context) error {
}

if i.cfg.ReadinessCheckRingHealth {
if i.Zone != "" {
if err := ringDesc.IsReadyZoneAware(time.Now(), i.cfg.RingConfig.HeartbeatTimeout, i.Zone); err != nil {
level.Warn(i.logger).Log("msg", "found an existing instance(s) not in the expected zone with a problem in the ring, "+
"this instance cannot become ready until this problem is resolved. "+
"The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.",
"ring", i.RingName, "err", err, "expectedZone", i.Zone)
return err
}
}
if err := ringDesc.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil {
level.Warn(i.logger).Log("msg", "found an existing instance(s) with a problem in the ring, "+
"this instance cannot become ready until this problem is resolved. "+
Expand Down
28 changes: 26 additions & 2 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ring

import (
"container/heap"
"errors"
"fmt"
"sort"
"sync"
Expand All @@ -13,6 +14,11 @@ import (
"github.com/grafana/dskit/kv/memberlist"
)

var (
ErrHeartbeat = errors.New("heartbeat error")
ErrState = errors.New("state error")
)

// ByAddr is a sortable list of InstanceDesc.
type ByAddr []InstanceDesc

Expand Down Expand Up @@ -115,6 +121,24 @@ func (d *Desc) IsReady(now time.Time, heartbeatTimeout time.Duration) error {
return nil
}

func (d *Desc) IsReadyZoneAware(now time.Time, heartbeatTimeout time.Duration, zone string) error {
numTokens := 0
for _, instance := range d.Ingesters {
if err := instance.IsReady(now, heartbeatTimeout); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to skip iteration on all errors from the same zone. We only need to check states on instances from different zones (this will also skip heartbeat errors in the same zone, which is fine IMO).

if errors.Is(err, ErrState) && zone == instance.Zone {
continue
}
return err
}
numTokens += len(instance.Tokens)
}

if numTokens == 0 {
return fmt.Errorf("no tokens in ring")
}
return nil
}

// TokensFor return all ring tokens and tokens for the input provided ID.
// Returned tokens are guaranteed to be sorted.
func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens) {
Expand Down Expand Up @@ -151,10 +175,10 @@ func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now ti
// IsReady returns no error if the instance is ACTIVE and healthy.
func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) error {
if !i.IsHeartbeatHealthy(heartbeatTimeout, now) {
return fmt.Errorf("instance %s past heartbeat timeout", i.Addr)
return fmt.Errorf("%w: instance %s past heartbeat timeout", ErrHeartbeat, i.Addr)
}
if i.State != ACTIVE {
return fmt.Errorf("instance %s in state %v", i.Addr, i.State)
return fmt.Errorf("%w: instance %s in state %v", ErrState, i.Addr, i.State)
}
return nil
}
Expand Down
97 changes: 97 additions & 0 deletions ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) {
Expand Down Expand Up @@ -171,6 +172,102 @@ func TestDesc_Ready(t *testing.T) {
}
}

func TestDesc_ReadyZone(t *testing.T) {
now := time.Now()

r := &Desc{
cstyan marked this conversation as resolved.
Show resolved Hide resolved
Ingesters: map[string]InstanceDesc{
"ing1": {
Tokens: []uint32{100, 200, 300},
State: ACTIVE,
Timestamp: now.Unix(),
Zone: "a",
},
},
}

expectedZone := "a"
t.Run("default cases", func(t *testing.T) {
if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil {
t.Fatal("expected ready, got", err)
}

if err := r.IsReadyZoneAware(now, 0, expectedZone); err != nil {
t.Fatal("expected ready, got", err)
}

require.ErrorIs(t, r.IsReadyZoneAware(now.Add(5*time.Minute), 10*time.Second, expectedZone), ErrHeartbeat)

require.NoError(t, r.IsReadyZoneAware(now.Add(5*time.Minute), 0, expectedZone), "expected ready (no heartbeat but timeout disabled)")

r = &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
State: ACTIVE,
Timestamp: now.Unix(),
Zone: "a",
},
},
}

require.Error(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected !ready (no tokens), but got no error")

r.Ingesters["some ingester"] = InstanceDesc{
Tokens: []uint32{12345},
Timestamp: now.Unix(),
Zone: "b",
}
require.NoError(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected ready")
})

// An inactive ingester in the same zone is still valid
t.Run("inactive ingester in same zone", func(t *testing.T) {
r.Ingesters["some inactive ingester"] = InstanceDesc{
Tokens: []uint32{12345},
Timestamp: now.Unix(),
Zone: "a",
State: LEAVING,
}

require.NoError(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected ready")
})

// An inactive ingester in a different zone is invalid
t.Run("inactive ingester in different zone", func(t *testing.T) {
r.Ingesters["some inactive ingester different zone"] = InstanceDesc{
Tokens: []uint32{12345},
Timestamp: now.Unix(),
Zone: "b",
State: LEAVING,
}

require.Error(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected !ready (inactive ingester in different zone), but got no error")
})

// An inactive ingester in a same zone due to heartbeat is invalid
t.Run("inactive ingester in same zone, heartbeat fails", func(t *testing.T) {
// for this test we need to reset the ring description
r = &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
State: ACTIVE,
Timestamp: now.Unix(),
Zone: "a",
},
},
}

r.Ingesters["some inactive ingester heartbeat"] = InstanceDesc{
Tokens: []uint32{12345},
Timestamp: now.Unix() - 11*int64(time.Second),
Zone: "a",
State: ACTIVE,
}

require.ErrorIs(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), ErrHeartbeat)
})
}

func TestDesc_getTokensByZone(t *testing.T) {
tests := map[string]struct {
desc *Desc
Expand Down