diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index b22cb1400fc..5e475b25d04 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -433,7 +433,14 @@ func (i *Ingester) startingForFlusher(ctx context.Context) error {
 	return nil
 }
 
-func (i *Ingester) starting(ctx context.Context) error {
+func (i *Ingester) starting(ctx context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			// If starting() fails for any reason (e.g., context canceled),
+			// the lifecycler must be stopped.
+			_ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
+		}
+	}()
 	if err := i.openExistingTSDB(ctx); err != nil {
 		// Try to rollback and close opened TSDBs before halting the ingester.
 		i.closeAllTSDB()
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index b1085f60608..3d9ba47d643 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/gogo/status"
+	"github.com/grafana/dskit/kv"
 	dslog "github.com/grafana/dskit/log"
 	dskit_metrics "github.com/grafana/dskit/metrics"
 	"github.com/grafana/dskit/middleware"
@@ -9778,3 +9779,173 @@ func buildSeriesSet(t *testing.T, series *Series) []labels.Labels {
 	}
 	return labelSets
 }
+
+func TestIngester_Starting(t *testing.T) {
+	tests := map[string]struct {
+		failingCause                         error
+		isIngesterInTheRing                  bool
+		expectedLifecyclerStateAfterStarting services.State
+		expectedRingStateAfterStarting       ring.InstanceState
+	}{
+		"if starting() of an ingester which is not in the ring runs into context.Canceled, its lifecycler terminates, and it is not in the ring": {
+			failingCause:                         context.Canceled,
+			isIngesterInTheRing:                  false,
+			expectedLifecyclerStateAfterStarting: services.Terminated,
+		},
+		"if starting() of an ingester which is not in the ring runs into an error, its lifecycler terminates, and it is not in the ring": {
+			failingCause:                         errors.New("this is an error"),
+			isIngesterInTheRing:                  false,
+			expectedLifecyclerStateAfterStarting: services.Terminated,
+		},
+		"if starting() of an ingester with ring state LEAVING runs into context.Canceled, its lifecycler terminates, and its ring state is LEAVING": {
+			failingCause:                         context.Canceled,
+			isIngesterInTheRing:                  true,
+			expectedLifecyclerStateAfterStarting: services.Terminated,
+			expectedRingStateAfterStarting:       ring.LEAVING,
+		},
+		"if starting() of an ingester with ring state LEAVING runs into an error, its lifecycler terminates, and its ring state is LEAVING": {
+			failingCause:                         errors.New("this is an error"),
+			isIngesterInTheRing:                  true,
+			expectedLifecyclerStateAfterStarting: services.Terminated,
+			expectedRingStateAfterStarting:       ring.LEAVING,
+		},
+	}
+
+	for testName, testCase := range tests {
+		cfg := defaultIngesterTestConfig(t)
+		t.Run(testName, func(t *testing.T) {
+			var checkInitialRingState func(context.Context, *failingIngester)
+			var checkFinalRingState func(context.Context, *failingIngester)
+			if testCase.isIngesterInTheRing {
+				cfg.IngesterRing.UnregisterOnShutdown = false
+				// Ensure that ingester fI is already in the ring, and its state is LEAVING.
+				checkInitialRingState = func(ctx context.Context, fI *failingIngester) { fI.checkRingState(ctx, t, ring.LEAVING) }
+				// Ensure that ingester fI is in the expected state after failing starting().
+				checkFinalRingState = func(ctx context.Context, fI *failingIngester) {
+					fI.checkRingState(ctx, t, testCase.expectedRingStateAfterStarting)
+				}
+			} else {
+				// Ensure that ingester fI is not in the ring either before or after failing starting().
+				checkInitialRingState = func(ctx context.Context, fI *failingIngester) { require.Nil(t, fI.getInstance(ctx)) }
+				checkFinalRingState = func(ctx context.Context, fI *failingIngester) { require.Nil(t, fI.getInstance(ctx)) }
+			}
+			fI := setupFailingIngester(t, cfg, testCase.failingCause)
+			ctx := context.Background()
+			checkInitialRingState(ctx, fI)
+
+			fI.startWaitAndCheck(ctx, t)
+			require.Equal(t, testCase.expectedLifecyclerStateAfterStarting, fI.lifecycler.State())
+			checkFinalRingState(ctx, fI)
+		})
+	}
+}
+
+func setupFailingIngester(t *testing.T, cfg Config, failingCause error) *failingIngester {
+	// Start the first ingester. This ensures the ring will be created.
+	fI := newFailingIngester(t, cfg, nil, nil)
+	kvStore := fI.kvStore
+	ctx := context.Background()
+	fI.startWaitAndCheck(ctx, t)
+	fI.shutDownWaitAndCheck(ctx, t)
+
+	// Start the same ingester again, this time injecting failingCause into its starting().
+	return newFailingIngester(t, cfg, kvStore, failingCause)
+}
+
+type failingIngester struct {
+	*Ingester
+	kvStore      kv.Client
+	failingCause error
+}
+
+func newFailingIngester(t *testing.T, cfg Config, kvStore kv.Client, failingCause error) *failingIngester {
+	i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
+	require.NoError(t, err)
+	fI := &failingIngester{Ingester: i, failingCause: failingCause}
+	if kvStore != nil {
+		fI.kvStore = kvStore
+	}
+	fI.BasicService = services.NewBasicService(fI.starting, fI.updateLoop, fI.stopping)
+	return fI
+}
+
+func (i *failingIngester) startWaitAndCheck(ctx context.Context, t *testing.T) {
+	err := services.StartAndAwaitRunning(ctx, i)
+	var expectedHealthyIngesters int
+	if i.failingCause == nil {
+		require.NoError(t, err)
+		expectedHealthyIngesters = 1
+	} else {
+		require.Error(t, err)
+		require.ErrorIs(t, err, i.failingCause)
+		expectedHealthyIngesters = 0
+	}
+	test.Poll(t, 100*time.Millisecond, expectedHealthyIngesters, func() interface{} {
+		return i.lifecycler.HealthyInstancesCount()
+	})
+}
+
+func (i *failingIngester) shutDownWaitAndCheck(ctx context.Context, t *testing.T) {
+	// We properly shut down the ingester, and ensure that its lifecycler is terminated.
+	require.NoError(t, services.StopAndAwaitTerminated(ctx, i))
+	require.Equal(t, services.Terminated, i.lifecycler.BasicService.State())
+}
+
+func (i *failingIngester) starting(parentCtx context.Context) error {
+	if i.failingCause == nil {
+		return i.Ingester.starting(parentCtx)
+	}
+	if errors.Is(i.failingCause, context.Canceled) {
+		// Cancel the context before calling starting(), so that starting()
+		// deterministically fails with context.Canceled.
+		ctx, cancel := context.WithCancel(parentCtx)
+		cancel()
+		return i.Ingester.starting(ctx)
+	}
+	return i.Ingester.starting(&mockContext{ctx: parentCtx, err: i.failingCause})
+}
+
+func (i *failingIngester) getInstance(ctx context.Context) *ring.InstanceDesc {
+	d, err := i.lifecycler.KVStore.Get(ctx, IngesterRingKey)
+	if err != nil {
+		return nil
+	}
+	instanceDesc, ok := ring.GetOrCreateRingDesc(d).Ingesters[i.lifecycler.ID]
+	if !ok {
+		return nil
+	}
+	return &instanceDesc
+}
+
+func (i *failingIngester) checkRingState(ctx context.Context, t *testing.T, expectedState ring.InstanceState) {
+	instance := i.getInstance(ctx)
+	require.NotNil(t, instance)
+	require.Equal(t, expectedState, instance.GetState())
+}
+
+// mockContext delegates Deadline and Value to a real context, but reports
+// the configured error and an already-closed Done() channel, so that any
+// context-aware code path fails immediately with that error.
+type mockContext struct {
+	ctx context.Context
+	err error
+}
+
+func (c *mockContext) Deadline() (time.Time, bool) {
+	return c.ctx.Deadline()
+}
+
+func (c *mockContext) Done() <-chan struct{} {
+	// Return a closed channel, so that callers selecting on Done() observe it immediately.
+	ch := make(chan struct{})
+	close(ch)
+	return ch
+}
+
+func (c *mockContext) Err() error {
+	return c.err
+}
+
+func (c *mockContext) Value(key any) any {
+	return c.ctx.Value(key)
+}
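
The production change boils down to one Go idiom: a named return value lets a deferred function observe whether starting() failed and clean up a sub-service that would otherwise keep running, since dskit moves a service whose StartingFn fails straight to Failed without running its StoppingFn. Below is a minimal, self-contained sketch of that pattern against dskit's services package; an idle service stands in for the ingester's lifecycler, and newOwner and the "boom" error are illustrative names, not code from this PR.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/grafana/dskit/services"
)

// newOwner builds a service whose starting() owns a sub-service (standing in
// for the ingester's lifecycler). The named return plus defer mirrors the
// diff: on any starting() failure, the sub-service is stopped so it does not
// outlive the owner that failed to start.
func newOwner(sub services.Service, failWith error) services.Service {
	starting := func(ctx context.Context) (err error) {
		defer func() {
			if err != nil {
				// Best-effort cleanup; the starting error is what callers see.
				_ = services.StopAndAwaitTerminated(context.Background(), sub)
			}
		}()
		if err := services.StartAndAwaitRunning(ctx, sub); err != nil {
			return err
		}
		return failWith // nil on the happy path
	}
	running := func(ctx context.Context) error { <-ctx.Done(); return nil }
	stopping := func(_ error) error {
		return services.StopAndAwaitTerminated(context.Background(), sub)
	}
	return services.NewBasicService(starting, running, stopping)
}

func main() {
	sub := services.NewIdleService(nil, nil)
	owner := newOwner(sub, errors.New("boom"))

	err := services.StartAndAwaitRunning(context.Background(), owner)
	fmt.Println("start error:", err)       // fails with "boom"
	fmt.Println("sub state:", sub.State()) // Terminated, via the deferred cleanup
}
```

Without the defer, sub would still be Running after the failed start; in the ingester's case that meant a lifecycler left registered in the ring after starting() was aborted.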
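
The test side relies on a complementary trick: a context wrapper that lies about its own state, so any code that selects on ctx.Done() or checks ctx.Err() observes an arbitrary injected error. A small sketch, assuming only the mockContext type from the diff; doWork is a hypothetical stand-in for a context-aware step such as openExistingTSDB.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// mockContext is the wrapper from the diff: Deadline and Value delegate to a
// real context, while Err reports a caller-chosen error and Done returns an
// already-closed channel.
type mockContext struct {
	ctx context.Context
	err error
}

func (c *mockContext) Deadline() (time.Time, bool) { return c.ctx.Deadline() }

func (c *mockContext) Done() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}

func (c *mockContext) Err() error { return c.err }

func (c *mockContext) Value(key any) any { return c.ctx.Value(key) }

// doWork stands in for any context-aware function: the ready Done() channel
// makes it bail out immediately with the injected error.
func doWork(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	cause := errors.New("injected failure")
	err := doWork(&mockContext{ctx: context.Background(), err: cause})
	fmt.Println(err, errors.Is(err, cause)) // injected failure true
}
```

This is why the test can assert require.ErrorIs(t, err, i.failingCause): the wrapper guarantees the failure bubbling out of starting() is exactly the injected cause.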