Ingester remains in the LEAVING state if starting() terminates #6923

Merged · 4 commits · Dec 13, 2023
9 changes: 8 additions & 1 deletion pkg/ingester/ingester.go
@@ -433,7 +433,14 @@ func (i *Ingester) startingForFlusher(ctx context.Context) error {
 	return nil
 }
 
-func (i *Ingester) starting(ctx context.Context) error {
+func (i *Ingester) starting(ctx context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			// if starting() fails for any reason (e.g., context canceled),
+			// the lifecycler must be stopped.
+			_ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
+		}
+	}()
 	if err := i.openExistingTSDB(ctx); err != nil {
 		// Try to rollback and close opened TSDBs before halting the ingester.
 		i.closeAllTSDB()
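The fix relies on Go's named result parameter: the deferred closure reads the final value of err, so every failure path out of starting(), including a canceled context, stops the lifecycler. A minimal, self-contained sketch of the pattern (stopDependency is a hypothetical stand-in for services.StopAndAwaitTerminated on the lifecycler):

package main

import (
	"context"
	"errors"
	"fmt"
)

// start mirrors the pattern in the fix: err is a named result, so the
// deferred closure observes whatever value is ultimately returned.
func start(ctx context.Context, stopDependency func()) (err error) {
	defer func() {
		if err != nil {
			// Every failure path, including context cancellation,
			// triggers cleanup of the dependent service.
			stopDependency()
		}
	}()
	if err := ctx.Err(); err != nil {
		return err
	}
	return errors.New("startup failed after the dependency was started")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate starting() being invoked with an already-canceled context
	err := start(ctx, func() { fmt.Println("dependency stopped") })
	fmt.Println("start returned:", err) // start returned: context canceled
}

Note that the actual change stops the lifecycler with context.Background() rather than the startup context, since the latter may already be canceled by the time the deferred function runs.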
153 changes: 153 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/gogo/status"
+	"github.com/grafana/dskit/kv"
 	dslog "github.com/grafana/dskit/log"
 	dskit_metrics "github.com/grafana/dskit/metrics"
 	"github.com/grafana/dskit/middleware"
@@ -9778,3 +9779,155 @@ func buildSeriesSet(t *testing.T, series *Series) []labels.Labels {
	}
	return labelSets
}

func TestIngester_Starting(t *testing.T) {
	tests := map[string]struct {
		failingCause                         error
		expectedLifecyclerStateAfterStarting services.State
		expectedRingStateAfterStarting       ring.InstanceState
	}{
		"if starting() runs into context.Canceled, its lifecycler terminates, and its ring state is LEAVING": {
			failingCause:                         context.Canceled,
			expectedLifecyclerStateAfterStarting: services.Terminated,
			expectedRingStateAfterStarting:       ring.LEAVING,
		},
		"if starting() runs into an error, its lifecycler terminates, and its ring state is LEAVING": {
			failingCause:                         errors.New("this is an error"),
			expectedLifecyclerStateAfterStarting: services.Terminated,
			expectedRingStateAfterStarting:       ring.LEAVING,
		},
	}

	// For each test case we do the following things:
	// - We start an ingester without errors, and we show it was not in the ring before,
	//   but it is in the ring in the ACTIVE state after we started it.
	// - We shut down the ingester properly, and we show it remains in the ring in the LEAVING state.
	// - We try to start the ingester one more time, but we run into the error specified in the test,
	//   and we show that it remains in the ring in the LEAVING state.
	for testName, testCase := range tests {
		cfg := defaultIngesterTestConfig(t)
		cfg.IngesterRing.UnregisterOnShutdown = false
		t.Run(testName, func(t *testing.T) {
			// Start the first ingester.
			fI1 := newFailingIngester(t, cfg, nil, nil)
			kvStore := fI1.kvStore
			ctx := context.Background()
			// Ensure that ingester fI1 is not in the ring yet.
			require.Nil(t, fI1.getInstance(ctx))

			fI1.startWaitAndCheck(ctx, t)
			// Check that fI1's lifecycler is running, and that fI1 is in the ring with state ACTIVE.
			require.Equal(t, services.Running, fI1.lifecycler.State())
			fI1.checkRingState(ctx, t, ring.ACTIVE)

			fI1.shutDownWaitAndCheck(ctx, t)

			// Start the same ingester, but with an error.
			fI2 := newFailingIngester(t, cfg, kvStore, testCase.failingCause)
			ctx = context.Background()
			// Before starting, check that ingester fI2 is already in the ring, and its state is LEAVING.
			fI2.checkRingState(ctx, t, ring.LEAVING)

			fI2.startWaitAndCheck(ctx, t)
			require.Equal(t, testCase.expectedLifecyclerStateAfterStarting, fI2.lifecycler.State())
			fI2.checkRingState(ctx, t, testCase.expectedRingStateAfterStarting)
		})
	}
}

type failingIngester struct {
	*Ingester
	kvStore      kv.Client
	failingCause error
}

func newFailingIngester(t *testing.T, cfg Config, kvStore kv.Client, failingCause error) *failingIngester {
	i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
	require.NoError(t, err)
	fI := &failingIngester{Ingester: i, failingCause: failingCause}
	if kvStore != nil {
		fI.kvStore = kvStore
	}
	fI.BasicService = services.NewBasicService(fI.starting, fI.updateLoop, fI.stopping)
	return fI
}

func (i *failingIngester) startWaitAndCheck(ctx context.Context, t *testing.T) {
	err := services.StartAndAwaitRunning(ctx, i)
	var expectedHealthyIngesters int
	if i.failingCause == nil {
		require.NoError(t, err)
		expectedHealthyIngesters = 1
	} else {
		require.Error(t, err)
		require.ErrorIs(t, err, i.failingCause)
		expectedHealthyIngesters = 0
	}
	test.Poll(t, 100*time.Millisecond, expectedHealthyIngesters, func() interface{} {
		return i.lifecycler.HealthyInstancesCount()
	})
}

func (i *failingIngester) shutDownWaitAndCheck(ctx context.Context, t *testing.T) {
	// We properly shut down the ingester, and ensure that its lifecycler is terminated,
	// but that the ingester stays in the ring with the state LEAVING.
	require.NoError(t, services.StopAndAwaitTerminated(ctx, i))
	i.checkRingState(ctx, t, ring.LEAVING)
	require.Equal(t, services.Terminated, i.lifecycler.BasicService.State())
}

func (i *failingIngester) starting(parentCtx context.Context) error {
	if i.failingCause == nil {
		return i.Ingester.starting(parentCtx)
	}
	if errors.Is(i.failingCause, context.Canceled) {
		ctx, cancel := context.WithCancel(parentCtx)
		go func() {
			cancel()
			fmt.Println("Context has been canceled")
		}()
		return i.Ingester.starting(ctx)
	}
	return i.Ingester.starting(&mockContext{ctx: parentCtx, err: i.failingCause})
}

func (i *failingIngester) getInstance(ctx context.Context) *ring.InstanceDesc {
	d, err := i.lifecycler.KVStore.Get(ctx, IngesterRingKey)
	if err != nil {
		return nil
	}
	instanceDesc, ok := ring.GetOrCreateRingDesc(d).Ingesters[i.lifecycler.ID]
	if !ok {
		return nil
	}
	return &instanceDesc
}

func (i *failingIngester) checkRingState(ctx context.Context, t *testing.T, expectedState ring.InstanceState) {
	instance := i.getInstance(ctx)
	require.NotNil(t, instance)
	require.Equal(t, expectedState, instance.GetState())
}

type mockContext struct {
	ctx context.Context
	err error
}

func (c *mockContext) Deadline() (time.Time, bool) {
	return c.ctx.Deadline()
}

func (c *mockContext) Done() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}

func (c *mockContext) Err() error {
	return c.err
}

func (c *mockContext) Value(key any) interface{} {
	return c.ctx.Value(key)
}
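mockContext simulates a context that is already done but reports an arbitrary error: Done() returns a channel that is already closed, so context-aware code unblocks immediately, while Err() yields the injected cause instead of context.Canceled. A minimal sketch of the idea, reusing the mockContext type above; doWork is a hypothetical stand-in for the context-aware work inside starting() (imports of context and errors assumed):

// doWork gives up as soon as the context is done and reports ctx.Err().
func doWork(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

// With a mockContext, Done() fires immediately and Err() yields the
// injected failure, so doWork surfaces exactly the error under test:
//
//	err := doWork(&mockContext{ctx: context.Background(), err: errors.New("injected failure")})
//	// err.Error() == "injected failure"

One design observation: Done() allocates and closes a fresh channel on every call, which would be wasteful in production code but is harmless here, where the only requirement is that the channel always reads as closed.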