Ingester remains in the LEAVING state if starting() terminates (#6923)
* Ingester remains in the LEAVING state if starting() terminates

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Making lint happy

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
duricanikolic committed Dec 13, 2023
1 parent 7c16572 commit 1c97e32
Showing 2 changed files with 173 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/ingester/ingester.go
@@ -433,7 +433,14 @@ func (i *Ingester) startingForFlusher(ctx context.Context) error {
	return nil
}

func (i *Ingester) starting(ctx context.Context) error {
func (i *Ingester) starting(ctx context.Context) (err error) {
	defer func() {
		if err != nil {
			// if starting() fails for any reason (e.g., context canceled),
			// the lifecycler must be stopped.
			_ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
		}
	}()
	if err := i.openExistingTSDB(ctx); err != nil {
		// Try to rollback and close opened TSDBs before halting the ingester.
		i.closeAllTSDB()
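For context: a dskit service transitions to Failed when its starting function returns an error, but sub-services already started inside starting() (here, the ingester's lifecycler) are not stopped automatically; that is why the new deferred StopAndAwaitTerminated call is needed. The following self-contained sketch is illustrative only, not part of this commit, and demonstrates the pattern under that assumption:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/grafana/dskit/services"
)

func main() {
	// Stand-in for the ingester's lifecycler: a sub-service that the parent
	// starts during its own starting() phase.
	lifecycler := services.NewIdleService(
		func(_ context.Context) error { return nil }, // starting: no-op
		func(_ error) error { return nil },           // stopping: no-op
	)

	starting := func(ctx context.Context) (err error) {
		defer func() {
			if err != nil {
				// Without this deferred stop, a failed starting() would leave
				// the already-started sub-service running.
				_ = services.StopAndAwaitTerminated(context.Background(), lifecycler)
			}
		}()
		if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
			return err
		}
		// Simulate a failure occurring after the lifecycler has been started.
		return errors.New("starting failed")
	}

	running := func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}

	stopping := func(_ error) error { return nil }

	svc := services.NewBasicService(starting, running, stopping)
	err := services.StartAndAwaitRunning(context.Background(), svc)
	fmt.Println("service error:", err)                   // the injected failure
	fmt.Println("lifecycler state:", lifecycler.State()) // Terminated, thanks to the defer
}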
165 changes: 165 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -32,6 +32,7 @@ import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gogo/status"
	"github.com/grafana/dskit/kv"
	dslog "github.com/grafana/dskit/log"
	dskit_metrics "github.com/grafana/dskit/metrics"
	"github.com/grafana/dskit/middleware"
@@ -9778,3 +9779,167 @@ func buildSeriesSet(t *testing.T, series *Series) []labels.Labels {
	}
	return labelSets
}

func TestIngester_Starting(t *testing.T) {
	tests := map[string]struct {
		failingCause                         error
		isIngesterInTheRing                  bool
		expectedLifecyclerStateAfterStarting services.State
		expectedRingStateAfterStarting       ring.InstanceState
	}{
		"if starting() of an ingester which is not in the ring runs into context.Canceled, its lifecycler terminates, and it is not in the ring": {
			failingCause:                         context.Canceled,
			isIngesterInTheRing:                  false,
			expectedLifecyclerStateAfterStarting: services.Terminated,
		},
		"if starting() of an ingester which is not in the ring runs into an error, its lifecycler terminates, and it is not in the ring": {
			failingCause:                         errors.New("this is an error"),
			isIngesterInTheRing:                  false,
			expectedLifecyclerStateAfterStarting: services.Terminated,
		},
		"if starting() of an ingester with ring state LEAVING runs into context.Canceled, its lifecycler terminates, and its ring state is LEAVING": {
			failingCause:                         context.Canceled,
			expectedLifecyclerStateAfterStarting: services.Terminated,
			expectedRingStateAfterStarting:       ring.LEAVING,
		},
		"if starting() of an ingester with ring state LEAVING runs into an error, its lifecycler terminates, and its ring state is LEAVING": {
			failingCause:                         errors.New("this is an error"),
			expectedLifecyclerStateAfterStarting: services.Terminated,
			expectedRingStateAfterStarting:       ring.LEAVING,
		},
	}

	for testName, testCase := range tests {
		cfg := defaultIngesterTestConfig(t)
		t.Run(testName, func(t *testing.T) {
			var checkInitialRingState func(context.Context, *failingIngester)
			var checkFinalRingState func(context.Context, *failingIngester)
			if testCase.isIngesterInTheRing {
				cfg.IngesterRing.UnregisterOnShutdown = false
				// Ensure that ingester fI is already in the ring, and its state is LEAVING.
				checkInitialRingState = func(ctx context.Context, fI *failingIngester) { fI.checkRingState(ctx, t, ring.LEAVING) }
				// Ensure that ingester fI is in the expected state after failing starting().
				checkFinalRingState = func(ctx context.Context, fI *failingIngester) {
					fI.checkRingState(ctx, t, testCase.expectedRingStateAfterStarting)
				}
			} else {
				// Ensure that ingester fI is not in the ring either before or after failing starting().
				checkInitialRingState = func(ctx context.Context, fI *failingIngester) { require.Nil(t, fI.getInstance(ctx)) }
				checkFinalRingState = func(ctx context.Context, fI *failingIngester) { require.Nil(t, fI.getInstance(ctx)) }
			}
			fI := setupFailingIngester(t, cfg, testCase.failingCause)
			ctx := context.Background()
			checkInitialRingState(ctx, fI)

			fI.startWaitAndCheck(ctx, t)
			require.Equal(t, testCase.expectedLifecyclerStateAfterStarting, fI.lifecycler.State())
			checkFinalRingState(ctx, fI)
		})
	}
}

func setupFailingIngester(t *testing.T, cfg Config, failingCause error) *failingIngester {
	// Start the first ingester. This ensures the ring will be created.
	fI := newFailingIngester(t, cfg, nil, nil)
	kvStore := fI.kvStore
	ctx := context.Background()
	fI.startWaitAndCheck(ctx, t)
	fI.shutDownWaitAndCheck(ctx, t)

	// Create the same ingester again, this time injecting failingCause so that its starting() fails.
	return newFailingIngester(t, cfg, kvStore, failingCause)
}

type failingIngester struct {
	*Ingester
	kvStore      kv.Client
	failingCause error
}

func newFailingIngester(t *testing.T, cfg Config, kvStore kv.Client, failingCause error) *failingIngester {
	i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
	require.NoError(t, err)
	fI := &failingIngester{Ingester: i, failingCause: failingCause}
	if kvStore != nil {
		fI.kvStore = kvStore
	}
	fI.BasicService = services.NewBasicService(fI.starting, fI.updateLoop, fI.stopping)
	return fI
}

func (i *failingIngester) startWaitAndCheck(ctx context.Context, t *testing.T) {
	err := services.StartAndAwaitRunning(ctx, i)
	var expectedHealthyIngesters int
	if i.failingCause == nil {
		require.NoError(t, err)
		expectedHealthyIngesters = 1
	} else {
		require.Error(t, err)
		require.ErrorIs(t, err, i.failingCause)
		expectedHealthyIngesters = 0
	}
	test.Poll(t, 100*time.Millisecond, expectedHealthyIngesters, func() interface{} {
		return i.lifecycler.HealthyInstancesCount()
	})
}

func (i *failingIngester) shutDownWaitAndCheck(ctx context.Context, t *testing.T) {
	// We properly shut down the ingester, and ensure that its lifecycler is terminated.
	require.NoError(t, services.StopAndAwaitTerminated(ctx, i))
	require.Equal(t, services.Terminated, i.lifecycler.BasicService.State())
}

func (i *failingIngester) starting(parentCtx context.Context) error {
	if i.failingCause == nil {
		// No failure injected: run the real starting().
		return i.Ingester.starting(parentCtx)
	}
	if errors.Is(i.failingCause, context.Canceled) {
		// Simulate context cancellation by canceling the context concurrently
		// with starting().
		ctx, cancel := context.WithCancel(parentCtx)
		go func() {
			cancel()
		}()
		return i.Ingester.starting(ctx)
	}
	// Simulate any other failure by passing a context whose Err() reports the
	// injected cause.
	return i.Ingester.starting(&mockContext{ctx: parentCtx, err: i.failingCause})
}

func (i *failingIngester) getInstance(ctx context.Context) *ring.InstanceDesc {
	d, err := i.lifecycler.KVStore.Get(ctx, IngesterRingKey)
	if err != nil {
		return nil
	}
	instanceDesc, ok := ring.GetOrCreateRingDesc(d).Ingesters[i.lifecycler.ID]
	if !ok {
		return nil
	}
	return &instanceDesc
}

func (i *failingIngester) checkRingState(ctx context.Context, t *testing.T, expectedState ring.InstanceState) {
	instance := i.getInstance(ctx)
	require.NotNil(t, instance)
	require.Equal(t, expectedState, instance.GetState())
}

// mockContext is a context.Context that wraps a real context, but always
// reports itself as done, with Err() returning the injected error.
type mockContext struct {
	ctx context.Context
	err error
}

func (c *mockContext) Deadline() (time.Time, bool) {
	return c.ctx.Deadline()
}

func (c *mockContext) Done() <-chan struct{} {
	// Return an already-closed channel, so callers see this context as done.
	ch := make(chan struct{})
	close(ch)
	return ch
}

func (c *mockContext) Err() error {
	return c.err
}

func (c *mockContext) Value(key any) interface{} {
	return c.ctx.Value(key)
}
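A hypothetical usage sketch (not part of the commit; it assumes a fmt import and the helper name demoMockContext) showing the mock's observable behavior: Done() reports the context as already done, while Err() surfaces the injected cause instead of context.Canceled.

func demoMockContext() {
	injected := errors.New("this is an error")
	ctx := &mockContext{ctx: context.Background(), err: injected}

	select {
	case <-ctx.Done():
		// Always taken: Done() returns an already-closed channel.
		fmt.Println("context is done; Err() =", ctx.Err()) // prints the injected error
	default:
		fmt.Println("never reached")
	}
}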
