diff --git a/CHANGELOG.md b/CHANGELOG.md index cb79df47e..4eb79375f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,7 +75,7 @@ * [FEATURE] Add `middleware.HTTPGRPCTracer` for more detailed server-side tracing spans and tags on `httpgrpc.HTTP/Handle` requests * [FEATURE] Server: Add support for `GrpcInflightMethodLimiter` -- limiting gRPC requests before reading full request into the memory. This can be used to implement global or method-specific inflight limits for gRPC methods. #377 #392 * [FEATURE] Server: Add `-grpc.server.num-workers` flag that configures the `grpc.NumStreamWorkers()` option. This can be used to start a fixed base amount of workers to process gRPC requests and avoid stack allocation for each call. #400 -* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 +* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 #483 * [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477 * [ENHANCEMENT] Add ability to log all source hosts from http header instead of only the first one. #444 * [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients. diff --git a/ring/partition_instance_lifecycler.go b/ring/partition_instance_lifecycler.go new file mode 100644 index 000000000..ac8a65d06 --- /dev/null +++ b/ring/partition_instance_lifecycler.go @@ -0,0 +1,418 @@ +package ring + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" + + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/services" +) + +var ( + ErrPartitionDoesNotExist = errors.New("the partition does not exist") + ErrPartitionStateMismatch = errors.New("the partition state does not match the expected one") + ErrPartitionStateChangeNotAllowed = errors.New("partition state change not allowed") + + allowedPartitionStateChanges = map[PartitionState][]PartitionState{ + PartitionPending: {PartitionActive, PartitionInactive}, + PartitionActive: {PartitionInactive}, + PartitionInactive: {PartitionPending, PartitionActive}, + } +) + +type PartitionInstanceLifecyclerConfig struct { + // PartitionID is the ID of the partition managed by the lifecycler. + PartitionID int32 + + // InstanceID is the ID of the instance managed by the lifecycler. + InstanceID string + + // WaitOwnersCountOnPending is the minimum number of owners to wait before switching a + // PENDING partition to ACTIVE. + WaitOwnersCountOnPending int + + // WaitOwnersDurationOnPending is how long each owner should have been added to the + // partition before it's considered eligible for the WaitOwnersCountOnPending count. + WaitOwnersDurationOnPending time.Duration + + // DeleteInactivePartitionAfterDuration is how long the lifecycler should wait before + // deleting inactive partitions with no owners. Inactive partitions are never removed + // if this value is 0. + DeleteInactivePartitionAfterDuration time.Duration + + // PollingInterval is the internal polling interval. This setting is useful to let + // upstream projects to lower it in unit tests. + PollingInterval time.Duration +} + +// PartitionInstanceLifecycler is responsible to manage the lifecycle of a single +// partition and partition owner in the ring. 
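+//
+// A minimal usage sketch (the ring name, ring key, kv.Client, logger and registerer below
+// are illustrative; the lifecycler is started and stopped through the services package,
+// as done in the tests):
+//
+//	cfg := PartitionInstanceLifecyclerConfig{
+//		PartitionID:                 1,
+//		InstanceID:                  "instance-1",
+//		WaitOwnersCountOnPending:    1,
+//		WaitOwnersDurationOnPending: 10 * time.Second,
+//		PollingInterval:             5 * time.Second,
+//	}
+//
+//	lifecycler := NewPartitionInstanceLifecycler(cfg, "ingester", "ingester-partitions", kvClient, logger, reg)
+//	if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
+//		return err
+//	}
+//
+//	// On shutdown:
+//	_ = services.StopAndAwaitTerminated(context.Background(), lifecycler)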
+type PartitionInstanceLifecycler struct { + *services.BasicService + + // These values are initialised at startup, and never change. + cfg PartitionInstanceLifecyclerConfig + ringName string + ringKey string + store kv.Client + logger log.Logger + + // Channel used to execute logic within the lifecycler loop. + actorChan chan func() + + // Whether the partitions should be created on startup if it doesn't exist yet. + createPartitionOnStartup *atomic.Bool + + // Whether the lifecycler should remove the partition owner (identified by instance ID) on shutdown. + removeOwnerOnShutdown *atomic.Bool + + // Metrics. + reconcilesTotal *prometheus.CounterVec + reconcilesFailedTotal *prometheus.CounterVec +} + +func NewPartitionInstanceLifecycler(cfg PartitionInstanceLifecyclerConfig, ringName, ringKey string, store kv.Client, logger log.Logger, reg prometheus.Registerer) *PartitionInstanceLifecycler { + if cfg.PollingInterval == 0 { + cfg.PollingInterval = 5 * time.Second + } + + l := &PartitionInstanceLifecycler{ + cfg: cfg, + ringName: ringName, + ringKey: ringKey, + store: store, + logger: log.With(logger, "ring", ringName), + actorChan: make(chan func()), + createPartitionOnStartup: atomic.NewBool(true), + removeOwnerOnShutdown: atomic.NewBool(false), + reconcilesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "partition_ring_lifecycler_reconciles_total", + Help: "Total number of reconciliations started.", + ConstLabels: map[string]string{"name": ringName}, + }, []string{"type"}), + reconcilesFailedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "partition_ring_lifecycler_reconciles_failed_total", + Help: "Total number of reconciliations failed.", + ConstLabels: map[string]string{"name": ringName}, + }, []string{"type"}), + } + + l.BasicService = services.NewBasicService(l.starting, l.running, l.stopping) + + return l +} + +// CreatePartitionOnStartup returns whether the lifecycle creates the partition on startup +// if it doesn't exist. +func (l *PartitionInstanceLifecycler) CreatePartitionOnStartup() bool { + return l.createPartitionOnStartup.Load() +} + +// SetCreatePartitionOnStartup sets whether the lifecycler should create the partition on +// startup if it doesn't exist. +func (l *PartitionInstanceLifecycler) SetCreatePartitionOnStartup(create bool) { + l.createPartitionOnStartup.Store(create) +} + +// RemoveOwnerOnShutdown returns whether the lifecycler has been configured to remove the partition +// owner on shutdown. +func (l *PartitionInstanceLifecycler) RemoveOwnerOnShutdown() bool { + return l.removeOwnerOnShutdown.Load() +} + +// SetRemoveOwnerOnShutdown sets whether the lifecycler should remove the partition owner on shutdown. +func (l *PartitionInstanceLifecycler) SetRemoveOwnerOnShutdown(remove bool) { + l.removeOwnerOnShutdown.Store(remove) +} + +// GetPartitionState returns the current state of the partition, and the timestamp when the state was +// changed the last time. +func (l *PartitionInstanceLifecycler) GetPartitionState(ctx context.Context) (PartitionState, time.Time, error) { + ring, err := l.getRing(ctx) + if err != nil { + return PartitionUnknown, time.Time{}, err + } + + partition, exists := ring.Partitions[l.cfg.PartitionID] + if !exists { + return PartitionUnknown, time.Time{}, ErrPartitionDoesNotExist + } + + return partition.GetState(), partition.GetStateTime(), nil +} + +// ChangePartitionState changes the partition state to toState. 
+// This function returns ErrPartitionDoesNotExist if the partition doesn't exist, +// and ErrPartitionStateChangeNotAllowed if the state change is not allowed. +func (l *PartitionInstanceLifecycler) ChangePartitionState(ctx context.Context, toState PartitionState) error { + return l.runOnLifecyclerLoop(func() error { + err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) { + partition, exists := ring.Partitions[l.cfg.PartitionID] + if !exists { + return false, ErrPartitionDoesNotExist + } + + if partition.State == toState { + return false, nil + } + + if !isPartitionStateChangeAllowed(partition.State, toState) { + return false, errors.Wrapf(ErrPartitionStateChangeNotAllowed, "change partition state from %s to %s", partition.State.CleanName(), toState.CleanName()) + } + + return ring.UpdatePartitionState(l.cfg.PartitionID, toState, time.Now()), nil + }) + + if err != nil { + level.Warn(l.logger).Log("msg", "failed to change partition state", "partition", l.cfg.PartitionID, "to_state", toState, "err", err) + } + + return err + }) +} + +func (l *PartitionInstanceLifecycler) starting(ctx context.Context) error { + if l.CreatePartitionOnStartup() { + return errors.Wrap(l.createPartitionAndRegisterOwner(ctx), "create partition and register owner") + } + + return errors.Wrap(l.waitPartitionAndRegisterOwner(ctx), "wait partition and register owner") +} + +func (l *PartitionInstanceLifecycler) running(ctx context.Context) error { + reconcileTicker := time.NewTicker(l.cfg.PollingInterval) + defer reconcileTicker.Stop() + + for { + select { + case <-reconcileTicker.C: + l.reconcileOwnedPartition(ctx, time.Now()) + l.reconcileOtherPartitions(ctx, time.Now()) + + case f := <-l.actorChan: + f() + + case <-ctx.Done(): + return nil + } + } +} + +func (l *PartitionInstanceLifecycler) stopping(_ error) error { + level.Info(l.logger).Log("msg", "partition ring lifecycler is shutting down", "ring", l.ringName) + + // Remove the instance from partition owners, if configured to do so. + if l.RemoveOwnerOnShutdown() { + err := l.updateRing(context.Background(), func(ring *PartitionRingDesc) (bool, error) { + return ring.RemoveOwner(l.cfg.InstanceID), nil + }) + + if err != nil { + level.Error(l.logger).Log("msg", "failed to remove instance from partition owners on shutdown", "instance", l.cfg.InstanceID, "partition", l.cfg.PartitionID, "err", err) + } else { + level.Info(l.logger).Log("msg", "instance removed from partition owners", "instance", l.cfg.InstanceID, "partition", l.cfg.PartitionID) + } + } + + return nil +} + +// runOnLifecyclerLoop runs fn within the lifecycler loop. 
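+// The function is sent to the running() loop through actorChan, so it executes serially
+// with the periodic reconciliations and doesn't require additional locking. An error is
+// returned if the lifecycler is not running.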
+func (l *PartitionInstanceLifecycler) runOnLifecyclerLoop(fn func() error) error {
+	sc := l.ServiceContext()
+	if sc == nil {
+		return errors.New("lifecycler not running")
+	}
+
+	errCh := make(chan error)
+	wrappedFn := func() {
+		errCh <- fn()
+	}
+
+	select {
+	case <-sc.Done():
+		return errors.New("lifecycler not running")
+	case l.actorChan <- wrappedFn:
+		return <-errCh
+	}
+}
+
+func (l *PartitionInstanceLifecycler) getRing(ctx context.Context) (*PartitionRingDesc, error) {
+	in, err := l.store.Get(ctx, l.ringKey)
+	if err != nil {
+		return nil, err
+	}
+
+	return GetOrCreatePartitionRingDesc(in), nil
+}
+
+func (l *PartitionInstanceLifecycler) updateRing(ctx context.Context, update func(ring *PartitionRingDesc) (bool, error)) error {
+	return l.store.CAS(ctx, l.ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+		ringDesc := GetOrCreatePartitionRingDesc(in)
+
+		if changed, err := update(ringDesc); err != nil {
+			return nil, false, err
+		} else if !changed {
+			return nil, false, nil
+		}
+
+		return ringDesc, true, nil
+	})
+}
+
+func (l *PartitionInstanceLifecycler) createPartitionAndRegisterOwner(ctx context.Context) error {
+	return l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		now := time.Now()
+		changed := false
+
+		partitionDesc, exists := ring.Partitions[l.cfg.PartitionID]
+		if exists {
+			level.Info(l.logger).Log("msg", "partition found in the ring", "partition", l.cfg.PartitionID, "state", partitionDesc.GetState(), "state_timestamp", partitionDesc.GetStateTime().String(), "tokens", len(partitionDesc.GetTokens()))
+		} else {
+			level.Info(l.logger).Log("msg", "partition not found in the ring", "partition", l.cfg.PartitionID)
+		}
+
+		if !exists {
+			// The partition doesn't exist, so we create a new one. A new partition should always be created
+			// in PENDING state.
+			ring.AddPartition(l.cfg.PartitionID, PartitionPending, now)
+			changed = true
+		}
+
+		// Ensure the instance is added as partition owner.
+		if ring.AddOrUpdateOwner(l.cfg.InstanceID, OwnerActive, l.cfg.PartitionID, now) {
+			changed = true
+		}
+
+		return changed, nil
+	})
+}
+
+func (l *PartitionInstanceLifecycler) waitPartitionAndRegisterOwner(ctx context.Context) error {
+	pollTicker := time.NewTicker(l.cfg.PollingInterval)
+	defer pollTicker.Stop()
+
+	// Wait until the partition exists.
+	checkPartitionExist := func() (bool, error) {
+		level.Info(l.logger).Log("msg", "checking if the partition exists in the ring", "partition", l.cfg.PartitionID)
+
+		ring, err := l.getRing(ctx)
+		if err != nil {
+			return false, errors.Wrap(err, "read partition ring")
+		}
+
+		if ring.HasPartition(l.cfg.PartitionID) {
+			level.Info(l.logger).Log("msg", "partition found in the ring", "partition", l.cfg.PartitionID)
+			return true, nil
+		}
+
+		level.Info(l.logger).Log("msg", "partition not found in the ring", "partition", l.cfg.PartitionID)
+		return false, nil
+	}
+
+	for {
+		if exists, err := checkPartitionExist(); err != nil {
+			return err
+		} else if exists {
+			break
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+
+		case <-pollTicker.C:
+			// Throttle.
+		}
+	}
+
+	// Ensure the instance is added as partition owner.
+	return l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		return ring.AddOrUpdateOwner(l.cfg.InstanceID, OwnerActive, l.cfg.PartitionID, time.Now()), nil
+	})
+}
+
+// reconcileOwnedPartition reconciles the owned partition.
+// This function should be called periodically.
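+// It switches the owned partition from PENDING to ACTIVE once at least
+// WaitOwnersCountOnPending owners have been registered to the partition for longer
+// than WaitOwnersDurationOnPending.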
+func (l *PartitionInstanceLifecycler) reconcileOwnedPartition(ctx context.Context, now time.Time) {
+	const reconcileType = "owned-partition"
+	l.reconcilesTotal.WithLabelValues(reconcileType).Inc()
+
+	err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		partitionID := l.cfg.PartitionID
+
+		partition, exists := ring.Partitions[partitionID]
+		if !exists {
+			return false, ErrPartitionDoesNotExist
+		}
+
+		// A pending partition should be switched to active if enough owners have been
+		// registered for longer than the waiting period.
+		if partition.IsPending() && ring.PartitionOwnersCountUpdatedBefore(partitionID, now.Add(-l.cfg.WaitOwnersDurationOnPending)) >= l.cfg.WaitOwnersCountOnPending {
+			level.Info(l.logger).Log("msg", "switching partition state because enough owners have been registered and minimum waiting time has elapsed", "partition", l.cfg.PartitionID, "from_state", PartitionPending, "to_state", PartitionActive)
+			return ring.UpdatePartitionState(partitionID, PartitionActive, now), nil
+		}
+
+		return false, nil
+	})
+
+	if err != nil {
+		l.reconcilesFailedTotal.WithLabelValues(reconcileType).Inc()
+		level.Warn(l.logger).Log("msg", "failed to reconcile owned partition", "partition", l.cfg.PartitionID, "err", err)
+	}
+}
+
+// reconcileOtherPartitions reconciles the partitions not owned by this lifecycler.
+// This function should be called periodically.
+func (l *PartitionInstanceLifecycler) reconcileOtherPartitions(ctx context.Context, now time.Time) {
+	const reconcileType = "other-partitions"
+	l.reconcilesTotal.WithLabelValues(reconcileType).Inc()
+
+	err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		changed := false
+
+		if l.cfg.DeleteInactivePartitionAfterDuration > 0 {
+			deleteBefore := now.Add(-l.cfg.DeleteInactivePartitionAfterDuration)
+
+			for partitionID, partition := range ring.Partitions {
+				// Never delete the partition owned by this lifecycler, since it's expected to have at least
+				// this instance as owner.
+				if partitionID == l.cfg.PartitionID {
+					continue
+				}
+
+				// A partition is safe to remove only if it has been inactive for longer than the wait period
+				// and it has no registered owners.
+ if partition.IsInactiveSince(deleteBefore) && ring.PartitionOwnersCount(partitionID) == 0 { + level.Info(l.logger).Log("msg", "removing inactive partition with no owners from ring", "partition", partitionID, "state", partition.State.CleanName(), "state_timestamp", partition.GetStateTime().String()) + ring.RemovePartition(partitionID) + changed = true + } + } + } + + return changed, nil + }) + + if err != nil { + l.reconcilesFailedTotal.WithLabelValues(reconcileType).Inc() + level.Warn(l.logger).Log("msg", "failed to reconcile other partitions", "err", err) + } +} + +func isPartitionStateChangeAllowed(from, to PartitionState) bool { + for _, allowed := range allowedPartitionStateChanges[from] { + if to == allowed { + return true + } + } + + return false +} diff --git a/ring/partition_instance_lifecycler_test.go b/ring/partition_instance_lifecycler_test.go new file mode 100644 index 000000000..c719b7f0b --- /dev/null +++ b/ring/partition_instance_lifecycler_test.go @@ -0,0 +1,336 @@ +package ring + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/services" +) + +func TestPartitionInstanceLifecycler(t *testing.T) { + const eventuallyTick = 10 * time.Millisecond + + ctx := context.Background() + logger := log.NewNopLogger() + + t.Run("should wait for the configured minimum number of owners before switching a pending partition to active", func(t *testing.T) { + t.Parallel() + + lifecycler1aConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-a-1") + lifecycler1bConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-b-1") + for _, cfg := range []*PartitionInstanceLifecyclerConfig{&lifecycler1aConfig, &lifecycler1bConfig} { + cfg.WaitOwnersCountOnPending = 2 + cfg.WaitOwnersDurationOnPending = 0 + } + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Start instance-zone-a-1 lifecycler. + lifecycler1a := NewPartitionInstanceLifecycler(lifecycler1aConfig, "test", ringKey, store, logger, nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1a)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1a)) + }) + + // We expect the partition to NOT switch to active. + time.Sleep(2 * lifecycler1aConfig.PollingInterval) + + actual := getPartitionRingFromStore(t, store, ringKey) + assert.Len(t, actual.Partitions, 1) + assert.True(t, actual.HasPartition(1)) + assert.Equal(t, PartitionPending, actual.Partitions[1].State) + assert.ElementsMatch(t, []string{"instance-zone-a-1"}, actual.ownersByPartition()[1]) + + // Start instance-zone-b-1 lifecycler. + lifecycler1b := NewPartitionInstanceLifecycler(lifecycler1bConfig, "test", ringKey, store, logger, nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1b)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1b)) + }) + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.ElementsMatch(t, []string{"instance-zone-a-1", "instance-zone-b-1"}, actual.ownersByPartition()[1]) + + // We expect the partition to switch to active state. 
+ assert.Eventually(t, func() bool { + actual := getPartitionRingFromStore(t, store, ringKey) + return actual.Partitions[1].State == PartitionActive + }, time.Second, eventuallyTick) + }) + + t.Run("should wait for the configured minimum waiting time before switching a pending partition to active", func(t *testing.T) { + t.Parallel() + + lifecyclerConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-1") + lifecyclerConfig.WaitOwnersCountOnPending = 1 + lifecyclerConfig.WaitOwnersDurationOnPending = 2 * time.Second + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Start lifecycler. + startTime := time.Now() + lifecycler := NewPartitionInstanceLifecycler(lifecyclerConfig, "test", ringKey, store, logger, nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) + }) + + actual := getPartitionRingFromStore(t, store, ringKey) + assert.Len(t, actual.Partitions, 1) + assert.True(t, actual.HasPartition(1)) + assert.Equal(t, PartitionPending, actual.Partitions[1].State) + assert.ElementsMatch(t, []string{"instance-1"}, actual.ownersByPartition()[1]) + + // Wait until active. + assert.Eventually(t, func() bool { + actual := getPartitionRingFromStore(t, store, ringKey) + return actual.Partitions[1].State == PartitionActive + }, 2*lifecyclerConfig.WaitOwnersDurationOnPending, eventuallyTick) + + // The partition should have been switch from pending to active after the minimum waiting period. + assert.GreaterOrEqual(t, time.Since(startTime), lifecyclerConfig.WaitOwnersDurationOnPending) + }) + + t.Run("inactive partitions should be removed only if the waiting period passed and there are no owners", func(t *testing.T) { + t.Parallel() + + lifecycler1aConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-a-1") + lifecycler1bConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-b-1") + lifecycler2aConfig := createTestPartitionInstanceLifecyclerConfig(2, "instance-zone-a-2") + for _, cfg := range []*PartitionInstanceLifecyclerConfig{&lifecycler1aConfig, &lifecycler1bConfig, &lifecycler2aConfig} { + cfg.DeleteInactivePartitionAfterDuration = 100 * time.Millisecond + } + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Start all lifecyclers. + lifecycler1a := NewPartitionInstanceLifecycler(lifecycler1aConfig, "test", ringKey, store, logger, nil) + lifecycler1b := NewPartitionInstanceLifecycler(lifecycler1bConfig, "test", ringKey, store, logger, nil) + lifecycler2a := NewPartitionInstanceLifecycler(lifecycler2aConfig, "test", ringKey, store, logger, nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1a)) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1b)) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler2a)) + t.Cleanup(func() { + // Ensure we stop all lifecyclers once the test is terminated. 
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1a)) + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1b)) + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler2a)) + }) + + actual := getPartitionRingFromStore(t, store, ringKey) + assert.Len(t, actual.Partitions, 2) + assert.True(t, actual.HasPartition(1)) + assert.True(t, actual.HasPartition(2)) + assert.Equal(t, map[int32][]string{1: {"instance-zone-a-1", "instance-zone-b-1"}, 2: {"instance-zone-a-2"}}, actual.ownersByPartition()) + + // Switch partition 1 to inactive. + require.NoError(t, lifecycler1a.ChangePartitionState(ctx, PartitionInactive)) + + // Wait longer than deletion wait period. We expect that the partition is not + // delete yet because there are still owners. + time.Sleep(2 * lifecycler1aConfig.DeleteInactivePartitionAfterDuration) + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.Len(t, actual.Partitions, 2) + assert.True(t, actual.HasPartition(1)) + assert.True(t, actual.HasPartition(2)) + assert.Equal(t, map[int32][]string{1: {"instance-zone-a-1", "instance-zone-b-1"}, 2: {"instance-zone-a-2"}}, actual.ownersByPartition()) + + // Stop instance-zone-a-1. + lifecycler1a.SetRemoveOwnerOnShutdown(true) + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1a)) + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.Len(t, actual.Partitions, 2) + assert.True(t, actual.HasPartition(1)) + assert.True(t, actual.HasPartition(2)) + assert.Equal(t, map[int32][]string{1: {"instance-zone-b-1"}, 2: {"instance-zone-a-2"}}, actual.ownersByPartition()) + + // Stop instance-zone-b-1. + lifecycler1b.SetRemoveOwnerOnShutdown(true) + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1b)) + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.Equal(t, map[int32][]string{2: {"instance-zone-a-2"}}, actual.ownersByPartition()) + + // We expect remaining lifecyclers to clean up the inactive partition now. + assert.Eventually(t, func() bool { + actual := getPartitionRingFromStore(t, store, ringKey) + return !actual.HasPartition(1) + }, time.Second, eventuallyTick) + + // Partition 2 should still exist. + actual = getPartitionRingFromStore(t, store, ringKey) + assert.True(t, actual.HasPartition(2)) + }) + + t.Run("should not create the partition but wait until partition exists in the ring if lifecycler has been configured to not create partition at startup", func(t *testing.T) { + t.Parallel() + + cfg := createTestPartitionInstanceLifecyclerConfig(1, "instance-1") + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Create the lifecycler. + lifecycler := NewPartitionInstanceLifecycler(cfg, "test", ringKey, store, logger, nil) + lifecycler.SetCreatePartitionOnStartup(false) + + // Start the lifecycler (will block until the partition is created). + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) + }) + }() + + // No matter how long we wait, we expect the lifecycler hasn't been + // started yet and the partition was not created. 
+ time.Sleep(10 * cfg.PollingInterval) + + assert.Equal(t, services.Starting, lifecycler.State()) + actual := getPartitionRingFromStore(t, store, ringKey) + assert.False(t, actual.HasPartition(1)) + assert.Equal(t, map[int32][]string{}, actual.ownersByPartition()) + + // Create the partition. + require.NoError(t, store.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + ring := GetOrCreatePartitionRingDesc(in) + ring.AddPartition(1, PartitionPending, time.Now()) + return ring, true, nil + })) + + // The partition has been created, so we expect the lifecycler to complete the startup. + wg.Wait() + require.Equal(t, services.Running, lifecycler.State()) + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.True(t, actual.HasPartition(1)) + assert.Equal(t, map[int32][]string{1: {"instance-1"}}, actual.ownersByPartition()) + }) + + t.Run("should stop waiting for partition creation if the context gets canceled", func(t *testing.T) { + t.Parallel() + + cfg := createTestPartitionInstanceLifecyclerConfig(1, "instance-1") + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Create the lifecycler. + lifecycler := NewPartitionInstanceLifecycler(cfg, "test", ringKey, store, logger, nil) + lifecycler.SetCreatePartitionOnStartup(false) + + // Start the lifecycler (will block until the partition is created). + startCtx, cancelStart := context.WithCancel(ctx) + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + + err := services.StartAndAwaitRunning(startCtx, lifecycler) + require.ErrorIs(t, err, context.Canceled) + }() + + // No matter how long we wait, we expect the lifecycler hasn't been + // started yet and the partition was not created. + time.Sleep(10 * cfg.PollingInterval) + + assert.Equal(t, services.Starting, lifecycler.State()) + actual := getPartitionRingFromStore(t, store, ringKey) + assert.False(t, actual.HasPartition(1)) + assert.Equal(t, map[int32][]string{}, actual.ownersByPartition()) + + // We expect the service starting to interrupt once we cancel the context. + cancelStart() + wg.Wait() + + actual = getPartitionRingFromStore(t, store, ringKey) + assert.False(t, actual.HasPartition(1)) + assert.Equal(t, map[int32][]string{}, actual.ownersByPartition()) + + assert.Eventually(t, func() bool { + return lifecycler.State() == services.Failed + }, time.Second, eventuallyTick) + }) +} + +func TestPartitionInstanceLifecycler_GetAndChangePartitionState(t *testing.T) { + ctx := context.Background() + + store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Start lifecycler. + cfg := createTestPartitionInstanceLifecyclerConfig(1, "instance-1") + lifecycler := NewPartitionInstanceLifecycler(cfg, "test", ringKey, store, log.NewNopLogger(), nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) + }) + + assertPartitionState := func(expected PartitionState) { + actualState, _, err := lifecycler.GetPartitionState(ctx) + require.NoError(t, err) + assert.Equal(t, expected, actualState) + + actualRing := getPartitionRingFromStore(t, store, ringKey) + assert.Equal(t, expected, actualRing.Partitions[1].State) + } + + // Wait until active. 
+ assert.Eventually(t, func() bool { + actual := getPartitionRingFromStore(t, store, ringKey) + return actual.Partitions[1].State == PartitionActive + }, time.Second, 10*time.Millisecond) + + actualState, _, err := lifecycler.GetPartitionState(ctx) + require.NoError(t, err) + assert.Equal(t, PartitionActive, actualState) + + // A request to switch to state whose transition is not allowed should return error. + require.ErrorIs(t, lifecycler.ChangePartitionState(ctx, PartitionPending), ErrPartitionStateChangeNotAllowed) + assertPartitionState(PartitionActive) + + // Switch to inactive. + require.NoError(t, lifecycler.ChangePartitionState(ctx, PartitionInactive)) + assertPartitionState(PartitionInactive) + + // A request to switch to the same state should be a no-op. + require.NoError(t, lifecycler.ChangePartitionState(ctx, PartitionInactive)) + assertPartitionState(PartitionInactive) +} + +func getPartitionRingFromStore(t *testing.T, store kv.Client, key string) *PartitionRingDesc { + in, err := store.Get(context.Background(), key) + require.NoError(t, err) + + return GetOrCreatePartitionRingDesc(in) +} + +func createTestPartitionInstanceLifecyclerConfig(partitionID int32, instanceID string) PartitionInstanceLifecyclerConfig { + return PartitionInstanceLifecyclerConfig{ + PartitionID: partitionID, + InstanceID: instanceID, + WaitOwnersCountOnPending: 0, + WaitOwnersDurationOnPending: 0, + DeleteInactivePartitionAfterDuration: 0, + PollingInterval: 10 * time.Millisecond, + } +} diff --git a/ring/partition_ring_model.go b/ring/partition_ring_model.go index e397d1915..59c8aab03 100644 --- a/ring/partition_ring_model.go +++ b/ring/partition_ring_model.go @@ -27,7 +27,12 @@ func GetOrCreatePartitionRingDesc(in any) *PartitionRingDesc { return NewPartitionRingDesc() } - return in.(*PartitionRingDesc) + desc := in.(*PartitionRingDesc) + if desc == nil { + return NewPartitionRingDesc() + } + + return desc } func NewPartitionRingDesc() *PartitionRingDesc { @@ -69,6 +74,12 @@ func (m *PartitionRingDesc) ownersByPartition() map[int32][]string { for id, o := range m.Owners { out[o.OwnedPartition] = append(out[o.OwnedPartition], id) } + + // Sort owners to have predictable tests. + for id := range out { + slices.Sort(out[id]) + } + return out } @@ -176,13 +187,24 @@ func (m *PartitionRingDesc) AddOrUpdateOwner(id string, state OwnerState, ownedP } updated.UpdatedTimestamp = now.Unix() - m.Owners[id] = updated + + if m.Owners == nil { + m.Owners = map[string]OwnerDesc{id: updated} + } else { + m.Owners[id] = updated + } + return true } -// RemoveOwner removes a partition owner. -func (m *PartitionRingDesc) RemoveOwner(id string) { +// RemoveOwner removes a partition owner. Returns true if the ring has been changed. +func (m *PartitionRingDesc) RemoveOwner(id string) bool { + if _, ok := m.Owners[id]; !ok { + return false + } + delete(m.Owners, id) + return true } // HasOwner returns whether a owner exists. @@ -191,6 +213,31 @@ func (m *PartitionRingDesc) HasOwner(id string) bool { return ok } +// PartitionOwnersCount returns the number of owners for a given partition. +func (m *PartitionRingDesc) PartitionOwnersCount(partitionID int32) int { + count := 0 + for _, o := range m.Owners { + if o.OwnedPartition == partitionID { + count++ + } + } + return count +} + +// PartitionOwnersCountUpdatedBefore returns the number of owners for a given partition, +// including only owners which have been updated the last time before the input timestamp. 
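+// The comparison is strict: owners whose update timestamp is equal to the input
+// timestamp are not counted.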
+func (m *PartitionRingDesc) PartitionOwnersCountUpdatedBefore(partitionID int32, before time.Time) int { + count := 0 + beforeSeconds := before.Unix() + + for _, o := range m.Owners { + if o.OwnedPartition == partitionID && o.GetUpdatedTimestamp() < beforeSeconds { + count++ + } + } + return count +} + // Merge implements memberlist.Mergeable. func (m *PartitionRingDesc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) { return m.mergeWithTime(mergeable, localCAS, time.Now()) @@ -363,6 +410,14 @@ func (m *PartitionDesc) IsInactive() bool { return m.GetState() == PartitionInactive } +func (m *PartitionDesc) IsInactiveSince(since time.Time) bool { + return m.IsInactive() && m.GetStateTimestamp() < since.Unix() +} + +func (m *PartitionDesc) GetStateTime() time.Time { + return time.Unix(m.GetStateTimestamp(), 0) +} + func (m *PartitionDesc) Clone() PartitionDesc { return *(proto.Clone(m).(*PartitionDesc)) } diff --git a/ring/partition_ring_model_test.go b/ring/partition_ring_model_test.go index 6805c54cf..6eb2c8b4e 100644 --- a/ring/partition_ring_model_test.go +++ b/ring/partition_ring_model_test.go @@ -1031,3 +1031,19 @@ func TestPartitionRingDesc_RemoveTombstones(t *testing.T) { assert.False(t, desc.HasOwner("owner-3")) }) } + +func TestPartitionRingDesc_PartitionOwnersCountUpdatedBefore(t *testing.T) { + now := time.Now() + + desc := NewPartitionRingDesc() + desc.AddPartition(1, PartitionActive, now) + desc.AddPartition(2, PartitionActive, now) + desc.AddOrUpdateOwner("owner-1-a", OwnerActive, 1, now) + desc.AddOrUpdateOwner("owner-1-b", OwnerActive, 1, now.Add(-1*time.Second)) + desc.AddOrUpdateOwner("owner-1-c", OwnerActive, 1, now.Add(-2*time.Second)) + desc.AddOrUpdateOwner("owner-2-a", OwnerActive, 2, now.Add(-3*time.Second)) + + assert.Equal(t, 2, desc.PartitionOwnersCountUpdatedBefore(1, now)) + assert.Equal(t, 1, desc.PartitionOwnersCountUpdatedBefore(1, now.Add(-1*time.Second))) + assert.Equal(t, 0, desc.PartitionOwnersCountUpdatedBefore(1, now.Add(-2*time.Second))) +}