From 07f7ba35451af154ecd4c23199a640e7295cf688 Mon Sep 17 00:00:00 2001 From: jmadajczak Date: Mon, 20 Mar 2023 13:13:21 +0100 Subject: [PATCH] feat: Allow scaling using redis stream length (#4277) Signed-off-by: jmadajczak --- CHANGELOG.md | 1 + pkg/scalers/redis_streams_scaler.go | 139 ++++++---- pkg/scalers/redis_streams_scaler_test.go | 96 ++++++- pkg/scaling/scalers_builder.go | 6 +- .../redis_cluster_streams_length_test.go | 229 +++++++++++++++++ .../redis_sentinel_streams_length_test.go | 243 ++++++++++++++++++ .../redis_standalone_streams_length_test.go | 227 ++++++++++++++++ 7 files changed, 886 insertions(+), 55 deletions(-) create mode 100644 tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go create mode 100644 tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go create mode 100644 tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ed5eb5b591f..3d7c81489e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269)) - TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277)) ### Improvements diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 6d481cb7860..b012d77a5bb 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -14,13 +14,20 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) +type scaleFactor int8 + +const ( + xPendingFactor scaleFactor = iota + 1 + xLengthFactor +) + const ( // defaults - defaultTargetPendingEntriesCount = 5 - defaultDBIndex = 0 + defaultDBIndex = 0 // metadata names pendingEntriesCountMetadata = "pendingEntriesCount" + streamLengthMetadata = "streamLength" streamNameMetadata = "stream" consumerGroupNameMetadata = "consumerGroup" usernameMetadata = "username" @@ -30,15 +37,17 @@ const ( ) type redisStreamsScaler struct { - metricType v2.MetricTargetType - metadata *redisStreamsMetadata - closeFn func() error - getPendingEntriesCountFn func(ctx context.Context) (int64, error) - logger logr.Logger + metricType v2.MetricTargetType + metadata *redisStreamsMetadata + closeFn func() error + getEntriesCountFn func(ctx context.Context) (int64, error) + logger logr.Logger } type redisStreamsMetadata struct { + scaleFactor scaleFactor targetPendingEntriesCount int64 + targetStreamLength int64 streamName string consumerGroupName string databaseIndex int @@ -89,21 +98,15 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe return nil } - pendingEntriesCountFn := func(ctx context.Context) (int64, error) { - pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() - if err != nil { - return -1, err - } - return pendingEntries.Count, nil - } + entriesCountFn, err := createEntriesCountFn(client, meta) return &redisStreamsScaler{ - metricType: metricType, - metadata: meta, - closeFn: closeFn, - getPendingEntriesCountFn: pendingEntriesCountFn, - logger: logger, - }, nil + metricType: metricType, + metadata: meta, + closeFn: closeFn, + getEntriesCountFn: entriesCountFn, + logger: logger, + }, err } func 
createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { @@ -133,29 +136,50 @@ func createScaler(client *redis.Client, meta *redisStreamsMetadata, metricType v return nil } - pendingEntriesCountFn := func(ctx context.Context) (int64, error) { - pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() - if err != nil { - return -1, err - } - return pendingEntries.Count, nil - } + pendingEntriesCountFn, err := createEntriesCountFn(client, meta) return &redisStreamsScaler{ - metricType: metricType, - metadata: meta, - closeFn: closeFn, - getPendingEntriesCountFn: pendingEntriesCountFn, - logger: logger, - }, nil + metricType: metricType, + metadata: meta, + closeFn: closeFn, + getEntriesCountFn: pendingEntriesCountFn, + logger: logger, + }, err +} + +func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (entriesCountFn func(ctx context.Context) (int64, error), err error) { + switch meta.scaleFactor { + case xPendingFactor: + entriesCountFn = func(ctx context.Context) (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil + } + case xLengthFactor: + entriesCountFn = func(ctx context.Context) (int64, error) { + entriesLength, err := client.XLen(ctx, meta.streamName).Result() + if err != nil { + return -1, err + } + return entriesLength, nil + } + default: + err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor) + } + return } var ( - // ErrRedisMissingPendingEntriesCount is returned when "pendingEntriesCount" is missing. - ErrRedisMissingPendingEntriesCount = errors.New("missing pending entries count") + // ErrRedisMissingPendingEntriesCountOrStreamLength is returned when both "pendingEntriesCount" and "streamLength" are missing. + ErrRedisMissingPendingEntriesCountOrStreamLength = errors.New("missing pending entries count or stream length") // ErrRedisMissingStreamName is returned when "stream" is missing. ErrRedisMissingStreamName = errors.New("missing redis stream name") + + // ErrRedisMissingConsumerGroupName is returned when "consumerGroup" is missing but "pendingEntriesCount" is passed.
+ ErrRedisMissingConsumerGroupName = errors.New("missing redis stream consumer group name") ) func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) (*redisStreamsMetadata, error) { @@ -185,28 +209,34 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) meta.connectionInfo.unsafeSsl = parsedVal } - meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount + if val, ok := config.TriggerMetadata[streamNameMetadata]; ok { + meta.streamName = val + } else { + return nil, ErrRedisMissingStreamName + } if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok { + meta.scaleFactor = xPendingFactor pendingEntriesCount, err := strconv.ParseInt(val, 10, 64) if err != nil { return nil, fmt.Errorf("error parsing pending entries count: %w", err) } meta.targetPendingEntriesCount = pendingEntriesCount - } else { - return nil, ErrRedisMissingPendingEntriesCount - } - if val, ok := config.TriggerMetadata[streamNameMetadata]; ok { - meta.streamName = val - } else { - return nil, ErrRedisMissingStreamName - } - - if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok { - meta.consumerGroupName = val + if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok { + meta.consumerGroupName = val + } else { + return nil, ErrRedisMissingConsumerGroupName + } + } else if val, ok = config.TriggerMetadata[streamLengthMetadata]; ok { + meta.scaleFactor = xLengthFactor + streamLength, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing stream length: %w", err) + } + meta.targetStreamLength = streamLength } else { - return nil, fmt.Errorf("missing redis stream consumer group name") + return nil, ErrRedisMissingPendingEntriesCountOrStreamLength } meta.databaseIndex = defaultDBIndex @@ -228,11 +258,20 @@ func (s *redisStreamsScaler) Close(context.Context) error { // GetMetricSpecForScaling returns the metric spec for the HPA func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + var metricValue int64 + + switch s.metadata.scaleFactor { + case xPendingFactor: + metricValue = s.metadata.targetPendingEntriesCount + case xLengthFactor: + metricValue = s.metadata.targetStreamLength + } + externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.streamName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetPendingEntriesCount), + Target: GetMetricTarget(s.metricType, metricValue), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -240,7 +279,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri // GetMetricsAndActivity fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx) + pendingEntriesCount, err := s.getEntriesCountFn(ctx) if err != nil { s.logger.Error(err, "error fetching pending entries count") diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 2eadc19ad3e..ac13a018b08 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -196,11 +196,12 @@ func 
TestParseRedisClusterStreamsMetadata(t *testing.T) { "stream": "my-stream", }, wantMeta: nil, - wantErr: ErrRedisMissingPendingEntriesCount, + wantErr: ErrRedisMissingPendingEntriesCountOrStreamLength, }, { name: "invalid pending entries count", metadata: map[string]string{ + "stream": "my-stream", "hosts": "a, b, c", "ports": "1, 2, 3", "pendingEntriesCount": "invalid", @@ -225,6 +226,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { connectionInfo: redisConnectionInfo{ addresses: []string{":7001", ":7002"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -248,6 +250,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { hosts: []string{"a", "b", "c"}, ports: []string{"1", "2", "3"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -273,6 +276,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -297,6 +301,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -322,6 +327,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -347,6 +353,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "password", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -372,6 +379,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -400,6 +408,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: false, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -429,6 +438,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: true, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -494,11 +504,12 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "stream": "my-stream", }, wantMeta: nil, - wantErr: ErrRedisMissingPendingEntriesCount, + wantErr: ErrRedisMissingPendingEntriesCountOrStreamLength, }, { name: "invalid pending entries count", metadata: map[string]string{ + "stream": "my-stream", "hosts": "a, b, c", "ports": "1, 2, 3", "pendingEntriesCount": "invalid", @@ -523,6 +534,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { connectionInfo: redisConnectionInfo{ addresses: []string{":7001", ":7002"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -546,6 +558,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { hosts: []string{"a", "b", "c"}, ports: []string{"1", "2", "3"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -571,6 +584,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -595,6 +609,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -620,6 +635,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -645,6 +661,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "password", }, + scaleFactor: 
xPendingFactor, }, wantErr: nil, }, @@ -670,6 +687,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -695,6 +713,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "sentinelUsername", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -719,6 +738,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "sentinelUsername", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -744,6 +764,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -769,6 +790,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelPassword: "sentinelPassword", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -794,6 +816,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelPassword: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -819,6 +842,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "sentinelMaster", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -843,6 +867,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "sentinelMaster", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -868,6 +893,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -896,6 +922,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: false, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -925,9 +952,74 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: true, }, + scaleFactor: xPendingFactor, + }, + wantErr: nil, + }, + { + name: "streamLength passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + "stream": "my-stream", + "streamLength": "15", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetStreamLength: 15, + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1"}, + hosts: []string{"a"}, + ports: []string{"1"}, + password: "", + enableTLS: false, + unsafeSsl: false, + }, + scaleFactor: xLengthFactor, + }, + wantErr: nil, + }, + { + name: "streamLength, pendingEntriesCount and consumerGroup passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + "stream": "my-stream", + "streamLength": "15", + "pendingEntriesCount": "30", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 30, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1"}, + hosts: []string{"a"}, + ports: []string{"1"}, + password: "", + enableTLS: false, + unsafeSsl: false, + }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, + { + name: "streamLength, pendingEntriesCount and consumerGroup passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + "stream": "my-stream", + "streamLength": "15", + "pendingEntriesCount": "30", + }, + authParams: map[string]string{}, + 
wantMeta: nil, + wantErr: ErrRedisMissingConsumerGroupName, + }, } for _, testCase := range cases { diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 4f198040558..b10c2b9546c 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -30,9 +30,9 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/resolver" ) -/// --------------------------------------------------------------------------- /// -/// ---------- Scaler-Building related methods --------- /// -/// --------------------------------------------------------------------------- /// +// / --------------------------------------------------------------------------- /// +// / ---------- Scaler-Building related methods --------- /// +// / --------------------------------------------------------------------------- /// // buildScalers returns list of Scalers for the specified triggers func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]cache.ScalerBuilder, error) { diff --git a/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go b/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go new file mode 100644 index 00000000000..ba0a910c9c5 --- /dev/null +++ b/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go @@ -0,0 +1,229 @@ +//go:build e2e +// +build e2e + +package redis_cluster_streams_length_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-cluster-streams-length-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisHost = fmt.Sprintf("%s-headless", testName) + minReplicaCount = 1 + maxReplicaCount = 5 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisHost string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-cluster-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + value: {{.RedisPassword}} +` + + 
secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-cluster-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + stream: my-stream + streamLength: "15" + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + insertJobTemplate = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{.JobName}} + namespace: {{.TestNamespace}} +spec: + ttlSecondsAfterFinished: 0 + template: + spec: + containers: + - name: redis + image: ghcr.io/kedacore/tests-redis-cluster-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["producer"] + env: + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: NUM_MESSAGES + value: "{{.ItemsToWrite}}" + restartPolicy: Never + backoffLimit: 4 +` +) + +func TestScaler(t *testing.T) { + // Get the kubernetes client + kc := GetKubernetesClient(t) + + // Create Redis Cluster + redis.InstallCluster(t, kc, testName, redisNamespace, redisPassword) + + // Create kubernetes resources for testing + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + testScaleOut(t, kc, data) + testScaleIn(t, kc) + + // cleanup + redis.RemoveCluster(t, kc, testName, redisNamespace) + DeleteKubernetesResources(t, kc, testNamespace, data, templates) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +var data = templateData{ + TestNamespace: testNamespace, + RedisNamespace: redisNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + TriggerAuthenticationName: triggerAuthenticationName, + SecretName: secretName, + JobName: jobName, + RedisPassword: redisPassword, + RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)), + RedisHost: redisHost, + ItemsToWrite: 100, +} + +func getTemplateData() (templateData, []Template) { + return data,
[]Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go b/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go new file mode 100644 index 00000000000..2b691978639 --- /dev/null +++ b/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go @@ -0,0 +1,243 @@ +//go:build e2e +// +build e2e + +package redis_sentinel_streams_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-sentinel-streams-test-length" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisHost = fmt.Sprintf("%s-headless", testName) + minReplicaCount = 1 + maxReplicaCount = 4 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisHost string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-sentinel-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_MASTER + value: mymaster +` + + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password + - parameter: sentinelPassword + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: 
{{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-sentinel-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + stream: my-stream + streamLength: "10" + sentinelMaster: mymaster + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + insertJobTemplate = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{.JobName}} + namespace: {{.TestNamespace}} +spec: + ttlSecondsAfterFinished: 0 + template: + spec: + containers: + - name: redis + image: ghcr.io/kedacore/tests-redis-sentinel-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["producer"] + env: + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_MASTER + value: mymaster + - name: NUM_MESSAGES + value: "{{.ItemsToWrite}}" + restartPolicy: Never + backoffLimit: 4 +` +) + +func TestScaler(t *testing.T) { + // Get the kubernetes client + kc := GetKubernetesClient(t) + + // Create Redis Sentinel + redis.InstallSentinel(t, kc, testName, redisNamespace, redisPassword) + + // Create kubernetes resources for testing + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + testScaleOut(t, kc, data) + testScaleIn(t, kc) + + // cleanup + redis.RemoveSentinel(t, kc, testName, redisNamespace) + DeleteKubernetesResources(t, kc, testNamespace, data, templates) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +var data = templateData{ + TestNamespace: testNamespace, + RedisNamespace: redisNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + TriggerAuthenticationName: triggerAuthenticationName, + SecretName: secretName, + JobName: jobName, + RedisPassword: redisPassword, + RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)), + RedisHost: redisHost, + ItemsToWrite: 100, +} + +func getTemplateData() (templateData, []Template) { + return data, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} diff --git a/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go
b/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go new file mode 100644 index 00000000000..3755f3985c3 --- /dev/null +++ b/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go @@ -0,0 +1,227 @@ +//go:build e2e +// +build e2e + +package redis_standalone_streams_length_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-standalone-streams-length-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisStreamName = "stream" + redisAddress = fmt.Sprintf("redis.%s.svc.cluster.local:6379", redisNamespace) + minReplicaCount = 1 + maxReplicaCount = 4 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisStreamName string + RedisAddress string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_ADDRESS + value: {{.RedisAddress}} + - name: REDIS_STREAM_NAME + value: {{.RedisStreamName}} + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: "consumer-group-1" +` + + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-streams + metadata: + addressFromEnv: REDIS_ADDRESS + stream: {{.RedisStreamName}} + streamLength: "10" + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + insertJobTemplate = `apiVersion: batch/v1 +kind: Job +metadata: + name: 
{{.JobName}} + namespace: {{.TestNamespace}} +spec: + ttlSecondsAfterFinished: 0 + template: + spec: + containers: + - name: redis + image: ghcr.io/kedacore/tests-redis-streams-length:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["producer"] + env: + - name: REDIS_ADDRESS + value: {{.RedisAddress}} + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_STREAM_NAME + value: {{.RedisStreamName}} + - name: NUM_MESSAGES + value: "{{.ItemsToWrite}}" + restartPolicy: Never + backoffLimit: 4 +` +) + +func TestScaler(t *testing.T) { + // Get the kubernetes client + kc := GetKubernetesClient(t) + + // Create Redis Standalone + redis.InstallStandalone(t, kc, testName, redisNamespace, redisPassword) + + // Create kubernetes resources for testing + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + testScaleOut(t, kc, data) + testScaleIn(t, kc) + + // cleanup + redis.RemoveStandalone(t, kc, testName, redisNamespace) + DeleteKubernetesResources(t, kc, testNamespace, data, templates) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +} + +var data = templateData{ + TestNamespace: testNamespace, + RedisNamespace: redisNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + TriggerAuthenticationName: triggerAuthenticationName, + SecretName: secretName, + JobName: jobName, + RedisPassword: redisPassword, + RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)), + RedisStreamName: redisStreamName, + RedisAddress: redisAddress, + ItemsToWrite: 100, +} + +func getTemplateData() (templateData, []Template) { + return data, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +}
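For reference, the two trigger modes this patch distinguishes come down to two different Redis commands: XLEN counts every entry in a stream regardless of whether any consumer group has read it (the new streamLength trigger, which needs no consumerGroup), while XPENDING counts entries delivered to a consumer group but not yet acknowledged (the existing pendingEntriesCount trigger, which still requires consumerGroup). The following is a minimal standalone sketch of that difference, not part of the patch; it assumes a Redis instance at localhost:6379, a stream named my-stream with a group consumer-group-1, and the github.com/redis/go-redis/v9 client, which may differ from the client version KEDA vendors.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // hypothetical local instance
	defer rdb.Close()

	// streamLength trigger: XLEN returns the total number of entries in the
	// stream, whether or not any consumer group has read them.
	length, err := rdb.XLen(ctx, "my-stream").Result()
	if err != nil {
		panic(err)
	}

	// pendingEntriesCount trigger: XPENDING returns entries delivered to a
	// consumer group but not yet acknowledged, which is why the scaler
	// requires consumerGroup in this mode.
	pending, err := rdb.XPending(ctx, "my-stream", "consumer-group-1").Result()
	if err != nil {
		panic(err)
	}

	fmt.Printf("XLEN=%d XPENDING=%d\n", length, pending.Count)
}
```

As the parsing logic and the new unit tests above show, when both streamLength and pendingEntriesCount are supplied, pendingEntriesCount takes precedence and consumerGroup remains mandatory.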