From c1a7b8d3bc9a7cb4151a5a29ca015a6bf0ed23a8 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 20 Jan 2023 10:13:16 -0800 Subject: [PATCH 1/2] Modify the ready/health check to accept leaving ingesters if they are in the same zone. Signed-off-by: Callum Styan --- ring/lifecycler.go | 9 ++++ ring/model.go | 39 ++++++++++++++++- ring/model_test.go | 103 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index dfef6afb6..5f865c9b5 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -237,6 +237,15 @@ func (i *Lifecycler) checkRingHealthForReadiness(ctx context.Context) error { } if i.cfg.ReadinessCheckRingHealth { + if i.Zone != "" { + if err := ringDesc.IsReadyZoneAware(time.Now(), i.cfg.RingConfig.HeartbeatTimeout, i.Zone); err != nil { + level.Warn(i.logger).Log("msg", "found an existing instance(s) not in the expected zone with a problem in the ring, "+ + "this instance cannot become ready until this problem is resolved. "+ + "The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.", + "ring", i.RingName, "err", err, "expectedZone", i.Zone) + return err + } + } if err := ringDesc.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil { level.Warn(i.logger).Log("msg", "found an existing instance(s) with a problem in the ring, "+ "this instance cannot become ready until this problem is resolved. "+ diff --git a/ring/model.go b/ring/model.go index fd58e534c..b15fac6a5 100644 --- a/ring/model.go +++ b/ring/model.go @@ -2,6 +2,7 @@ package ring import ( "container/heap" + "errors" "fmt" "sort" "sync" @@ -115,6 +116,24 @@ func (d *Desc) IsReady(now time.Time, heartbeatTimeout time.Duration) error { return nil } +func (d *Desc) IsReadyZoneAware(now time.Time, heartbeatTimeout time.Duration, zone string) error { + numTokens := 0 + for _, instance := range d.Ingesters { + if err := instance.IsReady(now, heartbeatTimeout); err != nil { + if errors.As(err, &StateError{}) && zone == instance.Zone { + continue + } + return err + } + numTokens += len(instance.Tokens) + } + + if numTokens == 0 { + return fmt.Errorf("no tokens in ring") + } + return nil +} + // TokensFor return all ring tokens and tokens for the input provided ID. // Returned tokens are guaranteed to be sorted. func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens) { @@ -151,14 +170,30 @@ func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now ti // IsReady returns no error if the instance is ACTIVE and healthy. func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) error { if !i.IsHeartbeatHealthy(heartbeatTimeout, now) { - return fmt.Errorf("instance %s past heartbeat timeout", i.Addr) + return HeartbeatError{err: fmt.Errorf("instance %s past heartbeat timeout", i.Addr)} } if i.State != ACTIVE { - return fmt.Errorf("instance %s in state %v", i.Addr, i.State) + return StateError{err: fmt.Errorf("instance %s in state %v", i.Addr, i.State)} } return nil } +type HeartbeatError struct { + err error +} + +func (he HeartbeatError) Error() string { + return he.err.Error() +} + +type StateError struct { + err error +} + +func (se StateError) Error() string { + return se.err.Error() +} + // Merge merges other ring into this one. Returns sub-ring that represents the change, // and can be sent out to other clients. // diff --git a/ring/model_test.go b/ring/model_test.go index ae39b1ba0..0c2d183e7 100644 --- a/ring/model_test.go +++ b/ring/model_test.go @@ -171,6 +171,109 @@ func TestDesc_Ready(t *testing.T) { } } +func TestDesc_ReadyZone(t *testing.T) { + now := time.Now() + + r := &Desc{ + Ingesters: map[string]InstanceDesc{ + "ing1": { + Tokens: []uint32{100, 200, 300}, + State: ACTIVE, + Timestamp: now.Unix(), + Zone: "a", + }, + }, + } + + expectedZone := "a" + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } + + if err := r.IsReadyZoneAware(now, 0, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } + + if err := r.IsReadyZoneAware(now.Add(5*time.Minute), 10*time.Second, expectedZone); err == nil { + t.Fatal("expected !ready (no heartbeat from active ingester), but got no error") + } + + if err := r.IsReadyZoneAware(now.Add(5*time.Minute), 0, expectedZone); err != nil { + t.Fatal("expected ready (no heartbeat but timeout disabled), got", err) + } + + r = &Desc{ + Ingesters: map[string]InstanceDesc{ + "ing1": { + State: ACTIVE, + Timestamp: now.Unix(), + Zone: "a", + }, + }, + } + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { + t.Fatal("expected !ready (no tokens), but got no error") + } + + r.Ingesters["some ingester"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "b", + } + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } + + // An inactive ingester in the same zone is still valid + r.Ingesters["some inactive ingester"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "a", + State: LEAVING, + } + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } + + // An inactive ingester in a different zone is invalid + r.Ingesters["some inactive ingester different zone"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "b", + State: LEAVING, + } + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { + t.Fatal("expected !ready (inactive ingester in different zone), but got no error") + } + + r = &Desc{ + Ingesters: map[string]InstanceDesc{ + "ing1": { + State: ACTIVE, + Timestamp: now.Unix(), + Zone: "a", + }, + }, + } + + // An inactive ingester in a same zone due to heartbeat is invalid + r.Ingesters["some inactive ingester heartbeat"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix() - 11*int64(time.Second), + Zone: "a", + State: ACTIVE, + } + + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { + t.Fatal("expected !ready (heartbeat error), but got no error", err) + } +} + func TestDesc_getTokensByZone(t *testing.T) { tests := map[string]struct { desc *Desc From 202f5d8236cf5bdce3b5ce56273fd258091621c9 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 20 Jan 2023 16:36:35 -0800 Subject: [PATCH 2/2] define error via errors.New instead of defining a type, and clean up tests a bit Signed-off-by: Callum Styan --- ring/model.go | 27 +++------- ring/model_test.go | 132 ++++++++++++++++++++++----------------------- 2 files changed, 71 insertions(+), 88 deletions(-) diff --git a/ring/model.go b/ring/model.go index b15fac6a5..4a634e5d0 100644 --- a/ring/model.go +++ b/ring/model.go @@ -14,6 +14,11 @@ import ( "github.com/grafana/dskit/kv/memberlist" ) +var ( + ErrHeartbeat = errors.New("heartbeat error") + ErrState = errors.New("state error") +) + // ByAddr is a sortable list of InstanceDesc. type ByAddr []InstanceDesc @@ -120,7 +125,7 @@ func (d *Desc) IsReadyZoneAware(now time.Time, heartbeatTimeout time.Duration, z numTokens := 0 for _, instance := range d.Ingesters { if err := instance.IsReady(now, heartbeatTimeout); err != nil { - if errors.As(err, &StateError{}) && zone == instance.Zone { + if errors.Is(err, ErrState) && zone == instance.Zone { continue } return err @@ -170,30 +175,14 @@ func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now ti // IsReady returns no error if the instance is ACTIVE and healthy. func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) error { if !i.IsHeartbeatHealthy(heartbeatTimeout, now) { - return HeartbeatError{err: fmt.Errorf("instance %s past heartbeat timeout", i.Addr)} + return fmt.Errorf("%w: instance %s past heartbeat timeout", ErrHeartbeat, i.Addr) } if i.State != ACTIVE { - return StateError{err: fmt.Errorf("instance %s in state %v", i.Addr, i.State)} + return fmt.Errorf("%w: instance %s in state %v", ErrState, i.Addr, i.State) } return nil } -type HeartbeatError struct { - err error -} - -func (he HeartbeatError) Error() string { - return he.err.Error() -} - -type StateError struct { - err error -} - -func (se StateError) Error() string { - return se.err.Error() -} - // Merge merges other ring into this one. Returns sub-ring that represents the change, // and can be sent out to other clients. // diff --git a/ring/model_test.go b/ring/model_test.go index 0c2d183e7..8c08a9c6e 100644 --- a/ring/model_test.go +++ b/ring/model_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) { @@ -186,92 +187,85 @@ func TestDesc_ReadyZone(t *testing.T) { } expectedZone := "a" + t.Run("default cases", func(t *testing.T) { + if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { - t.Fatal("expected ready, got", err) - } + if err := r.IsReadyZoneAware(now, 0, expectedZone); err != nil { + t.Fatal("expected ready, got", err) + } - if err := r.IsReadyZoneAware(now, 0, expectedZone); err != nil { - t.Fatal("expected ready, got", err) - } + require.ErrorIs(t, r.IsReadyZoneAware(now.Add(5*time.Minute), 10*time.Second, expectedZone), ErrHeartbeat) - if err := r.IsReadyZoneAware(now.Add(5*time.Minute), 10*time.Second, expectedZone); err == nil { - t.Fatal("expected !ready (no heartbeat from active ingester), but got no error") - } + require.NoError(t, r.IsReadyZoneAware(now.Add(5*time.Minute), 0, expectedZone), "expected ready (no heartbeat but timeout disabled)") - if err := r.IsReadyZoneAware(now.Add(5*time.Minute), 0, expectedZone); err != nil { - t.Fatal("expected ready (no heartbeat but timeout disabled), got", err) - } - - r = &Desc{ - Ingesters: map[string]InstanceDesc{ - "ing1": { - State: ACTIVE, - Timestamp: now.Unix(), - Zone: "a", + r = &Desc{ + Ingesters: map[string]InstanceDesc{ + "ing1": { + State: ACTIVE, + Timestamp: now.Unix(), + Zone: "a", + }, }, - }, - } - - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { - t.Fatal("expected !ready (no tokens), but got no error") - } + } - r.Ingesters["some ingester"] = InstanceDesc{ - Tokens: []uint32{12345}, - Timestamp: now.Unix(), - Zone: "b", - } + require.Error(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected !ready (no tokens), but got no error") - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { - t.Fatal("expected ready, got", err) - } + r.Ingesters["some ingester"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "b", + } + require.NoError(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected ready") + }) // An inactive ingester in the same zone is still valid - r.Ingesters["some inactive ingester"] = InstanceDesc{ - Tokens: []uint32{12345}, - Timestamp: now.Unix(), - Zone: "a", - State: LEAVING, - } + t.Run("inactive ingester in same zone", func(t *testing.T) { + r.Ingesters["some inactive ingester"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "a", + State: LEAVING, + } - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err != nil { - t.Fatal("expected ready, got", err) - } + require.NoError(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected ready") + }) // An inactive ingester in a different zone is invalid - r.Ingesters["some inactive ingester different zone"] = InstanceDesc{ - Tokens: []uint32{12345}, - Timestamp: now.Unix(), - Zone: "b", - State: LEAVING, - } + t.Run("inactive ingester in different zone", func(t *testing.T) { + r.Ingesters["some inactive ingester different zone"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix(), + Zone: "b", + State: LEAVING, + } - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { - t.Fatal("expected !ready (inactive ingester in different zone), but got no error") - } + require.Error(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), "expected !ready (inactive ingester in different zone), but got no error") + }) - r = &Desc{ - Ingesters: map[string]InstanceDesc{ - "ing1": { - State: ACTIVE, - Timestamp: now.Unix(), - Zone: "a", + // An inactive ingester in a same zone due to heartbeat is invalid + t.Run("inactive ingester in same zone, heartbeat fails", func(t *testing.T) { + // for this test we need to reset the ring description + r = &Desc{ + Ingesters: map[string]InstanceDesc{ + "ing1": { + State: ACTIVE, + Timestamp: now.Unix(), + Zone: "a", + }, }, - }, - } + } - // An inactive ingester in a same zone due to heartbeat is invalid - r.Ingesters["some inactive ingester heartbeat"] = InstanceDesc{ - Tokens: []uint32{12345}, - Timestamp: now.Unix() - 11*int64(time.Second), - Zone: "a", - State: ACTIVE, - } + r.Ingesters["some inactive ingester heartbeat"] = InstanceDesc{ + Tokens: []uint32{12345}, + Timestamp: now.Unix() - 11*int64(time.Second), + Zone: "a", + State: ACTIVE, + } - if err := r.IsReadyZoneAware(now, 10*time.Second, expectedZone); err == nil { - t.Fatal("expected !ready (heartbeat error), but got no error", err) - } + require.ErrorIs(t, r.IsReadyZoneAware(now, 10*time.Second, expectedZone), ErrHeartbeat) + }) } func TestDesc_getTokensByZone(t *testing.T) {