feat: Allow scaling using redis stream length (kedacore#4277)
Signed-off-by: jmadajczak <jmadajczak@transparentdata.pl>
jmadajczak committed Mar 20, 2023
1 parent b1b1be1 commit 07f7ba3
Showing 7 changed files with 886 additions and 55 deletions.
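
This commit extends the redis-streams scaler, which previously scaled only on the number of pending entries in a consumer group (Redis XPENDING), with a second mode that scales on the total stream length (Redis XLEN), selected by setting `streamLength` instead of `pendingEntriesCount` in the trigger metadata. The sketch below shows the two underlying Redis queries using the go-redis client; the import path, address, and key names are illustrative assumptions, not part of this commit.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	// Existing mode: XPENDING counts entries delivered to a consumer group
	// but not yet acknowledged, so it requires a consumer group.
	pending, err := client.XPending(ctx, "my-stream", "my-group").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("pending entries:", pending.Count)

	// New mode: XLEN counts every entry in the stream, no consumer group needed.
	length, err := client.XLen(ctx, "my-stream").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("stream length:", length)
}
```

XPENDING tracks backlog per consumer group, while XLEN tracks the raw size of the stream, which is useful when no consumer group is involved or when entries are never acknowledged.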
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers ([#4269](https://github.com/kedacore/keda/issues/4269))
- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))

### Improvements

139 changes: 89 additions & 50 deletions pkg/scalers/redis_streams_scaler.go
@@ -14,13 +14,20 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type scaleFactor int8

const (
xPendingFactor scaleFactor = iota + 1
xLengthFactor
)

const (
// defaults
defaultTargetPendingEntriesCount = 5
defaultDBIndex = 0
defaultDBIndex = 0

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
@@ -30,15 +37,17 @@ const (
)

type redisStreamsScaler struct {
metricType v2.MetricTargetType
metadata *redisStreamsMetadata
closeFn func() error
getPendingEntriesCountFn func(ctx context.Context) (int64, error)
logger logr.Logger
metricType v2.MetricTargetType
metadata *redisStreamsMetadata
closeFn func() error
getEntriesCountFn func(ctx context.Context) (int64, error)
logger logr.Logger
}

type redisStreamsMetadata struct {
scaleFactor scaleFactor
targetPendingEntriesCount int64
targetStreamLength int64
streamName string
consumerGroupName string
databaseIndex int
@@ -89,21 +98,15 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe
return nil
}

pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
entriesCountFn, err := createEntriesCountFn(client, meta)

return &redisStreamsScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getPendingEntriesCountFn: pendingEntriesCountFn,
logger: logger,
}, nil
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getEntriesCountFn: entriesCountFn,
logger: logger,
}, err
}

func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
@@ -133,29 +136,50 @@ func createScaler(client *redis.Client, meta *redisStreamsMetadata, metricType v
return nil
}

pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
pendingEntriesCountFn, err := createEntriesCountFn(client, meta)

return &redisStreamsScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getPendingEntriesCountFn: pendingEntriesCountFn,
logger: logger,
}, nil
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getEntriesCountFn: pendingEntriesCountFn,
logger: logger,
}, err
}

func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (entriesCountFn func(ctx context.Context) (int64, error), err error) {
switch meta.scaleFactor {
case xPendingFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
case xLengthFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
entriesLength, err := client.XLen(ctx, meta.streamName).Result()
if err != nil {
return -1, err
}
return entriesLength, nil
}
default:
err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor)
}
return
}
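
The function above decides the query strategy once, when the scaler is constructed, and hands back a closure; every later metric poll just calls the stored function, and an unrecognized scale factor surfaces as a construction error instead of failing on each poll. A minimal self-contained sketch of that pattern (the `newCounter` name and the fixed return values are hypothetical, for illustration only):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type scaleFactor int8

const (
	xPendingFactor scaleFactor = iota + 1
	xLengthFactor
)

// newCounter mirrors the dispatch above: inspect the factor once and
// return a closure that later polls can call without re-branching.
func newCounter(factor scaleFactor) (func(context.Context) (int64, error), error) {
	switch factor {
	case xPendingFactor:
		return func(ctx context.Context) (int64, error) { return 3, nil }, nil // stand-in for XPENDING
	case xLengthFactor:
		return func(ctx context.Context) (int64, error) { return 42, nil }, nil // stand-in for XLEN
	default:
		return nil, errors.New("unrecognized scale factor")
	}
}

func main() {
	count, err := newCounter(xLengthFactor)
	if err != nil {
		panic(err)
	}
	n, _ := count(context.Background()) // each poll reuses the stored closure
	fmt.Println(n)                      // 42
}
```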

var (
// ErrRedisMissingPendingEntriesCount is returned when "pendingEntriesCount" is missing.
ErrRedisMissingPendingEntriesCount = errors.New("missing pending entries count")
// ErrRedisMissingPendingEntriesCountOrStreamLength is returned when both "pendingEntriesCount" and "streamLength" are missing.
ErrRedisMissingPendingEntriesCountOrStreamLength = errors.New("missing pending entries count or stream length")

// ErrRedisMissingStreamName is returned when "stream" is missing.
ErrRedisMissingStreamName = errors.New("missing redis stream name")

// ErrRedisMissingConsumerGroupName is returned when "consumerGroup" is missing but "pendingEntriesCount" is passed.
ErrRedisMissingConsumerGroupName = errors.New("missing redis stream consumer group name")
)

func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) (*redisStreamsMetadata, error) {
@@ -185,28 +209,34 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)
meta.connectionInfo.unsafeSsl = parsedVal
}

meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount
if val, ok := config.TriggerMetadata[streamNameMetadata]; ok {
meta.streamName = val
} else {
return nil, ErrRedisMissingStreamName
}

if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
meta.scaleFactor = xPendingFactor
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
} else {
return nil, ErrRedisMissingPendingEntriesCount
}

if val, ok := config.TriggerMetadata[streamNameMetadata]; ok {
meta.streamName = val
} else {
return nil, ErrRedisMissingStreamName
}

if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
} else {
return nil, ErrRedisMissingConsumerGroupName
}
} else if val, ok = config.TriggerMetadata[streamLengthMetadata]; ok {
meta.scaleFactor = xLengthFactor
streamLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing stream length: %w", err)
}
meta.targetStreamLength = streamLength
} else {
return nil, fmt.Errorf("missing redis stream consumer group name")
return nil, ErrRedisMissingPendingEntriesCountOrStreamLength
}

meta.databaseIndex = defaultDBIndex
@@ -228,19 +258,28 @@ func (s *redisStreamsScaler) Close(context.Context) error {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricValue int64

switch s.metadata.scaleFactor {
case xPendingFactor:
metricValue = s.metadata.targetPendingEntriesCount
case xLengthFactor:
metricValue = s.metadata.targetStreamLength
}

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.streamName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetPendingEntriesCount),
Target: GetMetricTarget(s.metricType, metricValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}
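
With the AverageValue metric target that KEDA scalers typically default to, the HPA divides the observed value by the target to choose a replica count, so the new `streamLength` target behaves just like `pendingEntriesCount` does today: a target stream length of 50 with an observed XLEN of 200 yields four replicas. A small sketch of that arithmetic (the numbers are illustrative):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Assumed AverageValue semantics: desired replicas = ceil(observed / target).
	targetStreamLength := 50.0
	observedXLen := 200.0
	replicas := int(math.Ceil(observedXLen / targetStreamLength))
	fmt.Println(replicas) // 4
}
```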

// GetMetricsAndActivity fetches either the number of pending entries for a consumer group or the stream length, depending on the configured scale factor
func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx)
pendingEntriesCount, err := s.getEntriesCountFn(ctx)

if err != nil {
s.logger.Error(err, "error fetching pending entries count")
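
Parsing establishes a clear precedence: `stream` is always required; if `pendingEntriesCount` is present the scaler stays in the XPENDING mode and also requires `consumerGroup`; otherwise `streamLength` selects the new XLEN mode; with neither count set, parsing fails with ErrRedisMissingPendingEntriesCountOrStreamLength. A standalone mirror of that precedence for illustration (this simplified `pickMode` helper is not part of the commit):

```go
package main

import (
	"errors"
	"fmt"
)

// pickMode mimics the precedence in parseRedisStreamsMetadata over the
// trigger metadata keys and reports which scaling mode would be selected.
func pickMode(md map[string]string) (string, error) {
	if _, ok := md["stream"]; !ok {
		return "", errors.New("missing redis stream name")
	}
	if _, ok := md["pendingEntriesCount"]; ok {
		if _, ok := md["consumerGroup"]; !ok {
			return "", errors.New("missing redis stream consumer group name")
		}
		return "xPending", nil
	}
	if _, ok := md["streamLength"]; ok {
		return "xLength", nil
	}
	return "", errors.New("missing pending entries count or stream length")
}

func main() {
	cases := []map[string]string{
		{"stream": "orders", "consumerGroup": "billing", "pendingEntriesCount": "10"}, // existing XPENDING mode
		{"stream": "orders", "streamLength": "50"},                                    // new XLEN mode, no group needed
		{"stream": "orders"},                                                          // neither count -> error
	}
	for _, md := range cases {
		mode, err := pickMode(md)
		fmt.Println(mode, err)
	}
}
```

In a ScaledObject this corresponds to supplying `streamLength` in the redis-streams trigger metadata in place of `pendingEntriesCount` and `consumerGroup`.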