From a375d1768a8add922e65d55968515eea12b86413 Mon Sep 17 00:00:00 2001 From: Dao Thanh Tung Date: Wed, 15 May 2024 12:16:10 +0100 Subject: [PATCH] Add logic to scale to zero on invalid offset even with earliest offsetResetPolicy (#5689) Signed-off-by: dttung2905 --- CHANGELOG.md | 1 + pkg/scalers/apache_kafka_scaler.go | 3 + pkg/scalers/kafka_scaler.go | 3 + .../scalers/apache_kafka/apache_kafka_test.go | 105 +++++++++++++++-- tests/scalers/kafka/kafka_test.go | 109 +++++++++++++++--- 5 files changed, 195 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3bb3b658b0..96338167e70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) +- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) ### Fixes diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index bc13daf3fb9..890cc7d61cf 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -531,6 +531,9 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co } producerOffset := producerOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { + if s.metadata.scaleToZeroOnInvalidOffset { + return 0, 0, nil + } return producerOffset, producerOffset, nil } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index ec9c7fc5927..1d05b4a6527 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -805,6 +805,9 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { + if s.metadata.scaleToZeroOnInvalidOffset { + return 0, 0, nil + } return latestOffset, latestOffset, nil } diff --git a/tests/scalers/apache_kafka/apache_kafka_test.go b/tests/scalers/apache_kafka/apache_kafka_test.go index 05de962a9a6..1d4cf3ae48c 100644 --- a/tests/scalers/apache_kafka/apache_kafka_test.go +++ b/tests/scalers/apache_kafka/apache_kafka_test.go @@ -205,7 +205,7 @@ spec: lagThreshold: '1' offsetResetPolicy: 'latest'` - invalidOffsetScaledObjectTemplate = ` + invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -243,6 +243,44 @@ spec: scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' offsetResetPolicy: 'latest'` + invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 
'earliest'` + persistentLagScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -407,8 +445,10 @@ func TestScaler(t *testing.T) { testEarliestPolicy(t, kc, data) testLatestPolicy(t, kc, data) testMultiTopic(t, kc, data) - testZeroOnInvalidOffset(t, kc, data) - testOneOnInvalidOffset(t, kc, data) + testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) testPersistentLag(t, kc, data) testScalingOnlyPartitionsWithLag(t, kc, data) } @@ -509,7 +549,7 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { "replica count should be %d after 2 minute", 2) } -func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { +func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing zeroInvalidOffsetTopic: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue @@ -518,14 +558,54 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa data.ScaleToZeroOnInvalid = StringTrue KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) // Shouldn't scale pods AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { +func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringTrue + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing 
oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringFalse + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) +} + +func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing oneInvalidOffsetTopic: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue @@ -534,8 +614,9 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat data.ScaleToZeroOnInvalid = StringFalse KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0 + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Should scale to 1 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), @@ -570,7 +651,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData t.Log("--- testing persistentLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup) data.Commit = StringTrue data.TopicName = persistentLagTopic @@ -583,7 +664,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData publishMessage(t, persistentLagTopic) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count 
should be %d after 2 minute", 1) - // Recreate Deployment to delibrately assign different consumer group to deployment and scaled object + // Recreate Deployment to deliberately assign different consumer group to deployment and scaled object // This is to simulate inability to consume from topic // Scaled Object remains unchanged KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace) @@ -613,7 +694,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da t.Log("--- testing limitToPartitionsWithLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) commitPartition(t, limitToPartitionsWithLagTopic, "latest") data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index 550b3fb8138..accf7e293f2 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -204,7 +204,7 @@ spec: lagThreshold: '1' offsetResetPolicy: 'latest'` - invalidOffsetScaledObjectTemplate = ` + invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -242,6 +242,44 @@ spec: scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' offsetResetPolicy: 'latest'` + invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 'earliest'` + persistentLagScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -406,8 +444,10 @@ func TestScaler(t *testing.T) { testEarliestPolicy(t, kc, data) testLatestPolicy(t, kc, data) testMultiTopic(t, kc, data) - testZeroOnInvalidOffset(t, kc, data) - testOneOnInvalidOffset(t, kc, data) + testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) testPersistentLag(t, kc, data) testScalingOnlyPartitionsWithLag(t, kc, data) } @@ -507,8 +547,8 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { "replica count should be %d after 2 minute", 2) } -func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing zeroInvalidOffsetTopic: scale out ---") +func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") 
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = zeroInvalidOffsetTopic @@ -516,15 +556,55 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa data.ScaleToZeroOnInvalid = StringTrue KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) // Shouldn't scale pods AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing oneInvalidOffsetTopic: scale out ---") +func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringTrue + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringFalse + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d 
after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) +} + +func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = oneInvalidOffsetTopic @@ -532,8 +612,9 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat data.ScaleToZeroOnInvalid = StringFalse KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0 + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Should scale to 1 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), @@ -568,7 +649,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData t.Log("--- testing persistentLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup) data.Commit = StringTrue data.TopicName = persistentLagTopic @@ -581,7 +662,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData publishMessage(t, persistentLagTopic) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count should be %d after 2 minute", 1) - // Recreate Deployment to delibrately assign different consumer group to deployment and scaled object + // Recreate Deployment to deliberately assign different consumer group to deployment and scaled object // This is to simulate inability to consume from topic // Scaled Object remains unchanged KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace) @@ -611,7 +692,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da t.Log("--- testing limitToPartitionsWithLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) commitPartition(t, limitToPartitionsWithLagTopic, "latest") data.Params = fmt.Sprintf("--topic %s --group %s", 
limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
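
For reference, a minimal, hypothetical Go sketch of the lag decision this patch establishes when a consumer group has no committed (i.e. invalid) offset. The function name and signature are illustrative only and not part of KEDA's API; the 'earliest' branches mirror the two hunks in apache_kafka_scaler.go and kafka_scaler.go above, while the 'latest' branch is an assumption inferred from the existing behaviour exercised by the "Should scale to 1" e2e tests in this patch.

package main

import "fmt"

// lagForInvalidOffset is an illustrative summary of the scaler's lag decision for a
// partition whose consumer offset is invalid; it is not KEDA code.
// latestOffset is the partition's newest (producer) offset.
func lagForInvalidOffset(latestOffset int64, offsetResetPolicy string, scaleToZeroOnInvalidOffset bool) int64 {
	if scaleToZeroOnInvalidOffset {
		// Change introduced by this patch: honour scaleToZeroOnInvalidOffset even when
		// offsetResetPolicy is 'earliest', so the workload can remain at zero replicas.
		return 0
	}
	if offsetResetPolicy == "earliest" {
		// Unchanged behaviour: report the whole partition as lag so the consumer
		// scales out and reads from the beginning.
		return latestOffset
	}
	// 'latest' without scaleToZeroOnInvalidOffset (assumed, based on the e2e tests):
	// report a lag of one so a single replica starts and can commit an offset.
	return 1
}

func main() {
	// With 100 pending messages, an uncommitted offset and 'earliest' reset policy:
	fmt.Println(lagForInvalidOffset(100, "earliest", true))  // 0   -> may scale to zero
	fmt.Println(lagForInvalidOffset(100, "earliest", false)) // 100 -> scale out to consume backlog
}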